Heim >Backend-Entwicklung >Python-Tutorial >Messaging in verteilten Systemen mit ZeroMQ
Lassen Sie uns Python verwenden, um die verschiedenen Nachrichtenmuster zu entwickeln.
Sie müssen sich das folgende Video ansehen, um den Schritt-für-Schritt-Befehlen zu folgen.
Nehmen Sie sich Zeit; Stellen Sie sicher, dass Sie die Befehle noch einmal überprüfen, bevor Sie sie ausführen.
Ich führe dieses Tutorial auf meiner GCP-VM aus, kann es aber auch gerne lokal ausführen ✅
Dieses Tutorial stellt die Konzepte von Sockets in Python3 mit ZeroMQ vor. ZeroMQ ist eine einfache Möglichkeit, Sockets zu entwickeln, damit verteilte Prozesse durch das Senden von Nachrichten miteinander kommunizieren können.
Die Nachrichtenmuster, die wir heute untersuchen werden, sind die folgenden:
? Hinweis: Das Arbeiten mit Sockets kann schwierig sein. Das wiederholte Ausführen desselben Codes unter Verwendung derselben Portnummer/dieselben Sockets kann dazu führen, dass die Verbindung „hängt“ (der Server scheint zu laufen, aber es kann keine Verbindungen akzeptieren). Dies liegt daran, dass wir die vorherigen Verbindungen nicht korrekt geschlossen und zerstört haben.
Der beste Weg, dies zu beheben, besteht darin, den Socket zu schließen und den ZeroMQ-Kontext zu zerstören. Weitere Einzelheiten finden Sie unter Try – Catch-Blöcke von Phase 2 und Phase 3.
In diesem Tutorial können solche Probleme auftreten, z. B. wenn derselbe Server mehrmals am selben Port ausgeführt wird. Wenn Sie auf hängende Probleme stoßen, wird empfohlen, den Python-Prozess abzubrechen, die TCP-Portnummer zu bereinigen und den Server erneut auszuführen (siehe Schritt 11).
Beginnen wir mit der Erstellung einer neuen VM, dann installieren wir Python3.
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
Geben Sie „Y“ ein, wenn Sie dazu aufgefordert werden.
Viele Anwendungen bestehen heutzutage aus Komponenten, die sich über Netzwerke erstrecken, daher ist die Nachrichtenübermittlung unerlässlich. Heute werden wir TCP für die Nachrichtenübertragung verwenden.
Sie können über VSC auf Ihre VM zugreifen oder die Befehle über SSH ausführen und Dateien mit Pico bearbeiten. In meinem Fall verwende ich SSH.
? Stellen Sie sicher, dass Sie den Code sorgfältig kopieren.
Wir müssen unseren ersten ZeroMQ-Server erstellen. Der Server ermöglicht die Bindung mit jeweils nur einem Client.
Erstellen Sie eine neue Datei mit dem Namen „pair-server.py“ und geben Sie dann den folgenden Code ein.
Der Code erstellt einen neuen Socket mithilfe des Musters zmq.PAIR und bindet dann den Server an einen bestimmten IP-Port (den wir bereits in GCP geöffnet haben). Beachten Sie, dass der Server nicht aufhört zu laufen, bis wir ihn stoppen.
Schauen Sie sich die Kommentare an, um zu verstehen, wie das funktioniert.
Stellen Sie sicher, dass Sie das ; ändern. das ist die interne IP-Adresse der GCP-VM; Der Client-Port sollte mit dem Server identisch sein.
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
Führen Sie den Server noch nicht aus, lassen Sie uns zuerst den Client erstellen.
Erstellen Sie den Kunden und nehmen Sie sich eine Minute Zeit, um die Kommentare zu prüfen. Ich werde es „pair-client.py“ nennen.
Stellen Sie sicher, dass Sie das ; ändern. Der Port sollte derselbe sein wie im Server.
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
Wir benötigen zwei Terminalfenster, um das PAIR-Beispiel auszuführen. Wir werden den Server in einem Fenster und den Client im anderen ausführen. Führen Sie es nun wie folgt aus.
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
Untersuchen Sie die Ausgabe. Wir haben gerade einen neuen PAIR-Socket erstellt.
Wir müssen die TCP-Verbindung trennen, bevor wir sie erneut ausführen. Verwenden Sie dazu den folgenden Befehl.
$ python3 pair-server.py
? Hinweise:
Wir können nur ein PAAR gleichzeitig ausführen, das bedeutet, dass wir nicht mehrere Clients haben können. Denken Sie daran, dass es sich um ein PAAR handelt. Der erste Client sperrt den Socket .
Wenn wir den Server einmal und den Client zweimal ausführen, bleibt der zweite Client „hängen“, was bedeutet, dass der zweite Client darauf wartet, dass sich ein neuer Server verbindet.
Wenn wir das Paar mehr als einmal ausführen möchten, müssen wir den Server beenden und die TCP-Verbindung löschen.
PAARE sind ideal, wenn ein Client exklusiven Zugriff auf einen Server benötigt.
Wir können mehrere Server für mehrere Clients als PAARE haben, aber wir müssen unterschiedliche PORT-Nummern für die Verbindungen verwenden.
Jede Phase ist unabhängig voneinander. Stoppen Sie daher den Server, löschen Sie die TCP-Ports und fahren Sie mit der nächsten Phase fort.
Lassen Sie uns eine Client-Server-Verbindung erstellen, bei der mehrere Clients eine Verbindung zu einem einzelnen Server herstellen. Dies ist das beliebteste Nachrichtenmuster.
$ python3 pair-client.py
Jetzt werden wir zwei Clients entwickeln, die hinsichtlich ihrer Funktionalität identisch sein werden.
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
Lassen Sie uns eine Kopie dieses Clients erstellen und diese entsprechend bearbeiten. Führen Sie den folgenden Befehl aus, um eine neue Kopie zu erstellen.
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
Bearbeiten Sie dann req-client2.py und ändern Sie Client 1 in Client 2.
Bearbeiten wir die Druck- und Socket-Nachrichten (Zeilen 8 und 9)
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 1 ", request,"...") socket.send_string("Hello from client 1") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
Um dieses Beispiel auszuführen, benötigen wir drei Terminalfenster, eines für den Server und zwei für die Clients. Führen Sie im ersten Terminal Folgendes aus:
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
Überprüfen Sie die Ausgabe der Fenster. Wir haben gerade zwei Clients erstellt, die mit einem Server kommunizieren. Sie können so viele Clients haben, wie Sie möchten. Sie müssen Clients erstellen, auch mit unterschiedlichen Funktionalitäten, die eine Verbindung zu einem Server herstellen.
? Hinweise:
Client – Server ist das am weitesten verbreitete Muster. Wir haben es bereits in Klasse 1 verwendet, als wir den Apache HTTP-Server installiert und ausgeführt haben.
Stoppen Sie den Server und bereinigen Sie TCP-Port 5555
- Töte den Server:
bash
$ sudo Fuser -k 5555/tcp
Das Publish-Subscribe-Muster ist eine sehr verbreitete Methode, um die Übertragung von Daten an viele Clients zu steuern, die einen Kontext abonniert haben, und zwar so, dass Server Daten an einen oder mehrere Clients senden.
$ python3 pair-server.py
Lassen Sie uns zunächst ein einfaches Beispiel erstellen.
$ python3 pair-client.py
Lassen Sie uns eine neue Datei erstellen und sie pub_server.py nennen.
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
Erstellen Sie eine neue Datei pub_client.py.
* Das Skript akzeptiert drei Argumente von der Befehlszeile (das sind die IP und die beiden Ports).
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 1 ", request,"...") socket.send_string("Hello from client 1") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
Wir sind bereit, unsere Pub-Sub-Anwendung auszuführen! Wir benötigen drei Terminalfenster. Im ersten Terminal ausführen:
$ cp req-client1.py req-client2.py
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 2 ", request,"...") socket.send_string("Hello from client 2") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
Lassen Sie uns den Client ausführen, um eine Verbindung herzustellen und Daten nach Postleitzahl zu abonnieren, z. B. 10001 (NYC). Denken Sie daran, dass das Client-Skript beide Serverinstanzen abonniert. Führen Sie den nächsten Befehl aus:
$ python3 rep-server.py
$ python3 req-client1.py
$ python3 req-client2.py
Mit Push/Pull-Sockets können Sie Nachrichten an mehrere Mitarbeiter verteilen, die in einer Pipeline angeordnet sind. Dies ist sehr nützlich, um Code parallel auszuführen. Ein Push-Socket verteilt Nachrichten gleichmäßig an seine Pull-Clients und die Clients senden eine Antwort an einen anderen Server, den sogenannten Collector.
Dies entspricht dem Producer/Consumer-Modell, aber die vom Consumer berechneten Ergebnisse werden nicht stromaufwärts, sondern stromabwärts an einen anderen Pull-/Consumer-Socket gesendet.
Wir werden die folgende Funktionalität implementieren.
Der Produzent sendet Zufallszahlen von 0 bis 10 an die Verbraucher.
Zwei Instanzen desselben Verbrauchers ziehen die Zahlen und führen eine schwere Aufgabe aus.
Die Aufgabe könnte jede schwere Berechnung sein, z. B. eine Matrixmultiplikation.
Der Einfachheit halber gibt unsere „schwere Aufgabe“ einfach dieselbe Zahl zurück.
Die Verbraucher werden die einzelnen Ergebnisse (Berechnungen schwerer Aufgaben) an einen Ergebnissammler senden, der die Ergebnisse aggregiert.
Der Einfachheit halber zieht eine Instanz des Ergebnissammlers die Ergebnisse und berechnet die Teilsumme jedes Verbrauchers. Bei Bedarf können wir die beiden Teilsummen problemlos summieren.
Sehen wir uns ein einfaches Beispiel an.
Dieses Beispiel zeigt das Potenzial der verteilten Verarbeitung für die Parallelverarbeitung.
Lassen Sie uns zunächst den Produzenten „produzent.py“ erstellen, der auf Port 5555 läuft. Stellen Sie sicher, dass Sie Ihre .
anpassen
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
Dann erstellen Sie die Datei „consumer.py“ wie folgt. Vergessen Sie nicht, die beiden s im Code zu ändern.
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
Lassen Sie uns zum Schluss noch die Collector.py entwickeln und die .
erneut ändern
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
Stellen Sie sicher, dass kein Einrückungsfehler vorliegt!
$ python3 pair-server.py
Zuerst müssen wir Collector.py ausführen. Der Collector wartet auf die Datenerfassung, bis wir den Producer starten.
$ python3 pair-client.py
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
Gut gemacht! ? Sie haben ZeroMQ verwendet, um Nachrichtenmuster zu entwickeln!
Das obige ist der detaillierte Inhalt vonMessaging in verteilten Systemen mit ZeroMQ. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!