Maison >développement back-end >Tutoriel Python >Messagerie dans les systèmes distribués utilisant ZeroMQ
Utilisons Python pour développer les différents modèles de messagerie.
Vous devrez regarder la vidéo suivante pour suivre les commandes étape par étape.
Prenez votre temps ; assurez-vous de revérifier les commandes avant de les exécuter.
J'exécute ce tutoriel sur ma VM GCP, mais n'hésitez pas à l'exécuter localement ✅
Ce tutoriel présente les concepts de sockets en Python3 à l'aide de ZeroMQ. ZeroMQ est un moyen simple de développer des sockets pour permettre aux processus distribués de communiquer entre eux en envoyant des messages.
Les modèles de messagerie que nous examinerons aujourd'hui sont les suivants :
? Remarque : Travailler avec des Sockets peut être délicat, exécuter encore et encore le même code, en utilisant le même numéro de port/même socket, pourrait conduire à une connexion qui « se bloque » (le serveur semble être en cours d'exécution, mais il ne peut pas accepter les connexions). Cela se produit parce que nous n’avons pas fermé et détruit correctement les connexions précédentes.
Le moyen le plus approprié de résoudre ce problème est de fermer le socket et de détruire le contexte ZeroMQ. Reportez-vous aux blocs try – catch de la phase 2 et de la phase 3 pour plus de détails.
Dans ce didacticiel, vous pourriez rencontrer de tels problèmes, par exemple en exécutant plusieurs fois le même serveur sur le même port. Si vous rencontrez des problèmes de blocage, il est conseillé d'arrêter le processus Python, de nettoyer le numéro de port TCP et de réexécuter le serveur (voir étape 11).
Commençons par créer une nouvelle VM, puis nous installerons Python3.
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
Tapez : Y lorsque vous y êtes invité.
De nos jours, de nombreuses applications sont constituées de composants qui s'étendent sur plusieurs réseaux, la messagerie est donc essentielle. Aujourd'hui, nous utiliserons TCP pour le transfert de messages.
Vous pouvez accéder à votre VM en utilisant VSC, ou vous pouvez exécuter les commandes en utilisant SSH et éditer des fichiers avec pico, dans mon cas j'utiliserai SSH.
? Assurez-vous de copier soigneusement le code.
Nous devrons créer notre premier serveur ZeroMQ, le serveur permettra la liaison avec un seul client à la fois.
Créez un nouveau fichier appelé pair-server.py, puis entrez le code suivant.
Le code crée un nouveau socket en utilisant le modèle zmq.PAIR, puis lie le serveur à un port IP particulier (que nous avons déjà ouvert dans GCP). Notez que le serveur ne cessera pas de fonctionner tant que nous ne l'arrêterons pas.
Jetez un œil aux commentaires pour comprendre comment cela fonctionne.
Assurez-vous de modifier le ; il s'agit de l'adresse IP interne de la VM GCP ; le port client doit être le même que celui du serveur.
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
Ne lancez pas encore le serveur, créons d'abord le client.
Créez le client et prenez une minute pour examiner les commentaires. Je l'appellerai pair-client.py.
Assurez-vous de modifier le ; le port doit être le même que sur le serveur.
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
Nous aurons besoin de deux fenêtres de terminal pour exécuter l'exemple PAIR. Nous exécuterons le serveur sur une fenêtre et le client sur l'autre. Maintenant, exécutez-le comme suit.
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
Examinez la sortie, nous venons de créer une nouvelle prise PAIR.
Nous devrons effacer la connexion TCP avant de la réexécuter. Pour ce faire, utilisez la commande suivante.
$ python3 pair-server.py
? Remarques :
Nous pouvons exécuter une seule PAIR à la fois, cela signifie que nous ne pouvons pas avoir plusieurs clients, rappelez-vous qu'il s'agit d'une PAIR, le premier client verrouillera le socket .
Si nous exécutons le serveur une fois et le client deux fois, le deuxième client se « bloquera », cela signifie que le deuxième client attendra qu'un nouveau serveur se connecte.
- Les
Si nous voulons exécuter la paire plus d'une fois, nous devrons tuer le serveur et effacer la connexion TCP.
PAIRES sont idéales lorsqu'un client doit avoir un accès exclusif à un serveur.
Nous pouvons avoir plusieurs serveurs vers plusieurs clients sous forme de PAIRs, mais nous devrons utiliser des numéros de PORT différents pour les connexions.
Chaque phase est indépendante les unes des autres, alors arrêtez le serveur, effacez les ports TCP et passez à la phase suivante.
Créons une connexion client-serveur, où plusieurs clients se connecteront à un seul serveur. Il s'agit du modèle de messagerie le plus populaire.
$ python3 pair-client.py
Nous allons maintenant développer deux Clients qui seront identiques en termes de fonctionnalités.
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
Créons une copie de ce client et modifions-la en conséquence. Exécutez la commande suivante pour créer une nouvelle copie.
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
Modifiez ensuite le req-client2.py et remplacez le client 1 par le client 2.
Modifions les messages print et socket (lignes 8 et 9)
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 1 ", request,"...") socket.send_string("Hello from client 1") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
Pour exécuter cet exemple, nous aurons besoin de trois fenêtres de terminal, une pour le serveur et deux pour les clients. Exécutez ce qui suit dans le premier terminal.
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
Vérifiez le résultat des fenêtres, nous venons de créer deux clients parlant à un serveur. Vous pouvez avoir autant de clients que vous le souhaitez, vous devrez créer des clients, même avec des fonctionnalités différentes qui se connectent à un seul serveur.
? Remarques :
Client – Serveur est le modèle le plus largement utilisé, nous l'avons déjà utilisé en classe 1 lorsque nous avons installé et exécuté le serveur HTTP Apache.
Arrêtez le serveur et nettoyez le port TCP 5555
- Tuez le serveur :
bash
$ sudo fuser -k 5555/tcp
Le modèle de publication – abonnement est un moyen très courant de contrôler la diffusion de données vers de nombreux clients abonnés à un contexte, de manière à ce que les serveurs envoient des données à un ou plusieurs clients.
$ python3 pair-server.py
Créons d’abord un exemple simple.
$ python3 pair-client.py
Créons un nouveau fichier, appelons-le pub_server.py.
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
Créez un nouveau fichier pub_client.py.
* Le script accepte trois arguments de la ligne de commande (à savoir l'IP et les deux ports).
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 1 ", request,"...") socket.send_string("Hello from client 1") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
Nous sommes prêts à lancer notre application pub-sub ! Nous aurons besoin de trois fenêtres de terminal. Lors de la première exécution du terminal :
$ cp req-client1.py req-client2.py
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 2 ", request,"...") socket.send_string("Hello from client 2") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
Exécutons le client pour se connecter et s'abonner aux données par code postal, par exemple 10001 (NYC). N'oubliez pas que le script client s'abonne aux deux instances de serveur. Exécutez la commande suivante :
$ python3 rep-server.py
$ python3 req-client1.py
$ python3 req-client2.py
Les sockets Push/Pull vous permettent de distribuer des messages à plusieurs travailleurs, disposés dans un pipeline. Ceci est très utile pour exécuter du code en parallèle. Un socket Push distribuera les messages à ses clients Pull de manière égale, et les clients enverront une réponse à un autre serveur, appelé collecteur.
Ceci est équivalent au modèle producteur/consommateur, mais les résultats calculés par le consommateur ne sont pas envoyés en amont mais en aval vers un autre socket pull/consommateur.
Nous implémenterons la fonctionnalité suivante.
Le producteur PUSH des nombres aléatoires de 0 à 10 aux consommateurs.
Deux instances du même consommateur extrairont les chiffres et effectueront une tâche lourde.
La tâche peut être n'importe quel calcul lourd, par exemple une multiplication matricielle.
Par souci de simplicité, notre « lourde tâche » renverra simplement le même numéro.
Les consommateurs POUSSERONT les résultats individuels (calculs de tâches lourdes) vers un Result Collector, qui regroupera les résultats.
Pour plus de simplicité, une instance du Result Collector EXTRAIT les résultats et calcule la somme partielle de chaque consommateur. On peut facilement additionner les deux sommes partielles si besoin.
Voyons un exemple simple.
Cet exemple démontre le potentiel du traitement distribué pour le traitement parallèle.
Tout d'abord, créons le producteur appelé producteur.py fonctionnant sur le port 5555, assurez-vous d'adapter votre .
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
Ensuite, créez le consumer.py comme suit. N'oubliez pas de changer les deux s dans le code.
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
Enfin, développons le collector.py, changeons encore le .
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
Assurez-vous de ne pas avoir d'erreur d'indentation !
$ python3 pair-server.py
Tout d'abord, nous devons exécuter le collector.py, le collecteur attendra que les données soient collectées jusqu'à ce que nous démarrions le producteur.
$ python3 pair-client.py
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
Bravo ! ? Vous avez utilisé ZeroMQ pour développer des modèles de messagerie !
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!