ホームページ >バックエンド開発 >Python チュートリアル >ZeroMQ を使用した分散システムでのメッセージング

ZeroMQ を使用した分散システムでのメッセージング

Barbara Streisand
Barbara Streisandオリジナル
2024-11-21 07:33:11498ブラウズ

ZeroMQ を使用した分散システムでのメッセージング

Python を使用して、さまざまなメッセージング パターンを開発してみましょう。

ステップバイステップのコマンドに従うには、次のビデオを視聴する必要があります。

時間をかけてください。コマンドを実行する前に必ず再確認してください。

  • 次のビデオは、このチュートリアルで使用されるコマンドを示しています。

Messaging in distributed systems using ZeroMQ

このチュートリアルは GCP VM で実行していますが、ローカルで自由に実行できます ✅

このチュートリアルでは、ZeroMQ を使用した Python3 のソケットの概念を紹介します。 ZeroMQ は、分散プロセスがメッセージを送信して相互に通信できるようにするソケットを開発する簡単な方法です。

  • 最も単純な形式では、ソケット (ノード) が特定の IP ポートを「リッスン」し、同時に別のソケットが接続を形成します。ソケットを使用すると、1 対 1、1 対多、多対多の接続パターンが可能になります。

今日検討するメッセージングのパターンは次のとおりです:

  • ペア: 2 つのピアが相互に通信する、排他的な 1 対 1 通信。通信は双方向であり、ソケットには特定の状態は保存されません。サーバーは特定のポートをリッスンし、クライアントはそれに接続します。

Messaging in distributed systems using ZeroMQ

  • クライアント – サーバー: クライアントは 1 つ以上のサーバーに接続します。このパターンでは REQUEST – RESPONSE モードが可能になります。クライアントはリクエスト「zmq.REQ」を送信し、応答を受け取ります。

Messaging in distributed systems using ZeroMQ

  • パブリッシュ/サブスクライブ: パブリッシャーと呼ばれるメッセージの送信者が、サブスクライバーと呼ばれる特定の受信者にメッセージを送信する従来の通信パターン。メッセージは、その知識の購読者が何であるか、または存在するかどうかを知ることなく公開されます。複数のサブスクライバーがパブリッシャーによって公開されているメッセージ/トピックをサブスクライブするか、1 人のサブスクライバーが複数のパブリッシャーに接続できます。

Messaging in distributed systems using ZeroMQ

  • プッシュ ソケットとプル ソケット (別名パイプライン): パイプラインに配置された複数のワーカーにメッセージを配布できます。プッシュ ソケットは、送信されたメッセージをプル クライアントに均等に配布します。これはプロデューサー/コンシューマー モデルと同等ですが、コンシューマーによって計算された結果は上流ではなく、下流の別のプル/コンシューマー ソケットに送信されます。

Messaging in distributed systems using ZeroMQ

? 注: ソケットの操作は難しい場合があり、同じポート番号/同じソケットを使用して同じコードを何度も実行すると、接続が「ハング」する可能性があります (サーバーは実行されているように見えますが、ただし、接続は受け入れられません)。これは、以前の接続を正しく閉じて破棄しなかったために発生します。

これに対処する最も適切な方法は、ソケットを閉じて ZeroMQ コンテキストを破棄することです。詳細については、フェーズ 2 およびフェーズ 3 の try – catch ブロックを参照してください。

このチュートリアルでは、同じポートで同じサーバーを複数回実行するなどの問題が発生する可能性があります。ハングの問題が発生した場合は、Python プロセスを強制終了し、TCP ポート番号をクリーンアップして、サーバーをもう一度実行することをお勧めします (ステップ 11 を参照)。

フェーズ 1: サーバーとクライアントのペアリング

まず新しい VM を作成してから、Python3 をインストールします。

  • VM の内部 IP のコピーを保存します。このチュートリアルでは内部 IP アドレスを使用します。
    1. 新しいターミナル接続を開き、次のコマンドを (次々に) 実行します。最後のコマンドは ZeroMQ をインストールします。
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq

プロンプトが表示されたら、「Y」と入力します。

最近のアプリケーションの多くは、ネットワーク全体に広がるコンポーネントで構成されているため、メッセージングが不可欠です。今日はメッセージ転送に TCP を使用します。

VSC を使用して VM にアクセスすることも、SSH を使用してコマンドを実行し、pico でファイルを編集することもできます。私の場合は SSH を使用します。

?コードを慎重にコピーしてください。

最初の ZeroMQ サーバー を作成する必要があります。サーバーは一度に 1 つのクライアントのみとのバインドを許可します。

  • pair-server.py という新しいファイルを作成し、次のコードを入力します。

  • このコードは、zmq.PAIR パターンを使用して新しいソケットを作成し、サーバーを特定の IP ポート (GCP ですでに開いている) にバインドします。サーバーは、停止するまで実行を停止しないことに注意してください。

  • これがどのように機能するかを理解するには、コメントを見てください。

  • 必ず ; を変更してください。それは GCP VM の内部 IP アドレスです。クライアントのポートはサーバーと同じである必要があります。

# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)

まだサーバーを実行しないでください。まずクライアントを作成しましょう。

クライアントを作成し、コメントを確認してください。これを、pair-client.py と呼びます。

必ず ; を変更してください。ポートはサーバーのものと同じである必要があります。

$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq

PAIR サンプルを実行するには、2 つの ターミナル ウィンドウが必要です。 1 つのウィンドウでサーバーを実行し、もう 1 つのウィンドウでクライアントを実行します。ここで、次のように実行します。

  • サーバーを実行する
# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
  • クライアントを実行する
import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()

出力を確認してください。新しい PAIR ソケットが作成されました。

  • クライアントが接続を完了すると、スクリプトは終了します。次に、サーバーを停止し (ctrl c)、強制終了します。

TCP 接続を再度実行する前に、TCP 接続をクリアする必要があります。これを行うには、次のコマンドを使用します。

$ python3 pair-server.py

?注:

  • 実行できるのは 一度に 1 つのペアのみです。これは、複数のクライアントを持つことができないことを意味します。これは ペア であることを覚えておいてください。最初のクライアントがソケットをロックします.

  • サーバーを 1 回実行し、クライアントを 2 回実行すると、2 番目のクライアントが「ハング」します。これは、2 番目のクライアントが新しいサーバーの接続を待機することを意味します。

  • ペアを複数回実行したい場合は、サーバーを強制終了し、TCP 接続をクリアする必要があります。

  • PAIR は、クライアントがサーバーに排他的にアクセスする必要がある場合に最適です。

  • 複数のサーバーと複数のクライアントをペアとして使用できますが、接続には異なるポート番号を使用する必要があります。

各フェーズは互いに独立しているため、サーバーを停止し、TCP ポートをクリアして、次のフェーズに進みます。

フェーズ 2: サーバーを複数のクライアントとペアリングする

複数のクライアントが単一のサーバーに接続するクライアント/サーバー接続を作成しましょう。これは最も一般的なメッセージ パターンです。

  • REP-REQ (リクエストへの応答) パターンのコンテキストでサーバーを作成してみましょう。
  • ポート 5555 を使用して、サーバーを rep-server.py と呼びます。
$ python3 pair-client.py

ここで、機能の点で同一の 2 つのクライアントを開発します。

$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()

このクライアントのコピーを作成し、それに応じて編集してみましょう。次のコマンドを実行して新しいコピーを作成します。

* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.

次に、req-client2.py を編集し、クライアント 1 をクライアント 2 に変更します。

印刷メッセージとソケット メッセージを編集しましょう (8 行目と 9 行目)

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 1 ", request,"...")
    socket.send_string("Hello from client 1")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()

この例を実行するには、3 つの ターミナル ウィンドウ (サーバー用に 1 つとクライアント用に 2 つ) が必要です。最初のターミナルで次を実行します。

  • サーバーを起動しましょう
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq
  • 最初のクライアントを開始しましょう
# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
  • 2 番目のクライアントを開始しましょう
import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()

ウィンドウの出力を確認してください。1 つのサーバーと通信する 2 つのクライアントを作成したところです。クライアントは必要な数だけ持つことができますが、1 つのサーバーに接続するさまざまな機能を備えている場合でも、クライアントを作成する必要があります。

? メモ:

  • クライアント – サーバーは最も広く使用されているパターンであり、Apache HTTP サーバーをインストールして実行したときにクラス 1 ですでに使用していました。

  • サーバーを停止し、TCP ポート 5555 をクリーンアップします

    • サーバーを強制終了します:


バッシュ
$ sudo fuser -k 5555/tcp

フェーズ 3: サーバーとクライアントのペアリング

パブリッシュ – サブスクライブ パターンは、サーバーが 1 つ以上のクライアントにデータを送信する方法で、コンテキストにサブスクライブしている多くのクライアントへのデータのブロードキャストを制御する非常に一般的な方法です。

$ python3 pair-server.py

まず簡単な例を作成しましょう。

$ python3 pair-client.py

pub_server.py という名前の新しいファイルを作成しましょう。

$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
  • このコマンドは、Python に特定のサーバーを実行するように指示します。
* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.

新しいファイル pub_client.py を作成します。
* スクリプトはコマンド ラインから 3 つの引数 (IP と 2 つのポート) を受け取ります。

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 1 ", request,"...")
    socket.send_string("Hello from client 1")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()

pub-sub アプリケーションを実行する準備ができました。 3 つ のターミナル ウィンドウが必要になります。最初のターミナルで次を実行します:

$ cp req-client1.py req-client2.py
  • 2 番目のターミナルで次を実行します。
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 2 ", request,"...")
        socket.send_string("Hello from client 2")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()
  • 各サーバーは気象データを生成します。例えば:
    • 郵便番号、例: 10001
    • 温帯、例: -68

クライアントを実行して、郵便番号 (例: 10001 (NYC)) でデータに接続し、サブスクライブしてみましょう。クライアント スクリプトが両方のサーバー インスタンスにサブスクライブしていることに注意してください。次のコマンドを実行します:

$ python3 rep-server.py
  • サーバーの強制終了 (ctrl z) が完了したら、次のコマンドを実行して TCP ポートをクリアします。
$ python3 req-client1.py
$ python3 req-client2.py
フェーズ 4: プッシュ/プル: パイプライン パターンの使用**

プッシュ/プル ソケットを使用すると、パイプラインに配置された複数のワーカーにメッセージを配布できます。これは、コードを並列実行する場合に非常に便利です。プッシュ ソケットはメッセージをそのプル クライアントに均等に分散し、クライアントはコレクターと呼ばれる別のサーバーに応答を送信します。

Messaging in distributed systems using ZeroMQ

  • これはプロデューサー/コンシューマー モデルと同等ですが、コンシューマーによって計算された結果は上流ではなく、下流の別のプル/コンシューマー ソケットに送信されます。

  • 以下の機能を実装します。

  • プロデューサーは 0 から 10 までの乱数をコンシューマーに PUSH します。

  • 同じコンシューマの 2 つのインスタンスが数値を PULL し、重いタスクを実行します。

  • タスクは、行列の乗算などの重い計算を行うことができます。

  • 簡単にするために、「重いタスク」は同じ数値を返すだけです。

  • コンシューマーは、個々の結果 (重いタスクの計算) を 結果コレクター に PUSH し、結果を集約します。

  • 簡単にするために、Result Collector のインスタンスは結果を PULL し、各コンシューマの部分和を計算します。必要に応じて、2 つの部分和を簡単に合計できます。

  • 簡単な例を見てみましょう。

    • プロデューサーは [1,2,3,4,5] を生成します。
    • コンシューマー 1 は [2,4] を受信し、重いタスクを計算して結果を結果コレクターに転送します。
    • コンシューマー 2 は [1,3,5] を受信し、重いタスクを計算して結果を結果コレクターに転送します。
    • 結果コレクターはカウントと部分和を計算します。例:
    • Consumer1[2,4]、これは Consumer1 から受信した 2 個の数値を意味し、その合計は 6 です。
    • Consumer2[1,3,5]、つまり、この Consumer2 から受信した 3 個の数値と、それらの合計が 9 であることを意味します。
  • この例は、並列処理における分散処理の可能性を示しています。

まず、ポート 5555 で実行される生産者.py という名前のプロデューサーを作成しましょう。

$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq

次に、consumer.py を次のように作成します。コード内の 2 つの を忘れずに変更してください。

# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)

最後に、collecter.py を開発しましょう。

import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()

インデントに誤りがないか確認してください。

$ python3 pair-server.py

まず、collector.py を実行する必要があります。コレクタは、プロデューサーを開始するまでデータが収集されるのを待ちます。

$ python3 pair-client.py
  • 次に、コンシューマーを 1 つずつ起動し、別のターミナル ウィンドウで各コマンドを実行します。
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
  • 別の端末で同じコマンドを実行します。
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
  • 最後に、パイプラインへのデータの送信を開始するプロデューサーを起動します。
* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.

よくやった! ? ZeroMQ を使用してメッセージング パターンを開発しました!

以上がZeroMQ を使用した分散システムでのメッセージングの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。