搜索
首页后端开发Python教程使用 ZeroMQ 在分布式系统中发送消息

使用 ZeroMQ 在分布式系统中发送消息

让我们使用 Python 来开发不同的消息传递模式。

您需要观看以下视频才能按照分步命令进行操作。

慢慢来;确保在运行命令之前仔细检查它们。

  • 以下视频演示了本教程中使用的命令。

Messaging in distributed systems using ZeroMQ

我在我的 GCP 虚拟机上运行本教程,但也可以在本地运行它 ✅

本教程使用 ZeroMQ 介绍 Python3 中套接字的概念。 ZeroMQ 是一种开发套接字的简单方法,允许分布式进程通过发送消息相互通信。

  • 最简单的形式是,一个套接字(节点)“监听”特定的 IP 端口,同时另一个套接字伸出来形成连接。使用套接字,我们可以拥有一对一、一对多和多对多连接模式。

我们今天将研究的消息传递模式如下:

  • 配对: 排他性、一对一的通信,两个同伴相互通信。通信是双向的,套接字中没有存储特定的状态。服务器监听某个端口,客户端连接到该端口。

Messaging in distributed systems using ZeroMQ

  • 客户端 – 服务器:客户端连接到一台或多台服务器。该模式允许请求-响应模式。客户端发送请求“zmq.REQ”并接收回复。

Messaging in distributed systems using ZeroMQ

  • 发布/订阅: 一种传统的通信模式,消息的发送者(称为发布者)将消息发送到特定的接收者(称为订阅者)。消息的发布无需知道该知识的订阅者是什么或是否存在。多个订阅者订阅由发布者发布的消息/主题,或者一个订阅者可以连接到多个发布者。

Messaging in distributed systems using ZeroMQ

  • 推拉套接字(又名管道):让您将消息分发给排列在管道中的多个工作人员。 Push 套接字会将发送的消息均匀分发到其 Pull 客户端。这相当于生产者/消费者模型,但是消费者计算的结果不会发送到上游,而是下游到另一个拉取/消费者套接字。

Messaging in distributed systems using ZeroMQ

注意: 使用套接字可能会很棘手,使用相同的端口号/相同的套接字一次又一次运行相同的代码,可能会导致连接“挂起”(服务器看起来像是正在运行,但它不能接受连接)。发生这种情况是因为我们没有正确关闭和销毁之前的连接。

解决这个问题最合适的方法是关闭套接字并销毁 ZeroMQ 上下文。有关更多详细信息,请参阅第 2 阶段和第 3 阶段的 try – catch 块。

在本教程中,您可能会遇到此类问题,例如,在同一端口中多次运行同一服务器。如果您遇到挂起问题,建议您终止 Python 进程,清理 TCP 端口号,然后再次运行服务器(请参阅步骤 11)。

第 1 阶段:将服务器与客户端配对

让我们首先创建一个新的虚拟机,然后我们将安装Python3。

  • 保留虚拟机内部 IP 的副本,在本教程中我们将使用内部 IP 地址。
    1. 打开一个新的终端连接并运行以下命令(一个接一个)。最后一个命令安装 ZeroMQ。
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq

出现提示时输入:Y。

如今许多应用程序都包含跨网络的组件,因此消息传递至关重要。今天我们将使用 TCP 进行消息传输。

您可以使用 VSC 访问您的虚拟机,也可以使用 SSH 运行命令并使用 pico 编辑文件,在我的例子中,我将使用 SSH。

?确保仔细复制代码。

我们需要创建第一个ZeroMQ 服务器,该服务器一次只允许与一个客户端绑定。

  • 创建一个名为pair-server.py的新文件,然后输入以下代码。

  • 代码使用 zmq.PAIR 模式创建一个新套接字,然后将服务器绑定到特定的 IP 端口(我们已经在 GCP 中打开)。请注意,在我们停止服务器之前,服务器不会停止运行。

  • 查看评论以了解其工作原理。

  • 确保更改 ;这是 GCP 虚拟机的 内部 IP 地址;客户端端口应与服务器端口相同。

# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<internal_vm_address>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
</internal_vm_address>

先不要运行服务器,首先让我们创建客户端。

创建客户端并花一点时间检查评论。我将其命名为pair-client.py。

确保更改 ;端口应与服务器中的端口相同。

$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq

我们需要两个个终端窗口来运行PAIR示例。我们将在一个窗口上运行服务器,在另一个窗口上运行客户端。现在,按如下方式运行它。

  • 运行服务器
# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<internal_vm_address>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
</internal_vm_address>
  • 运行客户端
import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<internal_vm_address>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count



<p>检查输出,我们刚刚创建了一个新的 <strong>PAIR</strong> 套接字。</p>

<ul>
<li>当客户端完成连接时,脚本将终止。然后停止服务器(ctrl c)并杀死它。</li>
</ul>

<p>在再次运行之前,我们需要清除 TCP 连接。为此,请使用以下命令。<br>
</p>

<pre class="brush:php;toolbar:false">$ python3 pair-server.py

?备注:

  • 我们一次只能运行一个PAIR,这意味着我们不能有多个客户端,记住这是一个PAIR,第一个客户端将锁定套接字.

  • 如果我们运行服务器一次,客户端运行两次,第二个客户端将“挂起”,这意味着第二个客户端将等待新服务器连接。

  • 如果我们想要多次运行该对,我们需要终止服务器并清除 TCP 连接。

  • PAIR 当客户端需要独占访问服务器时是理想的选择。

  • 我们可以将多个服务器作为一对连接到多个客户端,但我们需要使用不同的端口号进行连接。

每个阶段都是相互独立的,因此,停止服务器,清除 TCP 端口,然后进入下一阶段。

第 2 阶段:将服务器与多个客户端配对

让我们创建一个客户端-服务器连接,其中多个客户端将连接到单个服务器。这是最流行的消息传递模式。

  • 让我们在 REP-REQ(回复请求)模式的上下文中创建一个服务器。
  • 我们将调用服务器rep-server.py,使用端口5555。
$ python3 pair-client.py

现在我们将开发两个功能相同的客户端。

$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<internal_vm_address>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
</internal_vm_address>

让我们创建该客户端的副本并进行相应的编辑。运行以下命令来制作新副本。

* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <internal_vm_address>.
</internal_vm_address>

然后编辑req-client2.py并将客户端1更改为客户端2。

让我们编辑打印和套接字消息(第 8 行和第 9 行)

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<internal_vm_address>:5555")
for request in range (1,10):
    print("Sending request Client 1 ", request,"...")
    socket.send_string("Hello from client 1")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()
</internal_vm_address>

要运行此示例,我们需要三个 个终端窗口,一个用于服务器,两个用于客户端。在第一个终端中运行以下命令。

  • 让我们启动服务器
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq
  • 让我们启动第一个客户端
# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<internal_vm_address>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
</internal_vm_address>
  • 让我们启动第二个客户端
import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<internal_vm_address>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count



<p>检查窗口的输出,我们刚刚创建了两个与一台服务器通信的客户端。您可以拥有任意数量的客户端,您将需要创建客户端,即使具有连接到一台服务器的不同功能。</p>

<blockquote>
<p>? <strong>备注:</strong></p>

<ul>
<li><p>客户端-服务器是最广泛使用的模式,当我们安装和运行 Apache HTTP 服务器时,我们已经在第 1 类中使用了它。</p></li>
<li>
<p><strong>停止服务器并清理 TCP 端口 5555</strong> </p>

<ul>
<li>杀死服务器:</li>
</ul>
</li>
</ul>

<p><br>
     重击<br>
    $ sudo fusion -k 5555/tcp <br>
<br>
</p>
</blockquote>

<h6>
  
  
  第 3 阶段:将服务器与客户端配对
</h6>

<p>发布-订阅模式是一种非常常见的方法,用于控制向订阅上下文的许多客户端广播数据,服务器将数据发送到一个或多个客户端。 </p>

<pre class="brush:php;toolbar:false">$ python3 pair-server.py

让我们首先创建一个简单的示例。

$ python3 pair-client.py

让我们创建一个新文件,命名为 pub_server.py。

$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<internal_vm_address>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
</internal_vm_address>
  • 该命令将指示 python 以特定的方式运行服务器
* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <internal_vm_address>.
</internal_vm_address>

创建一个新文件 pub_client.py。
* 该脚本接受来自命令行的三个参数(即 IP 和两个端口)。

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<internal_vm_address>:5555")
for request in range (1,10):
    print("Sending request Client 1 ", request,"...")
    socket.send_string("Hello from client 1")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()
</internal_vm_address>

我们已准备好运行我们的pub-sub应用程序!我们需要三个个终端窗口。在第一个终端中运行:

$ cp req-client1.py req-client2.py
  • 在第二个终端中运行:
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<internal_vm_address>:5555")
for request in range (1,10):
    print("Sending request Client 2 ", request,"...")
        socket.send_string("Hello from client 2")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()
</internal_vm_address>
  • 每个服务器都会生成天气数据。例如:
    • 邮政编码,例如:10001
    • 温带,例如:-68

让我们运行客户端以通过邮政编码连接并订阅数据,例如 10001 (NYC)。请记住,客户端脚本订阅了两个服务器实例。运行下一个命令:

$ python3 rep-server.py
  • 完成杀死服务器(ctrl z)并清除 TCP 端口后,运行以下命令:
$ python3 req-client1.py
$ python3 req-client2.py
第 4 阶段:推/拉:使用管道模式**

推/拉套接字可让您将消息分发给排列在管道中的多个工作人员。这对于并行运行代码非常有用。 Push 套接字会将消息均匀分发到其 Pull 客户端,客户端将响应发送到另一个称为收集器的服务器。

Messaging in distributed systems using ZeroMQ

  • 这相当于生产者/消费者模型,但是消费者计算的结果不会发送到上游,而是下游到另一个拉取/消费者套接字。

  • 我们将实现以下功能。

  • 生产者将向消费者推送 0 到 10 的随机数。

  • 同一消费者的两个实例将拉取数字并执行繁重的任务。

  • 任务可以是任何繁重的计算,例如矩阵乘法。

  • 为了简单起见,我们的“繁重任务”将只返回相同的数字。

  • 消费者会将各个结果(繁重的任务计算)推送到结果收集器,该收集器将汇总结果。

  • 为了简单起见,结果收集器的实例将拉取结果并计算每个消费者的部分总和。如果需要,我们可以轻松地将两个部分和相加。

  • 让我们看一个简单的例子。

    • 生产者生成 [1,2,3,4,5]。
    • 消费者1接收到[2,4],然后计算一个繁重的任务并将结果转发给结果收集器。
    • 消费者2收到[1,3,5],然后计算一个繁重的任务,并将结果转发给结果收集器。
    • 结果收集器计算计数和部分总和,例如:
    • Consumer1[2,4],这意味着从 Consumer1 收到 2 个数字,它们的总和为 6
    • Consumer2[1,3,5],表示从该 Consumer2 收到 3 个数字,其总和为 9
  • 此示例演示了分布式处理并行处理的潜力。

首先,让我们创建在端口 5555 上运行的名为 Producer.py 的生产者,确保您调整了您的 .

$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq

然后创建consumer.py如下。不要忘记更改代码中的两个 s。

# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<internal_vm_address>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
</internal_vm_address>

最后,让我们开发collector.py,再次更改.

import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<internal_vm_address>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count



<p>确保没有缩进错误!</p>

<pre class="brush:php;toolbar:false">$ python3 pair-server.py

首先,我们需要运行collector.py,收集器将等待数据被收集,直到我们启动生产者。

$ python3 pair-client.py
  • 然后,我们将一一启动消费者,在不同的终端窗口中运行每个命令。
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
  • 在另一个终端中运行相同的命令。
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<internal_vm_address>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
</internal_vm_address>
  • 最后,我们将启动生产者,开始将数据发送到我们的管道。
* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <internal_vm_address>.
</internal_vm_address>

干得好! ?您使用 ZeroMQ 来开发消息传递模式!

以上是使用 ZeroMQ 在分布式系统中发送消息的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
2小时的Python计划:一种现实的方法2小时的Python计划:一种现实的方法Apr 11, 2025 am 12:04 AM

2小时内可以学会Python的基本编程概念和技能。1.学习变量和数据类型,2.掌握控制流(条件语句和循环),3.理解函数的定义和使用,4.通过简单示例和代码片段快速上手Python编程。

Python:探索其主要应用程序Python:探索其主要应用程序Apr 10, 2025 am 09:41 AM

Python在web开发、数据科学、机器学习、自动化和脚本编写等领域有广泛应用。1)在web开发中,Django和Flask框架简化了开发过程。2)数据科学和机器学习领域,NumPy、Pandas、Scikit-learn和TensorFlow库提供了强大支持。3)自动化和脚本编写方面,Python适用于自动化测试和系统管理等任务。

您可以在2小时内学到多少python?您可以在2小时内学到多少python?Apr 09, 2025 pm 04:33 PM

两小时内可以学到Python的基础知识。1.学习变量和数据类型,2.掌握控制结构如if语句和循环,3.了解函数的定义和使用。这些将帮助你开始编写简单的Python程序。

如何在10小时内通过项目和问题驱动的方式教计算机小白编程基础?如何在10小时内通过项目和问题驱动的方式教计算机小白编程基础?Apr 02, 2025 am 07:18 AM

如何在10小时内教计算机小白编程基础?如果你只有10个小时来教计算机小白一些编程知识,你会选择教些什么�...

如何在使用 Fiddler Everywhere 进行中间人读取时避免被浏览器检测到?如何在使用 Fiddler Everywhere 进行中间人读取时避免被浏览器检测到?Apr 02, 2025 am 07:15 AM

使用FiddlerEverywhere进行中间人读取时如何避免被检测到当你使用FiddlerEverywhere...

Python 3.6加载Pickle文件报错"__builtin__"模块未找到怎么办?Python 3.6加载Pickle文件报错"__builtin__"模块未找到怎么办?Apr 02, 2025 am 07:12 AM

Python3.6环境下加载Pickle文件报错:ModuleNotFoundError:Nomodulenamed...

如何提高jieba分词在景区评论分析中的准确性?如何提高jieba分词在景区评论分析中的准确性?Apr 02, 2025 am 07:09 AM

如何解决jieba分词在景区评论分析中的问题?当我们在进行景区评论分析时,往往会使用jieba分词工具来处理文�...

如何使用正则表达式匹配到第一个闭合标签就停止?如何使用正则表达式匹配到第一个闭合标签就停止?Apr 02, 2025 am 07:06 AM

如何使用正则表达式匹配到第一个闭合标签就停止?在处理HTML或其他标记语言时,常常需要使用正则表达式来�...

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
3 周前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
3 周前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
3 周前By尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解锁Myrise中的所有内容
3 周前By尊渡假赌尊渡假赌尊渡假赌

热工具

PhpStorm Mac 版本

PhpStorm Mac 版本

最新(2018.2.1 )专业的PHP集成开发工具

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

功能强大的PHP集成开发环境

Atom编辑器mac版下载

Atom编辑器mac版下载

最流行的的开源编辑器

SecLists

SecLists

SecLists是最终安全测试人员的伙伴。它是一个包含各种类型列表的集合,这些列表在安全评估过程中经常使用,都在一个地方。SecLists通过方便地提供安全测试人员可能需要的所有列表,帮助提高安全测试的效率和生产力。列表类型包括用户名、密码、URL、模糊测试有效载荷、敏感数据模式、Web shell等等。测试人员只需将此存储库拉到新的测试机上,他就可以访问到所需的每种类型的列表。

WebStorm Mac版

WebStorm Mac版

好用的JavaScript开发工具