Heim > Artikel > Backend-Entwicklung > Analysieren Sie die Python-Implementierung der MQ-Nachrichtenwarteschlange und die Vorteile der Nachrichtenwarteschlange
Die Erhaltungsrolle der Nachrichtenwarteschlange (MQ, Message Queue) bei der Nachrichtendatenübertragung bietet Garantie für die Datenkommunikation und Komfort für die Echtzeitverarbeitung. Hier werfen wir einen Blick auf die MQ-Nachrichtenwarteschlangen-Implementierung von Threads in Python und Nachrichtenwarteschlange Vorteilsanalyse
„Nachrichtenwarteschlange“ ist ein Container, der Nachrichten während ihrer Übertragung speichert. Der Nachrichtenwarteschlangenmanager fungiert als Vermittler bei der Weiterleitung von Nachrichten von seiner Quelle an sein Ziel. Der Hauptzweck einer Warteschlange besteht darin, das Routing bereitzustellen und die Zustellung von Nachrichten zu gewährleisten. Wenn der Empfänger zum Zeitpunkt des Sendens der Nachricht nicht verfügbar ist, behält Message Queue die Nachricht bei, bis sie erfolgreich zugestellt werden kann. Ich glaube, dass die Nachrichtenwarteschlange eine entscheidende Komponente für jede Architektur oder Anwendung ist. Hier sind zehn Gründe:
Beispiel für eine Python-Nachrichtenwarteschlange:
#!/usr/bin/env python import Queue import threading import time queue = Queue.Queue() class ThreadNum(threading.Thread): """没打印一个数字等待1秒,并发打印10个数字需要多少秒?""" def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): whileTrue: #消费者端,从队列中获取num num = self.queue.get() print "i'm num %s"%(num) time.sleep(1) #在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号 self.queue.task_done() start = time.time() def main(): #产生一个 threads pool, 并把消息传递给thread函数进行处理,这里开启10个并发 for i in range(10): t = ThreadNum(queue) t.setDaemon(True) t.start() #往队列中填错数据 for num in range(10): queue.put(num) #wait on the queue until everything has been processed queue.join() main() print "Elapsed Time: %s" % (time.time() - start)
Laufergebnisse:
i'm num 0 i'm num 1 i'm num 2 i'm num 3 i'm num 4 i'm num 5 i'm num 6 i'm num 7 i'm num 8 i'm num 9 Elapsed Time: 1.01399993896
Interpretation:
Die spezifischen Arbeitsschritte werden wie folgt beschrieben:
1, erstellen Sie eine Queue.Queue () und füllen Sie sie aus es mit Daten.
2, übergeben Sie die gefüllte Dateninstanz an die Thread-Klasse, die durch Erben von threading.Thread erstellt wird.
3. Generieren Sie einen Daemon-Thread-Pool.
4. Nehmen Sie jedes Mal ein Element aus der Warteschlange und verwenden Sie die Daten- und Ausführungsmethode in diesem Thread, um die entsprechende Arbeit auszuführen.
5. Verwenden Sie nach Abschluss dieser Arbeit die Funktion queue.task_done(), um ein Signal an die Warteschlange zu senden, dass die Aufgabe abgeschlossen wurde.
6. Das Ausführen einer Verknüpfungsoperation für die Warteschlange bedeutet tatsächlich, zu warten, bis die Warteschlange leer ist, bevor das Hauptprogramm beendet wird.
Bei Verwendung dieses Modus ist Folgendes zu beachten: Wenn Sie den Daemon-Thread auf „True“ setzen, wird das Programm nach der Ausführung automatisch beendet. Der Vorteil besteht darin, dass Sie einen Join-Vorgang für die Warteschlange durchführen oder warten können, bis die Warteschlange leer ist, bevor Sie sie verlassen.
Die sogenannten Mehrfachwarteschlangen, die Ausgabe einer Warteschlange kann als Eingabe einer anderen Warteschlange verwendet werden
#!/usr/bin/env python import Queue import threading import time queue = Queue.Queue() out_queue = Queue.Queue() class ThreadNum(threading.Thread): def __init__(self, queue, out_queue): threading.Thread.__init__(self) self.queue = queue self.out_queue = out_queue def run(self): whileTrue: #从队列中取消息 num = self.queue.get() bkeep = num #将bkeep放入队列中 self.out_queue.put(bkeep) #signals to queue job is done self.queue.task_done() class PrintLove(threading.Thread): def __init__(self, out_queue): threading.Thread.__init__(self) self.out_queue = out_queue def run(self): whileTrue: #从队列中获取消息并赋值给bkeep bkeep = self.out_queue.get() keke = "I love " + str(bkeep) print keke, print self.getName() time.sleep(1) #signals to queue job is done self.out_queue.task_done() start = time.time() def main(): #populate queue with data for num in range(10): queue.put(num) #spawn a pool of threads, and pass them queue instance for i in range(5): t = ThreadNum(queue, out_queue) t.setDaemon(True) t.start() for i in range(5): pl = PrintLove(out_queue) pl.setDaemon(True) pl.start() #wait on the queue until everything has been processed queue.join() out_queue.join() main() print "Elapsed Time: %s" % (time.time() - start)
Laufergebnisse:
I love 0 Thread-6 I love 1 Thread-7 I love 2 Thread-8 I love 3 Thread-9 I love 4 Thread-10 I love 5 Thread-7 I love 6 Thread-6 I love 7 Thread-9 I love 8 Thread-8 I love 9 Thread-10 Elapsed Time: 2.00300002098
Interpretation:
ThreadNum-Klassenworkflow
Warteschlange definieren--->Threading erben---->Initialisieren queue-- -->Definieren Sie die Ausführungsfunktion--->holen Sie die Daten in der Warteschlange---->Verarbeiten Sie die Daten---->stellen Sie die Daten in eine andere Warteschlange-->Senden Sie ein Signal in die Warteschlange, um der Warteschlange mitzuteilen, dass die Verarbeitung abgeschlossen ist
Hauptfunktionsworkflow:
---> Daten in die benutzerdefinierte Warteschlange werfen
---> Die for-Schleife soll gestartet werden. Anzahl der Threads----> Instanziieren Sie die ThreadNum-Klasse. Starten Sie den Thread und richten Sie den Guard ein. > Die for-Schleife bestimmt die Anzahl der gestarteten Threads ----> PrintLove-Klasse instanziieren ---> Warteschlange, die verarbeitet werden soll, und führen Sie dann den Join aus. Das heißt, beenden Sie das Hauptprogramm.
Nachdem wir die allgemeine Implementierung von MQ verstanden haben, fassen wir die Vorteile von Nachrichtenwarteschlangen zusammen:
1. Entkopplung
Sagen Sie zu Beginn des Projekts die zukünftigen Projektrisiken voraus Es ist äußerst schwierig, das zu finden, was benötigt wird. Die Nachrichtenwarteschlange fügt mitten im Verarbeitungsprozess eine implizite, datenbasierte Schnittstellenschicht ein, und die Verarbeitungsprozesse auf beiden Seiten müssen diese Schnittstelle implementieren. Dies ermöglicht es Ihnen, die Prozesse auf beiden Seiten unabhängig voneinander zu erweitern oder zu ändern, solange sie die gleichen Schnittstellenbeschränkungen einhalten.
Manchmal schlägt der Prozess bei der Datenverarbeitung fehl. Sofern die Daten nicht dauerhaft gespeichert werden, gehen sie für immer verloren. Message Queuing vermeidet das Risiko eines Datenverlusts, indem Daten so lange gespeichert werden, bis sie vollständig verarbeitet wurden. Im „Insert-Get-Delete“-Paradigma, das von vielen Nachrichtenwarteschlangen verwendet wird, muss Ihr Verarbeitungsprozess vor dem Löschen einer Nachricht aus der Warteschlange klar angeben, dass die Nachricht verarbeitet wurde, um sicherzustellen, dass Ihre Daten sicher sind. Speichern Sie sie, bis Sie sie löschen. Ich bin damit fertig.
Da die Nachrichtenwarteschlange Ihre Verarbeitung entkoppelt, ist es einfach, die Häufigkeit der Nachrichteneinreihung und -verarbeitung zu erhöhen; Es besteht keine Notwendigkeit, den Code zu ändern oder Parameter anzupassen. Das Erweitern ist so einfach wie das Hochdrehen des Netzschalters.
Wenn sich Ihre Anwendung auf der Homepage von Hacker News befindet, werden Sie feststellen, dass der Datenverkehr ein ungewöhnliches Niveau erreicht hat. Ihre Anwendung muss weiterhin funktionieren, wenn die Anzahl der Besuche dramatisch ansteigt. Solche Datenverkehrsspitzen sind jedoch ungewöhnlich. Es wäre eine enorme Verschwendung, Ressourcen in den Standby-Modus zu investieren, wenn man davon ausgeht, dass solche Spitzenbesuche bewältigt werden können. Durch die Verwendung von Nachrichtenwarteschlangen können kritische Komponenten einem erhöhten Zugriffsdruck standhalten, ohne aufgrund überlasteter Anforderungen vollständig zusammenzubrechen. Weitere Informationen hierzu finden Sie in unserem Blogbeitrag zu Spitzenverarbeitungskapazitäten.
Wenn einige Komponenten des Systems ausfallen, hat dies keine Auswirkungen auf das gesamte System. Die Nachrichtenwarteschlange reduziert die Kopplung zwischen Prozessen. Selbst wenn ein Prozess, der Nachrichten verarbeitet, hängen bleibt, können die zur Warteschlange hinzugefügten Nachrichten nach der Systemwiederherstellung weiterhin verarbeitet werden. Die Möglichkeit, Anfragen erneut zu versuchen oder zurückzustellen, macht oft den Unterschied zwischen einem leicht unbequemen Benutzer und einem frustrierten Benutzer aus.
Der von der Nachrichtenwarteschlange bereitgestellte Redundanzmechanismus stellt sicher, dass die Nachricht tatsächlich verarbeitet werden kann, solange ein Prozess die Warteschlange liest. Auf dieser Basis gewährt IronMQ eine „Lieferung nur einmal“-Garantie. Unabhängig davon, wie viele Prozesse Daten aus der Warteschlange empfangen, kann jede Nachricht nur einmal verarbeitet werden. Dies ist möglich, weil beim Abrufen einer Nachricht die Nachricht einfach „abonniert“ und sie vorübergehend aus der Warteschlange entfernt wird. Sofern der Client nicht ausdrücklich angibt, dass er die Verarbeitung der Nachricht abgeschlossen hat, wird die Nachricht zurück in die Warteschlange gestellt und kann nach einer konfigurierbaren Zeitspanne erneut verarbeitet werden.
In vielen Fällen ist die Reihenfolge, in der Daten verarbeitet werden, wichtig. Die Nachrichtenwarteschlange ist von Natur aus sortiert und kann garantieren, dass Daten in einer bestimmten Reihenfolge verarbeitet werden. IronMO stellt sicher, dass Nachrichten in der FIFO-Reihenfolge (First In, First Out) verarbeitet werden, sodass die Position der Nachrichten in der Warteschlange die Position ist, von der sie abgerufen wurden.
In jedem wichtigen System gibt es Elemente, die unterschiedliche Verarbeitungszeiten erfordern. Das Laden eines Bildes nimmt beispielsweise weniger Zeit in Anspruch als das Anwenden eines Filters. Nachrichtenwarteschlangen verwenden eine Pufferschicht, um die effiziente Ausführung von Aufgaben zu ermöglichen – Schreibvorgänge in die Warteschlange werden so schnell wie möglich verarbeitet, ohne durch die vorbereitende Verarbeitung zum Lesen aus der Warteschlange eingeschränkt zu werden. Diese Pufferung hilft, die Geschwindigkeit zu steuern und zu optimieren, mit der Daten durch das System fließen.
In einem verteilten System ist es eine große Herausforderung, einen Gesamteindruck darüber zu bekommen, wie lange Benutzervorgänge dauern werden und warum. Nachrichtenserien können dabei helfen, leistungsschwache Prozesse oder Bereiche anhand der Häufigkeit der Nachrichtenverarbeitung zu identifizieren, in denen der Datenfluss nicht ausreichend optimiert ist.
Oft möchte oder muss man Nachrichten nicht sofort verarbeiten. Nachrichtenwarteschlangen bieten einen asynchronen Verarbeitungsmechanismus, der es Ihnen ermöglicht, eine Nachricht in die Warteschlange zu stellen, sie jedoch nicht sofort zu verarbeiten. Sie können so viele Nachrichten in die Warteschlange stellen, wie Sie möchten, und diese dann bearbeiten, wenn Ihnen danach ist.
Das obige ist der detaillierte Inhalt vonAnalysieren Sie die Python-Implementierung der MQ-Nachrichtenwarteschlange und die Vorteile der Nachrichtenwarteschlange. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!