Heim  >  Artikel  >  Backend-Entwicklung  >  C# implementiert eine asynchrone Nachrichtenwarteschlange

C# implementiert eine asynchrone Nachrichtenwarteschlange

黄舟
黄舟Original
2016-12-27 14:31:431955Durchsuche

Nachrichtenwarteschlange

Nachrichtenwarteschlange (englisch: Message queue) ist eine Methode zur Kommunikation zwischen Prozessen oder zur Kommunikation zwischen verschiedenen Threads desselben Prozesses a Eine Reihe von Eingaben, normalerweise vom Benutzer. Die Nachrichtenwarteschlange stellt ein asynchrones Kommunikationsprotokoll bereit. Jeder Datensatz in der Warteschlange enthält detaillierte Informationen, einschließlich des Zeitpunkts des Auftretens, des Typs des Eingabegeräts und spezifischer Eingabeparameter. Das heißt, der Absender und Empfänger der Nachricht sind nicht erforderlich um gleichzeitig mit der Nachrichtenwarteschlange zu interagieren. Die Nachricht bleibt in der Warteschlange, bis der Empfänger sie abruft.

Um es einfach auszudrücken: Die Warteschlange speichert die Befehle, die wir verarbeiten müssen, erhält die Verarbeitungsergebnisse jedoch nicht rechtzeitig

Implementierung

Tatsächlich werden Nachrichtenwarteschlangen häufig in verknüpften Listenstrukturen gespeichert. Prozesse mit der Berechtigung können Nachrichten aus der Nachrichtenwarteschlange schreiben oder lesen.

Derzeit gibt es viele Open-Source-Implementierungen von Nachrichtenwarteschlangen, darunter JBoss Messaging, JORAM, Apache ActiveMQ, Sun Open Message Queue, Apache Qpid und HTTPSQS.

Vorteile, Nachteile

Die Nachrichtenwarteschlange selbst ist asynchron, was es dem Empfänger ermöglicht, die Nachricht lange nach dem Senden der Nachricht abzurufen, was sich von den meisten Kommunikationsprotokollen unterscheidet. Beispielsweise ist das im WWW verwendete HTTP-Protokoll synchron, da der Client nach einer Anfrage auf die Antwort des Servers warten muss. Es gibt jedoch viele Situationen, in denen wir asynchrone Kommunikationsprotokolle benötigen. Beispielsweise benachrichtigt ein Prozess einen anderen Prozess, dass ein Ereignis aufgetreten ist, muss jedoch nicht auf eine Antwort warten. Die asynchrone Natur der Nachrichtenwarteschlange hat jedoch auch einen Nachteil: Der Empfänger muss die Nachrichtenwarteschlange abfragen, um die neueste Nachricht zu empfangen.

Im Vergleich zu Signalen können Nachrichtenwarteschlangen mehr Informationen übertragen. Im Vergleich zu Pipes stellen Nachrichtenwarteschlangen formatierte Daten bereit, was die Arbeitsbelastung des Entwicklers verringern kann. Für Nachrichtenwarteschlangen gelten jedoch weiterhin Größenbeschränkungen.

Warteschlangennachrichten lesen

Es gibt hauptsächlich zwei Arten (1) Push durch den Server; (2) Pull durch den Client

Pull: Hauptsächlich fragt der Client regelmäßig ab Nachrichtenverarbeitung erhalten;

Push: Abonnenten durch Ereignisabonnement aktiv zur Verarbeitung benachrichtigen

Einfache Speicherung wird durch eine speicherverknüpfte Liste erreicht; DB, wie Redis; kann auch in lokalen Dateien gespeichert werden.

So stellen Sie die Konsistenz der asynchronen Verarbeitung sicher

Obwohl der Hauptzweck der Warteschlange darin besteht, Nachrichten zu speichern, asynchronisiert sie auch Aufrufe und Umsetzungen. Wenn Sie jedoch eine Konsistenz bei der Verarbeitung von Nachrichten erreichen möchten, besteht eine gute Möglichkeit darin, die Reihenfolge der Geschäftsverarbeitung zu unterscheiden, z. B. den Betrieb der Master-Slave-Datenbank. Der Master ist für das Schreiben und der Slave für das Lesen verantwortlich Wir haben keine Chance, die gewünschten Ergebnisse durch das Lesen der Datenbank sofort nach dem Schreiben zu erhalten. Gleichzeitig müssen wir Zwischenzustände verwenden, um die Ergebnisse des Anrufs gleichzeitig zu verarbeiten. Die „Ausnahmemeldung“ bleibt bis zum nächsten Vorgang bestehen.

Der obige Code

Nachrichtenunterstützungs-Kernwarteschlange einrichten

Ereignisverarbeitung
{    public delegate void MessageQueueEventNotifyHandler(Message.BaseMessage message);
 
    public class MessageQueue:Queue<BaseMessage>
    {
        public static MessageQueue GlobalQueue = new MessageQueue();
 
        private Timer timer = new Timer();
        public MessageQueue() {
            this.timer.Interval = 5000;
            this.timer.Elapsed += Notify;
            this.timer.Enabled = true;
        }
        private void Notify(object sender, ElapsedEventArgs e) {
            lock (this) {
                if (this.Count > 0) {
                    //this.messageNotifyEvent.GetInvocationList()[0].DynamicInvoke(this.Dequeue());
                    var message = this.Dequeue();
                    this.messageNotifyEvent(message);
                }
            }
        }
 
        private MessageQueueEventNotifyHandler messageNotifyEvent;
        public event MessageQueueEventNotifyHandler MessageNotifyEvent {
            add {
                this.messageNotifyEvent += value;
            }
 
            remove {
                if (this.messageNotifyEvent != null) {
                    this.messageNotifyEvent -= value;
                }
            }
        }
    }
}

public const string OrderCodePrefix = "P";        public void Submit(Message.BaseMessage message)
        {
            Order order = message.Body as Order;
 
            if (order.OrderCode.StartsWith(OrderCodePrefix))
            {
                System.Console.WriteLine("这个是个正确的以({0})开头的订单:{1}", OrderCodePrefix,order.OrderCode);
            }
            else {
                System.Console.WriteLine("这个是个错误的订单,没有以({0})开头:{1}",OrderCodePrefix,order.OrderCode);
            }
        }
Kann je nach Unternehmen personalisiert werden

Nachrichten über Proxy an die Warteschlange anhängen

Der Kunde ruft an
public class OrderServiceProxy:IOrderService    {
        public void Submit(Message.BaseMessage message)
        {
            MessageQueue.MessageQueue.GlobalQueue.Enqueue(message);
        }
    }

Dies erfüllt die Anforderungen der Ereignisbindung und des Auslösens einer personalisierten Verarbeitung und erreicht gleichzeitig die Asynchronität von Nachrichten. Zum Zwecke der Transformation hoffe ich, es detaillierter zu erweitern und in späteren Projekten zu verwenden.
OrderService orderService = new OrderService();            MessageQueue.MessageQueue.GlobalQueue.MessageNotifyEvent += orderService.Submit;
 
            var orders = new List<Order>() {
                new Order(){OrderCode="P001"},
                new Order(){OrderCode="P002"},
                new Order(){OrderCode="B003"}
            };
 
            OrderServiceProxy proxy = new OrderServiceProxy();
            orders.ForEach(order => proxy.Submit(new Message.BaseMessage() { Body=order}));
 
            Console.ReadLine();

Das Obige ist der Inhalt von C# zur Implementierung einer asynchronen Nachrichtenwarteschlange. Weitere verwandte Inhalte finden Sie auf der chinesischen PHP-Website (www.php.cn)!


Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Vorheriger Artikel:C#-StackNächster Artikel:C#-Stack