Home > Article > Backend Development > C# implements asynchronous message queue
Message Queue
Message queue (English: Message queue) is a method of inter-process communication or communication between different threads of the same process. The software queue is used to process a A series of inputs, usually from the user. The message queue provides an asynchronous communication protocol. Each record in the queue contains detailed information, including the time of occurrence, the type of input device, and specific input parameters. That is to say: the sender and receiver of the message do not Need to interact with the message queue at the same time. The message remains in the queue until the recipient retrieves it.
Simply put, the queue stores the Commands we need to process but does not get the processing results in time;
Implementation
In fact, message queues are often stored in linked list structures. Processes with permission can write or read messages from the message queue.
Currently, there are many open source implementations of message queues, including JBoss Messaging, JORAM, Apache ActiveMQ, Sun Open Message Queue, Apache Qpid and HTTPSQS.
Advantages, Disadvantages
The message queue itself is asynchronous, which allows the receiver to retrieve the message long after the message is sent, which is different from most communication protocols. For example, the HTTP protocol used in the WWW is synchronous because the client must wait for the server to respond after making a request. However, there are many situations where we need asynchronous communication protocols. For example, one process notifies another process that an event has occurred, but does not need to wait for a response. However, the asynchronous nature of the message queue also creates a disadvantage, that is, the receiver must poll the message queue to receive the latest message.
Compared with signals, message queues can transmit more information. Compared with pipes, message queues provide formatted data, which can reduce developer workload. But message queues still have size limits.
Read queue messages
There are mainly two types (1) server push; (2) client pull;
pull: mainly the client polls regularly to get Message processing;
Push: Actively notify subscribers through event subscription for processing;
Storage of messages
Simple storage is achieved through a memory linked list; you can also use DB, such as Redis; can also be persisted to local files;
How to ensure the consistency of asynchronous processing
Although the main purpose of the queue is to store messages, it also asynchronousizes calls and implementations. But if you want to achieve consistency in processing messages, a good way is to distinguish the order of business processing, such as operating the master-slave DB, the master is responsible for writing, and the slave is responsible for reading. We have no chance to get the results you want from reading the database immediately after writing; At the same time, we need to use intermediate states. When multiple intermediate states meet the call results at the same time, they will be processed at the business time. Otherwise, the "exception message" will be persisted until the next operation;
Update code
Establish message support core queue
{ public delegate void MessageQueueEventNotifyHandler(Message.BaseMessage message); public class MessageQueue:Queue<BaseMessage> { public static MessageQueue GlobalQueue = new MessageQueue(); private Timer timer = new Timer(); public MessageQueue() { this.timer.Interval = 5000; this.timer.Elapsed += Notify; this.timer.Enabled = true; } private void Notify(object sender, ElapsedEventArgs e) { lock (this) { if (this.Count > 0) { //this.messageNotifyEvent.GetInvocationList()[0].DynamicInvoke(this.Dequeue()); var message = this.Dequeue(); this.messageNotifyEvent(message); } } } private MessageQueueEventNotifyHandler messageNotifyEvent; public event MessageQueueEventNotifyHandler MessageNotifyEvent { add { this.messageNotifyEvent += value; } remove { if (this.messageNotifyEvent != null) { this.messageNotifyEvent -= value; } } } } }
Event processing
public const string OrderCodePrefix = "P"; public void Submit(Message.BaseMessage message) { Order order = message.Body as Order; if (order.OrderCode.StartsWith(OrderCodePrefix)) { System.Console.WriteLine("这个是个正确的以({0})开头的订单:{1}", OrderCodePrefix,order.OrderCode); } else { System.Console.WriteLine("这个是个错误的订单,没有以({0})开头:{1}",OrderCodePrefix,order.OrderCode); } }
Can be personalized according to specific business;
Append messages to the queue through Proxy
public class OrderServiceProxy:IOrderService { public void Submit(Message.BaseMessage message) { MessageQueue.MessageQueue.GlobalQueue.Enqueue(message); } }
Client call
OrderService orderService = new OrderService(); MessageQueue.MessageQueue.GlobalQueue.MessageNotifyEvent += orderService.Submit; var orders = new List<Order>() { new Order(){OrderCode="P001"}, new Order(){OrderCode="P002"}, new Order(){OrderCode="B003"} }; OrderServiceProxy proxy = new OrderServiceProxy(); orders.ForEach(order => proxy.Submit(new Message.BaseMessage() { Body=order})); Console.ReadLine();
This meets the needs of event binding and triggering personalized processing, and at the same time achieves the purpose of message asynchronousization. I hope it will be more detailed. Expand and use it in later projects.
The above is the content of C# to implement asynchronous message queue. For more related content, please pay attention to the PHP Chinese website (www.php.cn)!