首頁 >後端開發 >Python教學 >使用 ZeroMQ 在分散式系統中傳送訊息

使用 ZeroMQ 在分散式系統中傳送訊息

Barbara Streisand
Barbara Streisand原創
2024-11-21 07:33:11481瀏覽

使用 ZeroMQ 在分散式系統中傳送訊息

讓我們使用 Python 來發展不同的訊息傳遞模式。

您需要觀看以下影片才能按照逐步命令進行操作。

慢慢來;確保在運行命令之前仔細檢查它們。

  • 以下影片示範了本教學中使用的指令。

Messaging in distributed systems using ZeroMQ

我在我的 GCP 虛擬機上運行本教程,但也可以在本地運行它 ✅

本教學使用 ZeroMQ 介紹 Python3 中套接字的概念。 ZeroMQ 是一種開發套接字的簡單方法,允許分散式進程透過發送訊息相互通訊。

  • 最簡單的形式是,一個套接字(節點)「監聽」特定的 IP 端口,同時另一個套接字伸出來形成連接。使用套接字,我們可以擁有一對一、一對多和多對多連線模式。

我們今天將研究的訊息傳遞模式如下:

  • 配對: 排他的、一對一的通信,兩個同伴相互通信。通訊是雙向的,套接字中沒有儲存特定的狀態。伺服器監聽某個端口,客戶端連接到該端口。

Messaging in distributed systems using ZeroMQ

  • 客戶端 – 伺服器:客戶端連接到一台或多台伺服器。此模式允許請求-回應模式。客戶端發送請求“zmq.REQ”並接收回應。

Messaging in distributed systems using ZeroMQ

  • 發布/訂閱: 一種傳統的通訊模式,訊息的發送者(稱為發布者)將訊息傳送到特定的接收者(稱為訂閱者)。訊息的發布無需知道該知識的訂閱者是什麼或是否存在。多個訂閱者訂閱由發布者發布的消息/主題,或者一個訂閱者可以連接到多個發布者。

Messaging in distributed systems using ZeroMQ

  • 推拉套接字(又稱管道):讓您將訊息分發給排列在管道中的多個工作人員。 Push 套接字會將發送的訊息均勻分發到其 Pull 用戶端。這相當於生產者/消費者模型,但是消費者計算的結果不會發送到上游,而是下游到另一個拉取/消費者套接字。

Messaging in distributed systems using ZeroMQ

注意: 使用套接字可能會很棘手,使用相同的連接埠號碼/相同的套接字一次又一次運行相同的程式碼,可能會導致連接「掛起」(伺服器看起來像是正在運行,但它不能接受連接)。發生這種情況是因為我們沒有正確關閉和銷毀之前的連接。

解決這個問題最合適的方法是關閉套接字並銷毀 ZeroMQ 上下文。有關更多詳細信息,請參閱第 2 階段和第 3 階段的 try – catch 區塊。

在本教學中,您可能會遇到此類問題,例如,在同一連接埠中多次執行相同伺服器。如果您遇到掛起問題,建議您終止 Python 進程,清理 TCP 連接埠號碼,然後再次執行伺服器(請參閱步驟 11)。

第 1 階段:將伺服器與客戶端配對

讓我們先建立一個新的虛擬機,然後我們將安裝Python3。

  • 保留虛擬機器內部 IP 的副本,在本教學中我們將使用內部 IP 位址。
    1. 開啟一個新的終端連接並執行以下命令(一個接一個)。最後一個指令安裝 ZeroMQ。
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq

出現提示時輸入:Y。

如今許多應用程式都包含跨網路的元件,因此訊息傳遞至關重要。今天我們將使用 TCP 進行訊息傳輸。

您可以使用 VSC 存取您的虛擬機,也可以使用 SSH 執行命令並使用 pico 編輯文件,在我的例子中,我將使用 SSH。

?確保仔細複製代碼。

我們需要建立第一個ZeroMQ 伺服器,該伺服器一次只允許與一個客戶端綁定。

  • 建立一個名為pair-server.py的新文件,然後輸入以下程式碼。

  • 程式碼使用 zmq.PAIR 模式建立一個新套接字,然後將伺服器綁定到特定的 IP 連接埠(我們已經在 GCP 中開啟)。請注意,在我們停止伺服器之前,伺服器不會停止運作。

  • 查看評論以了解其工作原理。

  • 確保更改 ;這是 GCP 虛擬機器的 內部 IP 位址;用戶端連接埠應與伺服器連接埠相同。

# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)

先不要運行伺服器,首先讓我們建立客戶端。

建立客戶端並花一點時間檢查評論。我將其命名為pair-client.py。

確保更改 ;連接埠應與伺服器中的連接埠相同。

$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq

我們需要兩個個終端視窗來運作PAIR範例。我們將在一個視窗上運行伺服器,在另一個視窗上運行客戶端。現在,按如下方式運行它。

  • 運行伺服器
# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
  • 運行客戶端
import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()

檢查輸出,我們剛剛建立了一個新的 PAIR 套接字。

  • 當客戶端完成連線時,腳本將終止。然後停止伺服器(ctrl c)並殺死它。

在再次運行之前,我們需要清除 TCP 連線。為此,請使用以下命令。

$ python3 pair-server.py

?備註:

  • 我們一次只能運行一個PAIR,這意味著我們不能有多個客戶端,記住這是一個PAIR,第一個客戶端將鎖定套接字.

  • 如果我們運行伺服器一次,客戶端運行兩次,第二個客戶端將“掛起”,這意味著第二個客戶端將等待新伺服器連線。

  • 如果我們想要多次運行該對,我們需要終止伺服器並清除 TCP 連線。

  • PAIR 當客戶端需要獨佔存取伺服器時是理想的選擇。

  • 我們可以將多個伺服器作為一對連接到多個客戶端,但我們需要使用不同的連接埠號碼進行連接。

每個階段都是相互獨立的,因此,停止伺服器,清除 TCP 端口,然後進入下一階段。

第 2 階段:將伺服器與多個客戶端配對

讓我們建立一個客戶端-伺服器連接,其中多個客戶端將連接到單一伺服器。這是最受歡迎的訊息傳遞模式。

  • 讓我們在 REP-REQ(回覆請求)模式的上下文中建立一個伺服器。
  • 我們將呼叫伺服器rep-server.py,使用連接埠5555。
$ python3 pair-client.py

現在我們將開發兩個功能相同的客戶端。

$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()

讓我們建立該客戶端的副本並進行對應的編輯。執行以下命令來製作新副本。

* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.

然後編輯req-client2.py並將客戶端1改為客戶端2。

讓我們編輯列印和套接字訊息(第 8 行和第 9 行)

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 1 ", request,"...")
    socket.send_string("Hello from client 1")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()

要執行此範例,我們需要三個 個終端窗口,一個用於伺服器,兩個用於客戶端。在第一個終端機中執行以下命令。

  • 讓我們啟動伺服器
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq
  • 讓我們啟動第一個客戶端
# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
  • 讓我們啟動第二個客戶端
import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()

檢查視窗的輸出,我們剛剛建立了兩個與一台伺服器通訊的客戶端。您可以擁有任意數量的客戶端,您將需要建立客戶端,即使具有連接到一台伺服器的不同功能。

備註:

  • 客戶端-伺服器是最廣泛使用的模式,當我們安裝和執行 Apache HTTP 伺服器時,我們已經在第 1 類中使用了它。

  • 停止伺服器並清理 TCP 連接埠 5555

    • 殺死伺服器:


重擊
$ sudo fusion -k 5555/tcp

第 3 階段:將伺服器與客戶端配對

發布-訂閱模式是一種非常常見的方法,用於控制向訂閱上下文的許多客戶端廣播數據,伺服器將數據發送到一個或多個客戶端。

$ python3 pair-server.py

讓我們先建立一個簡單的範例。

$ python3 pair-client.py

讓我們建立一個新文件,命名為 pub_server.py。

$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
  • 該命令將指示 python 以特定的方式運行伺服器
* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.

建立一個新檔案 pub_client.py。
* 此腳本接受來自命令列的三個參數(即 IP 和兩個連接埠)。

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 1 ", request,"...")
    socket.send_string("Hello from client 1")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()

我們已準備好運行我們的pub-sub應用程式!我們需要三個個終端視窗。在第一個終端機中運作:

$ cp req-client1.py req-client2.py
  • 在第二個終端機中運作:
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 2 ", request,"...")
        socket.send_string("Hello from client 2")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()
  • 每個伺服器都會產生天氣資料。例如:
    • 郵遞區號,例如:10001
    • 溫帶,例如:-68

讓我們運行客戶端以透過郵遞區號連接並訂閱數據,例如 10001 (NYC)。請記住,客戶端腳本訂閱了兩個伺服器實例。執行下一個指令:

$ python3 rep-server.py
  • 完成殺死伺服器(ctrl z)並清除 TCP 連接埠後,執行以下命令:
$ python3 req-client1.py
$ python3 req-client2.py
第 4 階段:推/拉:使用管道模式**

推/拉套接字可讓您將訊息分發給排列在管道中的多個工作人員。這對於並行運行程式碼非常有用。 Push 套接字會將訊息均勻分發到其 Pull 用戶端,客戶端將回應傳送到另一個稱為收集器的伺服器。

Messaging in distributed systems using ZeroMQ

  • 這相當於生產者/消費者模型,但是消費者計算的結果不會發送到上游,而是下游到另一個拉取/消費者套接字。

  • 我們將實現以下功能。

  • 生產者將向消費者推送 0 到 10 的隨機數。

  • 同一消費者的兩個實例將拉取數字並執行繁重的任務。

  • 任務可以是任何繁重的計算,例如矩陣乘法。

  • 為了簡單起見,我們的「繁重任務」將只傳回相同的數字。

  • 消費者會將各個結果(繁重的任務計算)推送到結果收集器,該收集器將匯總結果。

  • 為了簡單起見,結果收集器的實例將拉取結果併計算每個消費者的部分總和。如果需要,我們可以輕鬆地將兩個部分和加起來。

  • 讓我們來看一個簡單的例子。

    • 生產者生成 [1,2,3,4,5]。
    • 消費者1接收到[2,4],然後計算一個繁重的任務並將結果轉發給結果收集器。
    • 消費者2收到[1,3,5],然後計算一個繁重的任務,並將結果轉發給結果收集器。
    • 結果收集器計算計數和部分總和,例如:
    • Consumer1[2,4],這表示從 Consumer1 收到 2 個數字,它們的總和為 6
    • Consumer2[1,3,5],表示從該 Consumer2 收到 3 個數字,其總和為 9
  • 此範例示範了分散式處理並行處理的潛力。

首先,讓我們建立在連接埠 5555 上運行的名為 Producer.py 的生產者,確保您調整了您的 .

$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq

然後建立consumer.py如下。不要忘記更改程式碼中的兩個 s。

# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)

最後,讓我們開發collector.py,再改.

import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()

確保沒有縮排錯誤!

$ python3 pair-server.py

首先,我們需要運行collector.py,收集器將等待資料被收集,直到我們啟動生產者。

$ python3 pair-client.py
  • 然後,我們將一一啟動消費者,在不同的終端視窗中執行每個命令。
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
  • 在另一個終端機中執行相同的命令。
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
  • 最後,我們將啟動生產者,開始將資料傳送到我們的管道。
* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.

幹得好! ?您使用 ZeroMQ 來開發訊息傳遞模式!

以上是使用 ZeroMQ 在分散式系統中傳送訊息的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn