Maison >développement back-end >Tutoriel Python >Messagerie dans les systèmes distribués utilisant ZeroMQ

Messagerie dans les systèmes distribués utilisant ZeroMQ

Barbara Streisand
Barbara Streisandoriginal
2024-11-21 07:33:11458parcourir

Messagerie dans les systèmes distribués utilisant ZeroMQ

Utilisons Python pour développer les différents modèles de messagerie.

Vous devrez regarder la vidéo suivante pour suivre les commandes étape par étape.

Prenez votre temps ; assurez-vous de revérifier les commandes avant de les exécuter.

  • La vidéo suivante montre les commandes utilisées dans ce tutoriel.

Messaging in distributed systems using ZeroMQ

J'exécute ce tutoriel sur ma VM GCP, mais n'hésitez pas à l'exécuter localement ✅

Ce tutoriel présente les concepts de sockets en Python3 à l'aide de ZeroMQ. ZeroMQ est un moyen simple de développer des sockets pour permettre aux processus distribués de communiquer entre eux en envoyant des messages.

  • Dans sa forme la plus simple, un socket (nœud) « écoute » sur un port IP spécifique, tandis qu'un autre socket tend la main pour établir une connexion. En utilisant les sockets, nous pouvons avoir des modèles de connexion un à un, un à plusieurs et plusieurs à plusieurs.

Les modèles de messagerie que nous examinerons aujourd'hui sont les suivants :

  • Paire : Exclusif, communication individuelle, où deux pairs communiquent entre eux. La communication est bidirectionnelle et aucun état spécifique n'est stocké dans le socket. Le serveur écoute sur un certain port et le client s'y connecte.

Messaging in distributed systems using ZeroMQ

  • Client – ​​Serveur : Un client se connecte à un ou plusieurs serveurs. Ce modèle permet le mode REQUETE – RÉPONSE. Un client envoie une requête « zmq.REQ » et reçoit une réponse.

Messaging in distributed systems using ZeroMQ

  • Publier/S'abonner : Un modèle de communication traditionnel dans lequel les expéditeurs de messages, appelés éditeurs, envoient des messages à des destinataires spécifiques, appelés abonnés. Les messages sont publiés sans que l'on sache quoi ou s'il existe un abonné à cette connaissance. Plusieurs abonnés s'abonnent aux messages/sujets publiés par un éditeur ou un abonné peut se connecter à plusieurs éditeurs.

Messaging in distributed systems using ZeroMQ

  • Sockets Push et Pull (alias Pipelines) : Vous permettent de distribuer des messages à plusieurs travailleurs, disposés dans un pipeline. Un socket Push distribuera uniformément les messages envoyés à ses clients Pull. Ceci est équivalent au modèle producteur/consommateur, mais les résultats calculés par le consommateur ne sont pas envoyés en amont mais en aval vers un autre socket pull/consommateur.

Messaging in distributed systems using ZeroMQ

? Remarque : Travailler avec des Sockets peut être délicat, exécuter encore et encore le même code, en utilisant le même numéro de port/même socket, pourrait conduire à une connexion qui « se bloque » (le serveur semble être en cours d'exécution, mais il ne peut pas accepter les connexions). Cela se produit parce que nous n’avons pas fermé et détruit correctement les connexions précédentes.

Le moyen le plus approprié de résoudre ce problème est de fermer le socket et de détruire le contexte ZeroMQ. Reportez-vous aux blocs try – catch de la phase 2 et de la phase 3 pour plus de détails.

Dans ce didacticiel, vous pourriez rencontrer de tels problèmes, par exemple en exécutant plusieurs fois le même serveur sur le même port. Si vous rencontrez des problèmes de blocage, il est conseillé d'arrêter le processus Python, de nettoyer le numéro de port TCP et de réexécuter le serveur (voir étape 11).

Phase 1 : Appairage d'un serveur à un client

Commençons par créer une nouvelle VM, puis nous installerons Python3.

  • Conservez une copie de l'adresse IP interne de la VM, pour ce tutoriel, nous utiliserons l'adresse IP interne.
    1. Ouvrez une nouvelle connexion de terminal et exécutez les commandes suivantes (l'une après l'autre). La dernière commande installe ZeroMQ.
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq

Tapez : Y lorsque vous y êtes invité.

De nos jours, de nombreuses applications sont constituées de composants qui s'étendent sur plusieurs réseaux, la messagerie est donc essentielle. Aujourd'hui, nous utiliserons TCP pour le transfert de messages.

Vous pouvez accéder à votre VM en utilisant VSC, ou vous pouvez exécuter les commandes en utilisant SSH et éditer des fichiers avec pico, dans mon cas j'utiliserai SSH.

? Assurez-vous de copier soigneusement le code.

Nous devrons créer notre premier serveur ZeroMQ, le serveur permettra la liaison avec un seul client à la fois.

  • Créez un nouveau fichier appelé pair-server.py, puis entrez le code suivant.

  • Le code crée un nouveau socket en utilisant le modèle zmq.PAIR, puis lie le serveur à un port IP particulier (que nous avons déjà ouvert dans GCP). Notez que le serveur ne cessera pas de fonctionner tant que nous ne l'arrêterons pas.

  • Jetez un œil aux commentaires pour comprendre comment cela fonctionne.

  • Assurez-vous de modifier le ; il s'agit de l'adresse IP interne de la VM GCP ; le port client doit être le même que celui du serveur.

# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)

Ne lancez pas encore le serveur, créons d'abord le client.

Créez le client et prenez une minute pour examiner les commentaires. Je l'appellerai pair-client.py.

Assurez-vous de modifier le ; le port doit être le même que sur le serveur.

$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq

Nous aurons besoin de deux fenêtres de terminal pour exécuter l'exemple PAIR. Nous exécuterons le serveur sur une fenêtre et le client sur l'autre. Maintenant, exécutez-le comme suit.

  • Exécutez le serveur
# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
  • Exécuter le client
import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()

Examinez la sortie, nous venons de créer une nouvelle prise PAIR.

  • Le script se terminera lorsque le client aura terminé sa connexion. Arrêtez ensuite le serveur (ctrl c) et tuez-le.

Nous devrons effacer la connexion TCP avant de la réexécuter. Pour ce faire, utilisez la commande suivante.

$ python3 pair-server.py

? Remarques :

  • Nous pouvons exécuter une seule PAIR à la fois, cela signifie que nous ne pouvons pas avoir plusieurs clients, rappelez-vous qu'il s'agit d'une PAIR, le premier client verrouillera le socket .

  • Si nous exécutons le serveur une fois et le client deux fois, le deuxième client se « bloquera », cela signifie que le deuxième client attendra qu'un nouveau serveur se connecte.

  • Si nous voulons exécuter la paire plus d'une fois, nous devrons tuer le serveur et effacer la connexion TCP.

  • Les
  • PAIRES sont idéales lorsqu'un client doit avoir un accès exclusif à un serveur.

  • Nous pouvons avoir plusieurs serveurs vers plusieurs clients sous forme de PAIRs, mais nous devrons utiliser des numéros de PORT différents pour les connexions.

Chaque phase est indépendante les unes des autres, alors arrêtez le serveur, effacez les ports TCP et passez à la phase suivante.

Phase 2 : Associer un serveur à plusieurs clients

Créons une connexion client-serveur, où plusieurs clients se connecteront à un seul serveur. Il s'agit du modèle de messagerie le plus populaire.

  • Créons un serveur dans le contexte du modèle REP-REQ (réponse à une requête).
  • Nous appellerons le serveur rep-server.py, en utilisant le port 5555.
$ python3 pair-client.py

Nous allons maintenant développer deux Clients qui seront identiques en termes de fonctionnalités.

$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()

Créons une copie de ce client et modifions-la en conséquence. Exécutez la commande suivante pour créer une nouvelle copie.

* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.

Modifiez ensuite le req-client2.py et remplacez le client 1 par le client 2.

Modifions les messages print et socket (lignes 8 et 9)

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 1 ", request,"...")
    socket.send_string("Hello from client 1")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()

Pour exécuter cet exemple, nous aurons besoin de trois fenêtres de terminal, une pour le serveur et deux pour les clients. Exécutez ce qui suit dans le premier terminal.

  • Démarrons le serveur
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq
  • Commençons le premier client
# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
  • Commençons le deuxième client
import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()

Vérifiez le résultat des fenêtres, nous venons de créer deux clients parlant à un serveur. Vous pouvez avoir autant de clients que vous le souhaitez, vous devrez créer des clients, même avec des fonctionnalités différentes qui se connectent à un seul serveur.

? Remarques :

  • Client – ​​Serveur est le modèle le plus largement utilisé, nous l'avons déjà utilisé en classe 1 lorsque nous avons installé et exécuté le serveur HTTP Apache.

  • Arrêtez le serveur et nettoyez le port TCP 5555

    • Tuez le serveur :


bash
$ sudo fuser -k 5555/tcp

Phase 3 : Appairage d'un serveur à un client

Le modèle de publication – abonnement est un moyen très courant de contrôler la diffusion de données vers de nombreux clients abonnés à un contexte, de manière à ce que les serveurs envoient des données à un ou plusieurs clients.

$ python3 pair-server.py

Créons d’abord un exemple simple.

$ python3 pair-client.py

Créons un nouveau fichier, appelons-le pub_server.py.

$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
  • Cette commande demandera à Python d'exécuter un serveur de manière spécifique et
* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.

Créez un nouveau fichier pub_client.py.
* Le script accepte trois arguments de la ligne de commande (à savoir l'IP et les deux ports).

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 1 ", request,"...")
    socket.send_string("Hello from client 1")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()

Nous sommes prêts à lancer notre application pub-sub ! Nous aurons besoin de trois fenêtres de terminal. Lors de la première exécution du terminal :

$ cp req-client1.py req-client2.py
  • Dans la deuxième exécution du terminal :
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 2 ", request,"...")
        socket.send_string("Hello from client 2")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()
  • Chaque serveur génère des données météorologiques. Par exemple:
    • Le code postal, ex : 10001
    • Les tempérés, ex : -68

Exécutons le client pour se connecter et s'abonner aux données par code postal, par exemple 10001 (NYC). N'oubliez pas que le script client s'abonne aux deux instances de serveur. Exécutez la commande suivante :

$ python3 rep-server.py
  • Lorsque vous avez fini, tuez les serveurs (ctrl z) et effacez les ports TCP en exécutant les commandes suivantes :
$ python3 req-client1.py
$ python3 req-client2.py
Phase 4 : Push/Pull : Utilisation d'un modèle de pipeline**

Les sockets Push/Pull vous permettent de distribuer des messages à plusieurs travailleurs, disposés dans un pipeline. Ceci est très utile pour exécuter du code en parallèle. Un socket Push distribuera les messages à ses clients Pull de manière égale, et les clients enverront une réponse à un autre serveur, appelé collecteur.

Messaging in distributed systems using ZeroMQ

  • Ceci est équivalent au modèle producteur/consommateur, mais les résultats calculés par le consommateur ne sont pas envoyés en amont mais en aval vers un autre socket pull/consommateur.

  • Nous implémenterons la fonctionnalité suivante.

  • Le producteur PUSH des nombres aléatoires de 0 à 10 aux consommateurs.

  • Deux instances du même consommateur extrairont les chiffres et effectueront une tâche lourde.

  • La tâche peut être n'importe quel calcul lourd, par exemple une multiplication matricielle.

  • Par souci de simplicité, notre « lourde tâche » renverra simplement le même numéro.

  • Les consommateurs POUSSERONT les résultats individuels (calculs de tâches lourdes) vers un Result Collector, qui regroupera les résultats.

  • Pour plus de simplicité, une instance du Result Collector EXTRAIT les résultats et calcule la somme partielle de chaque consommateur. On peut facilement additionner les deux sommes partielles si besoin.

  • Voyons un exemple simple.

    • Le producteur génère [1,2,3,4,5].
    • Le consommateur 1 reçoit [2,4], puis calcule une tâche lourde et transmet les résultats au collecteur de résultats.
    • Le consommateur 2 reçoit [1,3,5], puis calcule une tâche lourde et transmet les résultats au collecteur de résultats.
    • Le collecteur de résultats calcule les décomptes et les sommes partielles, par exemple :
    • Consumer1[2,4], cela signifie 2 nombres reçus du Consommateur1 et leur somme est 6.
    • Consommateur2[1,3,5], cela signifie 3 nombres reçus de ce Consommateur2 et leur somme est 9.
  • Cet exemple démontre le potentiel du traitement distribué pour le traitement parallèle.

Tout d'abord, créons le producteur appelé producteur.py fonctionnant sur le port 5555, assurez-vous d'adapter votre .

$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq

Ensuite, créez le consumer.py comme suit. N'oubliez pas de changer les deux s dans le code.

# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)

Enfin, développons le collector.py, changeons encore le .

import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()

Assurez-vous de ne pas avoir d'erreur d'indentation !

$ python3 pair-server.py

Tout d'abord, nous devons exécuter le collector.py, le collecteur attendra que les données soient collectées jusqu'à ce que nous démarrions le producteur.

$ python3 pair-client.py
  • Ensuite, nous démarrerons les consommateurs un par un, exécuterons chaque commande dans une fenêtre de terminal différente.
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
  • Exécutez la même commande dans un autre terminal.
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
  • Enfin, nous démarrerons notre producteur qui commencera à envoyer des données à notre pipeline.
* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.

Bravo ! ? Vous avez utilisé ZeroMQ pour développer des modèles de messagerie !

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn