首頁  >  文章  >  後端開發  >  C++ 多執行緒框架(3):訊息佇列

C++ 多執行緒框架(3):訊息佇列

黄舟
黄舟原創
2017-02-06 13:53:192188瀏覽

之前,多執行緒一些基本的東西,包括執行緒創建,互斥鎖,訊號量,我們都已經封裝,下面來看看訊息佇列


我們盡量少用系統自帶的訊息佇列(例如Linux的sys/msgqueue),那樣移植性不是很強,我們希望的訊息隊列,在訊息打包和提取都是用的標準的C++資料結構,當然,你也可以用鍊錶或者是FIFO,那樣得先寫個鍊錶或者FIFO出來。


我比較懶,直接用的C++的STL的deque,即雙端口隊列,這樣可靠性有保證,當然,速度可能沒有自己寫的鍊錶快,但是沒關係,使用雙端口隊列還可以根據你自己的需求將資料插入佇列頭或佇列尾,這樣在訊息有優先權的情況下還是有用的。


訊息佇列的核心作用其實很簡單,一個或多個執行緒往一個佇列後面堆數據,另外的一個執行緒從佇列前面取資料處理,基本操作也只有兩個,一個發,一個收,所以,我們定義訊息佇列基底類別為:

class CMsgQueue  
{  
    public:  
        CMsgQueue(const char *pName=NULL);  
        ~CMsgQueue();  
        //revice data from message queue  
        virtual bool recvMsg(unsigned int &m_msg_code,void *&p_msg)=0;  
        //send data to message queue  
        virtual bool sendMsg(unsigned int m_msg_code,void *p_msg)=0;  
        const char * getName(void) const {  
                return msg_queue_name;  
            }        
    private:  
        char *msg_queue_name;  
};


然後記得在COperratingSystemFactory裡加上建立訊息佇列的方法:

class COperatingSystemFactory  
{  
    public:  
        static COperatingSystem *newOperatingSystem();  
        static CCountingSem  *newCountingSem(unsigned int init);  
        static CMutex           *newMutex(const char *pName=NULL);  
        static CMsgQueue     *newMsgQueue(const char *pName=NULL);  
  
};


最後,從CMsginBsgue實現的時候注意一下。


單純的操作雙埠FIFO不行,我們希望是接收訊息的時候如果沒有訊息,線程阻塞在那裡等待訊息直到有訊息到來才接著運行,所以,接收訊息的時候我們用了信號量,阻塞在信號量那裡,發送訊息的時候操作完佇列,發送一個信號量出去。


其次,對於隊列的操作,我們希望是原子性的,不然一個正在收一個正在發就亂了,所以操作隊列的時候我們用互斥鎖來鎖一下,保證基本的原子性。


對應到具體的程序就是


1.為每個訊息隊列申請一個鎖,一個信號量

CLinuxMsgQueue::CLinuxMsgQueue(const char *pName):  
CMsgQueue(pName)  
{  
    p_mutex=COperatingSystemFactory::newMutex("Msg Mutex");  
    p_sem=COperatingSystemFactory::newCountingSem(0);  
}


也是類似的方式進行,這樣,一個最簡單訊息佇列就完成了。如果我們要使用訊息佇列的話,很簡單,在main.cpp中

bool CLinuxMsgQueue::recvMsg(unsigned int &m_msg_code,void *&p_msg)  
{  
    bool result;  
        Elements queue_element;  
    p_sem->Get();  //通过信号量阻塞在这里,有消息到达了才接着往下走  
    p_mutex->Lock();  //锁定,保证原子性  
        //操作队列  
    if (m_queue.empty()) {  
                p_mutex-> UnLock ();  
            return false;    
    }  
    queue_element = m_queue.front();  
    m_queue.pop_front();  
    m_msg_code = queue_element.msg_code;  
    p_msg = queue_element.p_message;  
        //操作队列结束  
    p_mutex->UnLock(); //解除锁定  
        return true;  
}


當要在mainloop中發送訊息的時候,只需要呼叫

int main()  
{  
        //首先,新建一个消息队列  
        CMsgQueue *q=COperatingSystemFactory::newMsgQueue("B to A message Queue");  
        //新建两个线程,TestThread和TestThreadB都是从CThread继承下来的线程类  
    TestThread *a=new TestThread("A");  
    TestThreadB *b=new TestThreadB("B");  
        //将消息队列放到两个线程实体的局部变量中  
    a->setMsgQueue(q);  
    b->setMsgQueue(q);  
        //启动线程  
    a->run();  
    b->run();  
}

github位址:

https://github.com /wyh267/Cplusplus_Thread_Lib

寫在後面的話:

當然,這個程式碼還非常不完整,整個程式碼量也沒有多少行,在這裡,我只是提供一個程式碼框架的方法,作為一個demo給大家參考,如果真的需要實際使用還有很多很多地方需要修改的,github上我的程式碼也不能在生產軟體中實際使用,在實際的專案中,我也實現了一個沒有任何第三方的線程庫,比這個複雜多了,還包括事件處理,等待超時,訊息廣播,訊息訂閱等模組,而且能運行在linux,ecos等多個平台上,基本做到平台無關了,但由於各種原因我也沒辦法將程式碼都公佈出來,這裡所說的這個框架只是項目中線程庫提取出來的非常少的一部分,同樣,也只是提供一種編程的設計思想,後面的東西希望大家各自有各自的發掘和完善,也許你看了以後,會提出更強大、更簡潔的框架。

另外,github上的程式碼我會繼續完善,將其他模組陸續加上,如果大家有興趣也可以跟我一起來完善,我盡量不使用之前實現過的線程庫的程式碼,避免不必要的麻煩。

以上就是C++ 多執行緒框架(3):訊息佇列的內容,更多相關內容請關注PHP中文網(www.php.cn)!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn