Home  >  Article  >  Backend Development  >  C++ multi-threading framework (3): message queue

C++ multi-threading framework (3): message queue

黄舟
黄舟Original
2017-02-06 13:53:192157browse

Before, we have encapsulated some basic things about multi-threading, including thread creation, mutex locks, and semaphores. Let’s take a look at the message queue


us Try to use as little as possible the message queue that comes with the system (such as Linux's sys/msgqueue), which is not very portable. We hope that the message queue will use standard C++ data structures for message packaging and extraction. Of course, you can also You can use a linked list or FIFO, in which case you have to write a linked list or FIFO first.


I am lazy and directly use C++'s STL deque, which is a dual-port queue. This way the reliability is guaranteed. Of course, the speed may not be as fast as the linked list written by myself. But it doesn't matter. Using a dual-port queue, you can also insert data into the head or tail of the queue according to your own needs, which is still useful when messages have priority.


The core function of the message queue is actually very simple. One or more threads pile data at the back of a queue, and another thread takes the data from the front of the queue for processing. The basic operations are also There are only two, one sending and one receiving, so we define the message queue base class as:

class CMsgQueue  
{  
    public:  
        CMsgQueue(const char *pName=NULL);  
        ~CMsgQueue();  
        //revice data from message queue  
        virtual bool recvMsg(unsigned int &m_msg_code,void *&p_msg)=0;  
        //send data to message queue  
        virtual bool sendMsg(unsigned int m_msg_code,void *p_msg)=0;  
        const char * getName(void) const {  
                return msg_queue_name;  
            }        
    private:  
        char *msg_queue_name;  
};


Then remember to add the method to create the message queue in COperratingSystemFactory:

class COperatingSystemFactory  
{  
    public:  
        static COperatingSystem *newOperatingSystem();  
        static CCountingSem  *newCountingSem(unsigned int init);  
        static CMutex           *newMutex(const char *pName=NULL);  
        static CMsgQueue     *newMsgQueue(const char *pName=NULL);  
  
};


Finally, inherit a CLinuxMsgQueue from CMsgQueue, and then implement recvMsg and sendMsg. Pay attention when implementing it.


Simply operating the dual-port FIFO will not work. We hope that if there is no message when receiving a message, the thread will block there and wait for the message until a message arrives before continuing to run. Therefore, When receiving messages, we use a semaphore and block in the semaphore. When sending messages, we finish operating the queue and send a semaphore out.


Secondly, for the operation of the queue, we hope to be atomic, otherwise it will be chaotic if one is receiving and the other is sending, so we use a mutex lock when operating the queue. Let’s lock it to ensure basic atomicity.


The corresponding program is


1. Apply for a lock and a semaphore for each message queue

CLinuxMsgQueue::CLinuxMsgQueue(const char *pName):  
CMsgQueue(pName)  
{  
    p_mutex=COperatingSystemFactory::newMutex("Msg Mutex");  
    p_sem=COperatingSystemFactory::newCountingSem(0);  
}


When receiving a message:

bool CLinuxMsgQueue::recvMsg(unsigned int &m_msg_code,void *&p_msg)  
{  
    bool result;  
        Elements queue_element;  
    p_sem->Get();  //通过信号量阻塞在这里,有消息到达了才接着往下走  
    p_mutex->Lock();  //锁定,保证原子性  
        //操作队列  
    if (m_queue.empty()) {  
                p_mutex-> UnLock ();  
            return false;    
    }  
    queue_element = m_queue.front();  
    m_queue.pop_front();  
    m_msg_code = queue_element.msg_code;  
    p_msg = queue_element.p_message;  
        //操作队列结束  
    p_mutex->UnLock(); //解除锁定  
        return true;  
}


It is also done in a similar way when sending. In this way, a final The simple message queue is complete. If we want to use the message queue, it is very simple. In main.cpp,

int main()  
{  
        //首先,新建一个消息队列  
        CMsgQueue *q=COperatingSystemFactory::newMsgQueue("B to A message Queue");  
        //新建两个线程,TestThread和TestThreadB都是从CThread继承下来的线程类  
    TestThread *a=new TestThread("A");  
    TestThreadB *b=new TestThreadB("B");  
        //将消息队列放到两个线程实体的局部变量中  
    a->setMsgQueue(q);  
    b->setMsgQueue(q);  
        //启动线程  
    a->run();  
    b->run();  
}


When we want to send a message in mainloop, we only need to call

p_msg_send->sendMsg(code, (void *)p_msg);  
//其中p_msg_send是b线程的局部变量,实际指向的是之前新建的消息队列q

github address:

https://github.com/wyh267/Cplusplus_Thread_Lib

Written at the back:

Of course, this code is still very incomplete, the entire code There are not many lines in total. Here, I just provide a code framework method as a demo for your reference. If it is really needed for actual use, there are many, many places that need to be modified. My code on github cannot be used in production software. In actual use, in actual projects, I also implemented a thread library without any third party, which is much more complicated than this. It also includes event processing, wait timeout, message broadcast, message subscription and other modules, and it can run on Linux. , ecos and many other platforms, it is basically platform-independent, but due to various reasons I cannot publish all the codes. The framework mentioned here is only a very small part of the thread library extracted from the project. Similarly , it only provides a programming design idea. I hope everyone can explore and improve the following things in their own way. Maybe after reading it, you will come up with a more powerful and concise framework.


In addition, I will continue to improve the code on github and add other modules one after another. If you are interested, you can improve it with me. I try not to use the previous ones. Implemented thread library code to avoid unnecessary trouble.

The above is the content of C++ multi-threading framework (3): message queue. For more related content, please pay attention to the PHP Chinese website (www.php.cn)!


Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn