Maison > Article > développement back-end > Framework multithread C (3) : file d'attente de messages
Avant, nous avons résumé quelques éléments de base sur le multithreading, notamment la création de threads, les verrous mutex et les sémaphores
Nous. Essayez d'utiliser le moins possible la file d'attente de messages fournie avec le système (comme la file d'attente sys/msgqueue de Linux), qui n'est pas très portable. Nous espérons que la file d'attente de messages utilisera des structures de données C standard pour l'empaquetage et l'extraction des messages. , vous pouvez également Vous pouvez utiliser une liste chaînée ou FIFO, auquel cas vous devez d'abord écrire une liste chaînée ou FIFO.
Je suis paresseux et j'utilise directement le deque STL de C, qui est une file d'attente à double port. De cette façon, la fiabilité est garantie. Bien sûr, la vitesse peut ne pas être aussi rapide. comme la liste chaînée écrite par moi-même. Mais cela n'a pas d'importance. En utilisant une file d'attente à double port, vous pouvez également insérer des données en tête ou en queue de file d'attente selon vos propres besoins, ce qui est toujours utile lorsque les messages sont prioritaires.
La fonction principale de la file d'attente de messages est en fait très simple. Un ou plusieurs threads empilent les données à la fin d'une file d'attente, et un autre thread prend les données au début de la file d'attente. pour le traitement. Les opérations de base sont également Il n'y en a que deux, une pour l'envoi et une pour la réception, nous définissons donc la classe de base de la file d'attente des messages comme :
class CMsgQueue { public: CMsgQueue(const char *pName=NULL); ~CMsgQueue(); //revice data from message queue virtual bool recvMsg(unsigned int &m_msg_code,void *&p_msg)=0; //send data to message queue virtual bool sendMsg(unsigned int m_msg_code,void *p_msg)=0; const char * getName(void) const { return msg_queue_name; } private: char *msg_queue_name; };
Alors n'oubliez pas de ajoutez la méthode pour créer la file d'attente de messages dans COperratingSystemFactory :
class COperatingSystemFactory { public: static COperatingSystem *newOperatingSystem(); static CCountingSem *newCountingSem(unsigned int init); static CMutex *newMutex(const char *pName=NULL); static CMsgQueue *newMsgQueue(const char *pName=NULL); };
Enfin, héritez d'un CLinuxMsgQueue de CMsgQueue, puis implémentez recvMsg et sendMsg. Faites attention lors de son implémentation.
Le simple fonctionnement du FIFO à double port ne fonctionnera pas. Nous espérons que s'il n'y a pas de message lors de la réception d'un message, le fil se bloquera là et attendra le message jusqu'à ce que le message soit envoyé. un message arrive avant de continuer à s'exécuter.Par conséquent, lors de la réception des messages, nous utilisons un sémaphore et bloquons le sémaphore. Lors de l'envoi des messages, nous finissons d'exploiter la file d'attente et envoyons un sémaphore.
Deuxièmement, pour le fonctionnement de la file d'attente, on espère être atomique, sinon ce sera chaotique si l'un reçoit et l'autre envoie, donc on utilise un mutex verrouiller lors de l'utilisation de la file d'attente Verrouillons-la pour garantir l'atomicité de base. Le programme spécifique correspondant à
est
1. Demander un verrou et un sémaphore pour chaque file d'attente de messages <.>
CLinuxMsgQueue::CLinuxMsgQueue(const char *pName): CMsgQueue(pName) { p_mutex=COperatingSystemFactory::newMutex("Msg Mutex"); p_sem=COperatingSystemFactory::newCountingSem(0); }
bool CLinuxMsgQueue::recvMsg(unsigned int &m_msg_code,void *&p_msg) { bool result; Elements queue_element; p_sem->Get(); //通过信号量阻塞在这里,有消息到达了才接着往下走 p_mutex->Lock(); //锁定,保证原子性 //操作队列 if (m_queue.empty()) { p_mutex-> UnLock (); return false; } queue_element = m_queue.front(); m_queue.pop_front(); m_msg_code = queue_element.msg_code; p_msg = queue_element.p_message; //操作队列结束 p_mutex->UnLock(); //解除锁定 return true; }
int main() { //首先,新建一个消息队列 CMsgQueue *q=COperatingSystemFactory::newMsgQueue("B to A message Queue"); //新建两个线程,TestThread和TestThreadB都是从CThread继承下来的线程类 TestThread *a=new TestThread("A"); TestThreadB *b=new TestThreadB("B"); //将消息队列放到两个线程实体的局部变量中 a->setMsgQueue(q); b->setMsgQueue(q); //启动线程 a->run(); b->run(); }
p_msg_send->sendMsg(code, (void *)p_msg); //其中p_msg_send是b线程的局部变量,实际指向的是之前新建的消息队列qAdresse github : https://github.com/wyh267/Cplusplus_Thread_LibÉcrit au dos : Bien sûr , ce code est très insatisfaisant. Complet, le code entier n'a pas beaucoup de lignes. Ici, je fournis simplement une méthode de cadre de code comme démonstration pour votre référence. Si cela est vraiment nécessaire pour une utilisation réelle, il y a de très nombreux endroits qui en ont besoin. à modifier. Mon code sur github est également Il ne peut pas être réellement utilisé dans un logiciel de production, j'ai également implémenté une bibliothèque de threads sans aucun tiers. Cela inclut également le traitement des événements, attendez. délai d'attente, diffusion de messages, abonnement aux messages et autres modules, et il peut fonctionner sur plusieurs plates-formes telles que Linux, Ecos, etc., et est fondamentalement indépendant de la plate-forme. Cependant, pour diverses raisons, je ne peux pas publier tous les codes mentionnés. voici seulement une très petite partie de la bibliothèque de threads extraite du projet. Une partie également ne fournit qu'une idée de conception de programmation. J'espère que tout le monde pourra explorer et améliorer les choses suivantes. Peut-être qu'après l'avoir lu, vous proposerez. un cadre plus puissant et plus concis.