首頁 >後端開發 >C#.Net教程 >C#實作非同步訊息佇列

C#實作非同步訊息佇列

黄舟
黄舟原創
2016-12-27 14:31:432110瀏覽

訊息佇列

 

訊息佇列(英文:Message queue)是一種進程間通訊或同一進程的不同執行緒間的通訊方式,軟體的貯列用來處理一系列的輸入,通常是來自使用者。訊息佇列提供了非同步的通訊協議,每一個貯列中的紀錄包含詳細說明的資料,包含發生的時間,輸入裝置的種類,以及特定的輸入參數,也就是說:訊息的發送者和接收者不需要同時與訊息隊列互交。訊息會保存在佇列中,直到接收者取回它。

 

簡單的說隊列就是貯存了我們需要處理的Command但是並不是及時的拿到其處理結果;

 

實現

 

實際上,訊息隊列中通常保存在鍊錶。擁有權限的進程可以寫入或讀取訊息佇列到訊息佇列中。

 

目前,有許多訊息佇列有許多開源的實現,包括JBoss Messaging、JORAM、Apache ActiveMQ、Sun Open Message Queue、Apache Qpid和HTTPSQS。

優點,缺點

訊息佇列本身是異步的,它允許接收者在訊息發送很長時間後再取回訊息,這和大多數通訊協定是不同的。例如WWW中使用的HTTP協定是同步的,因為客戶端在發出請求後必須等待伺服器回應。然而,很多情況下我們需要異步的通訊協定。例如,一個進程通知另一個進程發生了一個事件,但不需要等待回應。但訊息隊列的非同步特點,也造成了一個缺點,就是接收者必須輪詢訊息隊列,才能收到最近的訊息。

和訊號相比,訊息佇列能夠傳遞更多的訊息。與管道相比,訊息佇列提供了有格式的數據,這可以減少開發人員的工作量。但訊息隊列仍然有大小限制。

讀取佇列訊息

主要有兩種(1)服務端的推;(2)客戶端的拉;

拉:主要是客戶端定時輪詢拿走訊息處理;

推:透過事件訂閱方式主動通知訂閱者進行處理;

訊息的貯存

簡單的是透過記憶體鍊錶實現貯存;也可以藉助DB,例如Redis;還可以持久到本地文件中;

如何保證非同步處理的一致性

儘管佇列主要目的是實現訊息貯存,同時將呼叫與實現非同步化。但如果想達到處理訊息一致性,好的方式是區別業務處理順序,例如操作主從DB,主負責寫,從負責讀,我們沒有機會在寫之後立刻從讀資料庫拿到你想要的結果;同時我們需要藉助中間狀態,當多個中間狀態同時符合呼叫結果才到到業務時間被處理,否則將「異常訊息」持久化,待下次操作;

上程式碼

建立訊息對立核心隊列

{    public delegate void MessageQueueEventNotifyHandler(Message.BaseMessage message);
 
    public class MessageQueue:Queue<BaseMessage>
    {
        public static MessageQueue GlobalQueue = new MessageQueue();
 
        private Timer timer = new Timer();
        public MessageQueue() {
            this.timer.Interval = 5000;
            this.timer.Elapsed += Notify;
            this.timer.Enabled = true;
        }
        private void Notify(object sender, ElapsedEventArgs e) {
            lock (this) {
                if (this.Count > 0) {
                    //this.messageNotifyEvent.GetInvocationList()[0].DynamicInvoke(this.Dequeue());
                    var message = this.Dequeue();
                    this.messageNotifyEvent(message);
                }
            }
        }
 
        private MessageQueueEventNotifyHandler messageNotifyEvent;
        public event MessageQueueEventNotifyHandler MessageNotifyEvent {
            add {
                this.messageNotifyEvent += value;
            }
 
            remove {
                if (this.messageNotifyEvent != null) {
                    this.messageNotifyEvent -= value;
                }
            }
        }
    }
}

事件處理

public const string OrderCodePrefix = "P";        public void Submit(Message.BaseMessage message)
        {
            Order order = message.Body as Order;
 
            if (order.OrderCode.StartsWith(OrderCodePrefix))
            {
                System.Console.WriteLine("这个是个正确的以({0})开头的订单:{1}", OrderCodePrefix,order.OrderCode);
            }
            else {
                System.Console.WriteLine("这个是个错误的订单,没有以({0})开头:{1}",OrderCodePrefix,order.OrderCode);
            }
        }

可依據具體業務進行個性化處理;

 

透過Proxy向隊列追加訊息

public class OrderServiceProxy:IOrderService    {
        public void Submit(Message.BaseMessage message)
        {
            MessageQueue.MessageQueue.GlobalQueue.Enqueue(message);
        }
    }

客戶端綁定,同時達到了訊息異步化的目的,希望更細緻的拓展用到後期的專案中。

 以上就是C#實作非同步訊息佇列的內容,更多相關內容請關注PHP中文網(www.php.cn)!


陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn