ホームページ >バックエンド開発 >Python チュートリアル >ZeroMQ を使用した分散システムでのメッセージング
Python を使用して、さまざまなメッセージング パターンを開発してみましょう。
ステップバイステップのコマンドに従うには、次のビデオを視聴する必要があります。
時間をかけてください。コマンドを実行する前に必ず再確認してください。
このチュートリアルは GCP VM で実行していますが、ローカルで自由に実行できます ✅
このチュートリアルでは、ZeroMQ を使用した Python3 のソケットの概念を紹介します。 ZeroMQ は、分散プロセスがメッセージを送信して相互に通信できるようにするソケットを開発する簡単な方法です。
今日検討するメッセージングのパターンは次のとおりです:
? 注: ソケットの操作は難しい場合があり、同じポート番号/同じソケットを使用して同じコードを何度も実行すると、接続が「ハング」する可能性があります (サーバーは実行されているように見えますが、ただし、接続は受け入れられません)。これは、以前の接続を正しく閉じて破棄しなかったために発生します。
これに対処する最も適切な方法は、ソケットを閉じて ZeroMQ コンテキストを破棄することです。詳細については、フェーズ 2 およびフェーズ 3 の try – catch ブロックを参照してください。
このチュートリアルでは、同じポートで同じサーバーを複数回実行するなどの問題が発生する可能性があります。ハングの問題が発生した場合は、Python プロセスを強制終了し、TCP ポート番号をクリーンアップして、サーバーをもう一度実行することをお勧めします (ステップ 11 を参照)。
まず新しい VM を作成してから、Python3 をインストールします。
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
プロンプトが表示されたら、「Y」と入力します。
最近のアプリケーションの多くは、ネットワーク全体に広がるコンポーネントで構成されているため、メッセージングが不可欠です。今日はメッセージ転送に TCP を使用します。
VSC を使用して VM にアクセスすることも、SSH を使用してコマンドを実行し、pico でファイルを編集することもできます。私の場合は SSH を使用します。
?コードを慎重にコピーしてください。
最初の ZeroMQ サーバー を作成する必要があります。サーバーは一度に 1 つのクライアントのみとのバインドを許可します。
pair-server.py という新しいファイルを作成し、次のコードを入力します。
このコードは、zmq.PAIR パターンを使用して新しいソケットを作成し、サーバーを特定の IP ポート (GCP ですでに開いている) にバインドします。サーバーは、停止するまで実行を停止しないことに注意してください。
これがどのように機能するかを理解するには、コメントを見てください。
必ず ; を変更してください。それは GCP VM の内部 IP アドレスです。クライアントのポートはサーバーと同じである必要があります。
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
まだサーバーを実行しないでください。まずクライアントを作成しましょう。
クライアントを作成し、コメントを確認してください。これを、pair-client.py と呼びます。
必ず ; を変更してください。ポートはサーバーのものと同じである必要があります。
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
PAIR サンプルを実行するには、2 つの ターミナル ウィンドウが必要です。 1 つのウィンドウでサーバーを実行し、もう 1 つのウィンドウでクライアントを実行します。ここで、次のように実行します。
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
出力を確認してください。新しい PAIR ソケットが作成されました。
TCP 接続を再度実行する前に、TCP 接続をクリアする必要があります。これを行うには、次のコマンドを使用します。
$ python3 pair-server.py
?注:
実行できるのは 一度に 1 つのペアのみです。これは、複数のクライアントを持つことができないことを意味します。これは ペア であることを覚えておいてください。最初のクライアントがソケットをロックします.
サーバーを 1 回実行し、クライアントを 2 回実行すると、2 番目のクライアントが「ハング」します。これは、2 番目のクライアントが新しいサーバーの接続を待機することを意味します。
ペアを複数回実行したい場合は、サーバーを強制終了し、TCP 接続をクリアする必要があります。
PAIR は、クライアントがサーバーに排他的にアクセスする必要がある場合に最適です。
複数のサーバーと複数のクライアントをペアとして使用できますが、接続には異なるポート番号を使用する必要があります。
各フェーズは互いに独立しているため、サーバーを停止し、TCP ポートをクリアして、次のフェーズに進みます。
複数のクライアントが単一のサーバーに接続するクライアント/サーバー接続を作成しましょう。これは最も一般的なメッセージ パターンです。
$ python3 pair-client.py
ここで、機能の点で同一の 2 つのクライアントを開発します。
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
このクライアントのコピーを作成し、それに応じて編集してみましょう。次のコマンドを実行して新しいコピーを作成します。
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
次に、req-client2.py を編集し、クライアント 1 をクライアント 2 に変更します。
印刷メッセージとソケット メッセージを編集しましょう (8 行目と 9 行目)
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 1 ", request,"...") socket.send_string("Hello from client 1") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
この例を実行するには、3 つの ターミナル ウィンドウ (サーバー用に 1 つとクライアント用に 2 つ) が必要です。最初のターミナルで次を実行します。
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
ウィンドウの出力を確認してください。1 つのサーバーと通信する 2 つのクライアントを作成したところです。クライアントは必要な数だけ持つことができますが、1 つのサーバーに接続するさまざまな機能を備えている場合でも、クライアントを作成する必要があります。
? メモ:
クライアント – サーバーは最も広く使用されているパターンであり、Apache HTTP サーバーをインストールして実行したときにクラス 1 ですでに使用していました。
サーバーを停止し、TCP ポート 5555 をクリーンアップします
- サーバーを強制終了します:
バッシュ
$ sudo fuser -k 5555/tcp
パブリッシュ – サブスクライブ パターンは、サーバーが 1 つ以上のクライアントにデータを送信する方法で、コンテキストにサブスクライブしている多くのクライアントへのデータのブロードキャストを制御する非常に一般的な方法です。
$ python3 pair-server.py
まず簡単な例を作成しましょう。
$ python3 pair-client.py
pub_server.py という名前の新しいファイルを作成しましょう。
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
新しいファイル pub_client.py を作成します。
* スクリプトはコマンド ラインから 3 つの引数 (IP と 2 つのポート) を受け取ります。
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 1 ", request,"...") socket.send_string("Hello from client 1") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
pub-sub アプリケーションを実行する準備ができました。 3 つ のターミナル ウィンドウが必要になります。最初のターミナルで次を実行します:
$ cp req-client1.py req-client2.py
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 2 ", request,"...") socket.send_string("Hello from client 2") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
クライアントを実行して、郵便番号 (例: 10001 (NYC)) でデータに接続し、サブスクライブしてみましょう。クライアント スクリプトが両方のサーバー インスタンスにサブスクライブしていることに注意してください。次のコマンドを実行します:
$ python3 rep-server.py
$ python3 req-client1.py
$ python3 req-client2.py
プッシュ/プル ソケットを使用すると、パイプラインに配置された複数のワーカーにメッセージを配布できます。これは、コードを並列実行する場合に非常に便利です。プッシュ ソケットはメッセージをそのプル クライアントに均等に分散し、クライアントはコレクターと呼ばれる別のサーバーに応答を送信します。
これはプロデューサー/コンシューマー モデルと同等ですが、コンシューマーによって計算された結果は上流ではなく、下流の別のプル/コンシューマー ソケットに送信されます。
以下の機能を実装します。
プロデューサーは 0 から 10 までの乱数をコンシューマーに PUSH します。
同じコンシューマの 2 つのインスタンスが数値を PULL し、重いタスクを実行します。
タスクは、行列の乗算などの重い計算を行うことができます。
簡単にするために、「重いタスク」は同じ数値を返すだけです。
コンシューマーは、個々の結果 (重いタスクの計算) を 結果コレクター に PUSH し、結果を集約します。
簡単にするために、Result Collector のインスタンスは結果を PULL し、各コンシューマの部分和を計算します。必要に応じて、2 つの部分和を簡単に合計できます。
簡単な例を見てみましょう。
この例は、並列処理における分散処理の可能性を示しています。
まず、ポート 5555 で実行される生産者.py という名前のプロデューサーを作成しましょう。
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
次に、consumer.py を次のように作成します。コード内の 2 つの を忘れずに変更してください。
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
最後に、collecter.py を開発しましょう。
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
インデントに誤りがないか確認してください。
$ python3 pair-server.py
まず、collector.py を実行する必要があります。コレクタは、プロデューサーを開始するまでデータが収集されるのを待ちます。
$ python3 pair-client.py
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
よくやった! ? ZeroMQ を使用してメッセージング パターンを開発しました!
以上がZeroMQ を使用した分散システムでのメッセージングの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。