C# implements asynchronous message queue

黄舟 (Original), 2016-12-27 14:31:43

Message Queue

A message queue is a method of inter-process communication, or of communication between different threads of the same process. The queue is used to process a series of inputs, usually from the user. Each record in the queue contains detailed information, including the time of occurrence, the type of input device, and the specific input parameters. A message queue provides an asynchronous communication protocol: the sender and the receiver of a message do not need to interact with the message queue at the same time. The message remains in the queue until the recipient retrieves it.

Simply put, the queue stores the commands that need to be processed, but the sender does not get the processing result right away.
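
As a minimal sketch of this decoupling, the following uses the standard `ConcurrentQueue<T>`. The `DeferredDeliveryDemo` class and the command strings are hypothetical names chosen for illustration, not part of the article's code. The sender stores a command and returns immediately; the receiver drains the queue at some later time.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical demo class, not part of the article's code.
public static class DeferredDeliveryDemo
{
    private static readonly ConcurrentQueue<string> Queue = new ConcurrentQueue<string>();

    // Sender side: store the command and return without waiting for a result.
    public static void Send(string command)
    {
        Queue.Enqueue(command);
    }

    // Receiver side: may be called at any later time; drains whatever has accumulated.
    public static List<string> ReceiveAll()
    {
        var received = new List<string>();
        while (Queue.TryDequeue(out string command))
        {
            received.Add(command);
        }
        return received;
    }
}
```

Calling `Send` twice and then `ReceiveAll` returns both commands in the order they were sent, even though the receiver was not involved when they were queued.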

Implementation

In practice, message queues are often stored in linked list structures. Processes with permission can write messages to the queue or read messages from it.
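
To illustrate the linked-list idea, here is a small sketch built on the standard `LinkedList<T>`. The `LinkedListMessageQueue<T>` class and its `Write`/`TryRead` method names are assumptions made for this example; permission checks are left out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration: a FIFO message queue stored in a linked list.
public class LinkedListMessageQueue<T>
{
    private readonly LinkedList<T> items = new LinkedList<T>();
    private readonly object syncRoot = new object();

    public int Count
    {
        get { lock (syncRoot) { return items.Count; } }
    }

    // Writers append at the tail, an O(1) operation on a linked list.
    public void Write(T message)
    {
        lock (syncRoot) { items.AddLast(message); }
    }

    // Readers remove from the head, also O(1); returns false when the queue is empty.
    public bool TryRead(out T message)
    {
        lock (syncRoot)
        {
            if (items.Count == 0)
            {
                message = default(T);
                return false;
            }
            message = items.First.Value;
            items.RemoveFirst();
            return true;
        }
    }
}
```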

Currently, there are many open source implementations of message queues, including JBoss Messaging, JORAM, Apache ActiveMQ, Sun Open Message Queue, Apache Qpid and HTTPSQS.

Advantages and disadvantages

Message queues are inherently asynchronous: the receiver can retrieve a message long after it was sent, which sets them apart from most communication protocols. For example, the HTTP protocol used on the WWW is synchronous, because the client must wait for the server to respond after making a request. Many situations call for an asynchronous protocol instead, such as one process notifying another that an event has occurred without needing to wait for a response. This asynchrony also has a drawback: the receiver must poll the message queue to receive the latest messages.

Compared with signals, message queues can transmit more information. Compared with pipes, message queues provide formatted data, which can reduce developer workload. But message queues still have size limits.

Read queue messages

There are two main approaches: (1) server push; (2) client pull.

Pull: the client polls the queue at regular intervals to fetch messages for processing.

Push: subscribers are actively notified through event subscription and then process the message.
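
The two styles can be sketched side by side. The `PullPushQueue` class, its `MessageArrived` event, and the choice to store a message only when nobody is subscribed are assumptions made for this illustration, not the article's design.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of the two delivery styles.
public class PullPushQueue
{
    private readonly Queue<string> items = new Queue<string>();
    private readonly object syncRoot = new object();

    // Push: subscribers are notified as soon as a message is published.
    public event Action<string> MessageArrived;

    public void Publish(string message)
    {
        Action<string> handler = MessageArrived;
        if (handler != null)
        {
            handler(message); // push to subscribers immediately
            return;
        }
        lock (syncRoot) { items.Enqueue(message); } // no subscriber: hold it for pulling
    }

    // Pull: the client calls this on its own schedule, for example from a timer.
    public bool TryPull(out string message)
    {
        lock (syncRoot)
        {
            if (items.Count == 0)
            {
                message = null;
                return false;
            }
            message = items.Dequeue();
            return true;
        }
    }
}
```

A pulling client pays for the polling interval in latency, while a pushing queue needs the subscriber to be registered before the message arrives.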

Storage of messages

The simplest storage is an in-memory linked list. Messages can also be stored in a database such as Redis, or persisted to local files.
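
As a rough sketch of the local-file option, the class below keeps one message per line in a text file. `FileBackedQueue` is a hypothetical name, and the sketch assumes messages contain no line breaks and that a single process owns the file. Rewriting the whole file on every dequeue is a simplification that would not scale to large queues.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical illustration: messages persisted as lines in a local file.
public class FileBackedQueue
{
    private readonly string path;
    private readonly object syncRoot = new object();

    public FileBackedQueue(string path)
    {
        this.path = path;
    }

    // Append one message per line so it survives a process restart.
    public void Enqueue(string message)
    {
        lock (syncRoot) { File.AppendAllLines(path, new[] { message }); }
    }

    // Read the oldest line and rewrite the file without it.
    public bool TryDequeue(out string message)
    {
        lock (syncRoot)
        {
            message = null;
            if (!File.Exists(path)) return false;
            var lines = new List<string>(File.ReadAllLines(path));
            if (lines.Count == 0) return false;
            message = lines[0];
            lines.RemoveAt(0);
            File.WriteAllLines(path, lines);
            return true;
        }
    }
}
```

Because the state lives in the file, a second `FileBackedQueue` created later on the same path sees the messages the first instance wrote.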

How to ensure the consistency of asynchronous processing

Although the main purpose of the queue is to store messages, it also makes the call asynchronous with respect to its implementation. If message processing has to be consistent, a good approach is to make the order of business operations explicit. Take a master-slave DB, where the master is responsible for writes and the slave for reads: a read issued immediately after a write cannot be relied on to return the data just written. We therefore also need intermediate states. Business processing runs only once all the required intermediate states have been reached; otherwise the "exception message" is persisted until the next operation.
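
One possible reading of the intermediate-state idea is sketched below. The `StateGate` class and the state names used with it are hypothetical; the sketch is single-threaded and keeps the held messages in memory rather than persisting them.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration: process a message only after all required states are reached.
public class StateGate
{
    private readonly HashSet<string> required;
    private readonly HashSet<string> reached = new HashSet<string>();
    private readonly List<string> pending = new List<string>(); // "exception messages" held for retry

    public List<string> Processed { get; } = new List<string>();

    public int PendingCount
    {
        get { return pending.Count; }
    }

    public StateGate(IEnumerable<string> requiredStates)
    {
        required = new HashSet<string>(requiredStates);
    }

    // Record an intermediate state, then retry the held messages if the gate is now open.
    public void MarkReached(string state)
    {
        reached.Add(state);
        if (!required.IsSubsetOf(reached)) return;
        Processed.AddRange(pending);
        pending.Clear();
    }

    // Process now if every required state is reached; otherwise hold the message.
    public void Handle(string message)
    {
        if (required.IsSubsetOf(reached))
        {
            Processed.Add(message);
        }
        else
        {
            pending.Add(message);
        }
    }
}
```

With required states such as "master-written" and "slave-synced", a message handled too early stays pending and is processed when the last state is marked as reached.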

The code

Build the core queue that supports messages

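using System.Collections.Generic;
using System.Timers;

// Namespace name inferred from the MessageQueue.MessageQueue.GlobalQueue calls in the client code.
namespace MessageQueue
{
    public delegate void MessageQueueEventNotifyHandler(Message.BaseMessage message);

    public class MessageQueue : Queue<Message.BaseMessage>
    {
        public static readonly MessageQueue GlobalQueue = new MessageQueue();

        // Dedicated lock object; the timer callback runs on a thread-pool thread.
        private readonly object syncRoot = new object();
        private readonly Timer timer = new Timer();

        public MessageQueue()
        {
            this.timer.Interval = 5000; // poll the queue every 5 seconds
            this.timer.Elapsed += Notify;
            this.timer.Enabled = true;
        }

        // Hides Queue<T>.Enqueue so producers take the same lock as the timer thread.
        public new void Enqueue(Message.BaseMessage message)
        {
            lock (this.syncRoot) { base.Enqueue(message); }
        }

        // Delivers at most one message per timer tick.
        private void Notify(object sender, ElapsedEventArgs e)
        {
            var handler = this.messageNotifyEvent;
            if (handler == null) return; // no subscriber yet: leave messages queued

            Message.BaseMessage message;
            lock (this.syncRoot)
            {
                if (this.Count == 0) return;
                message = this.Dequeue();
            }
            handler(message); // invoke outside the lock so handlers cannot block producers
        }

        private MessageQueueEventNotifyHandler messageNotifyEvent;
        public event MessageQueueEventNotifyHandler MessageNotifyEvent
        {
            add { this.messageNotifyEvent += value; }
            remove { this.messageNotifyEvent -= value; }
        }
    }
}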
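
The queue refers to `Message.BaseMessage`, and the later snippets use `Order` and `IOrderService`, none of which the article shows. The following is only a guess at their minimal shape, inferred from how they are used: a `Body` property, an `OrderCode` property, and a `Submit` method. Everything beyond those three members is an assumption.

```csharp
namespace Message
{
    // Assumed shape: only the Body property is confirmed by the article's code.
    public class BaseMessage
    {
        public object Body { get; set; }
    }
}

// Assumed shape: only the OrderCode property is confirmed by the article's code.
public class Order
{
    public string OrderCode { get; set; }
}

// Assumed shape, inferred from OrderServiceProxy.Submit.
public interface IOrderService
{
    void Submit(Message.BaseMessage message);
}
```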

Event processing

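// Class name taken from the client code below (new OrderService()).
public class OrderService
{
    public const string OrderCodePrefix = "P";

    public void Submit(Message.BaseMessage message)
    {
        Order order = message.Body as Order;
        if (order == null || order.OrderCode == null)
        {
            System.Console.WriteLine("The message body is not a valid order.");
            return;
        }

        if (order.OrderCode.StartsWith(OrderCodePrefix))
        {
            System.Console.WriteLine("This is a valid order starting with ({0}): {1}", OrderCodePrefix, order.OrderCode);
        }
        else
        {
            System.Console.WriteLine("This is an invalid order; it does not start with ({0}): {1}", OrderCodePrefix, order.OrderCode);
        }
    }
}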

The handler can be customized to fit the specific business logic.

Append messages to the queue through a proxy

public class OrderServiceProxy : IOrderService
{
    public void Submit(Message.BaseMessage message)
    {
        MessageQueue.MessageQueue.GlobalQueue.Enqueue(message);
    }
}

Client call

OrderService orderService = new OrderService();
MessageQueue.MessageQueue.GlobalQueue.MessageNotifyEvent += orderService.Submit;

var orders = new List<Order>() {
    new Order() { OrderCode = "P001" },
    new Order() { OrderCode = "P002" },
    new Order() { OrderCode = "B003" }
};

OrderServiceProxy proxy = new OrderServiceProxy();
orders.ForEach(order => proxy.Submit(new Message.BaseMessage() { Body = order }));

// Keep the process alive while the timer delivers the queued messages.
Console.ReadLine();

This covers event binding and triggering of customized processing, and it also makes message handling asynchronous. I hope to expand on this in more detail and use it in later projects.
