使用 ZeroMQ 在分布式系统中发送消息
让我们使用 Python 来开发不同的消息传递模式。
您需要观看以下视频才能按照分步命令进行操作。
慢慢来;确保在运行命令之前仔细检查它们。
- 以下视频演示了本教程中使用的命令。
我在我的 GCP 虚拟机上运行本教程,但也可以在本地运行它 ✅
本教程使用 ZeroMQ 介绍 Python3 中套接字的概念。 ZeroMQ 是一种开发套接字的简单方法,允许分布式进程通过发送消息相互通信。
- 最简单的形式是,一个套接字(节点)“监听”特定的 IP 端口,同时另一个套接字伸出来形成连接。使用套接字,我们可以拥有一对一、一对多和多对多连接模式。
我们今天将研究的消息传递模式如下:
- 配对: 排他性、一对一的通信,两个同伴相互通信。通信是双向的,套接字中没有存储特定的状态。服务器监听某个端口,客户端连接到该端口。
- 客户端 – 服务器:客户端连接到一台或多台服务器。该模式允许请求-响应模式。客户端发送请求“zmq.REQ”并接收回复。
- 发布/订阅: 一种传统的通信模式,消息的发送者(称为发布者)将消息发送到特定的接收者(称为订阅者)。消息的发布无需知道该知识的订阅者是什么或是否存在。多个订阅者订阅由发布者发布的消息/主题,或者一个订阅者可以连接到多个发布者。
- 推拉套接字(又名管道):让您将消息分发给排列在管道中的多个工作人员。 Push 套接字会将发送的消息均匀分发到其 Pull 客户端。这相当于生产者/消费者模型,但是消费者计算的结果不会发送到上游,而是下游到另一个拉取/消费者套接字。
? 注意: 使用套接字可能会很棘手,使用相同的端口号/相同的套接字一次又一次运行相同的代码,可能会导致连接“挂起”(服务器看起来像是正在运行,但它不能接受连接)。发生这种情况是因为我们没有正确关闭和销毁之前的连接。
解决这个问题最合适的方法是关闭套接字并销毁 ZeroMQ 上下文。有关更多详细信息,请参阅第 2 阶段和第 3 阶段的 try – catch 块。
在本教程中,您可能会遇到此类问题,例如,在同一端口中多次运行同一服务器。如果您遇到挂起问题,建议您终止 Python 进程,清理 TCP 端口号,然后再次运行服务器(请参阅步骤 11)。
第 1 阶段:将服务器与客户端配对
让我们首先创建一个新的虚拟机,然后我们将安装Python3。
- 保留虚拟机内部 IP 的副本,在本教程中我们将使用内部 IP 地址。
- 打开一个新的终端连接并运行以下命令(一个接一个)。最后一个命令安装 ZeroMQ。
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
出现提示时输入:Y。
如今许多应用程序都包含跨网络的组件,因此消息传递至关重要。今天我们将使用 TCP 进行消息传输。
您可以使用 VSC 访问您的虚拟机,也可以使用 SSH 运行命令并使用 pico 编辑文件,在我的例子中,我将使用 SSH。
?确保仔细复制代码。
我们需要创建第一个ZeroMQ 服务器,该服务器一次只允许与一个客户端绑定。
创建一个名为pair-server.py的新文件,然后输入以下代码。
代码使用 zmq.PAIR 模式创建一个新套接字,然后将服务器绑定到特定的 IP 端口(我们已经在 GCP 中打开)。请注意,在我们停止服务器之前,服务器不会停止运行。
查看评论以了解其工作原理。
确保更改 ;这是 GCP 虚拟机的 内部 IP 地址;客户端端口应与服务器端口相同。
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<internal_vm_address>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1) </internal_vm_address>
先不要运行服务器,首先让我们创建客户端。
创建客户端并花一点时间检查评论。我将其命名为pair-client.py。
确保更改 ;端口应与服务器中的端口相同。
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
我们需要两个个终端窗口来运行PAIR示例。我们将在一个窗口上运行服务器,在另一个窗口上运行客户端。现在,按如下方式运行它。
- 运行服务器
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<internal_vm_address>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1) </internal_vm_address>
- 运行客户端
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<internal_vm_address>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count <p>检查输出,我们刚刚创建了一个新的 <strong>PAIR</strong> 套接字。</p> <ul> <li>当客户端完成连接时,脚本将终止。然后停止服务器(ctrl c)并杀死它。</li> </ul> <p>在再次运行之前,我们需要清除 TCP 连接。为此,请使用以下命令。<br> </p> <pre class="brush:php;toolbar:false">$ python3 pair-server.py
?备注:
我们一次只能运行一个PAIR,这意味着我们不能有多个客户端,记住这是一个PAIR,第一个客户端将锁定套接字.
如果我们运行服务器一次,客户端运行两次,第二个客户端将“挂起”,这意味着第二个客户端将等待新服务器连接。
如果我们想要多次运行该对,我们需要终止服务器并清除 TCP 连接。
PAIR 当客户端需要独占访问服务器时是理想的选择。
我们可以将多个服务器作为一对连接到多个客户端,但我们需要使用不同的端口号进行连接。
每个阶段都是相互独立的,因此,停止服务器,清除 TCP 端口,然后进入下一阶段。
第 2 阶段:将服务器与多个客户端配对
让我们创建一个客户端-服务器连接,其中多个客户端将连接到单个服务器。这是最流行的消息传递模式。
- 让我们在 REP-REQ(回复请求)模式的上下文中创建一个服务器。
- 我们将调用服务器rep-server.py,使用端口5555。
$ python3 pair-client.py
现在我们将开发两个功能相同的客户端。
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<internal_vm_address>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close() </internal_vm_address>
让我们创建该客户端的副本并进行相应的编辑。运行以下命令来制作新副本。
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <internal_vm_address>. </internal_vm_address>
然后编辑req-client2.py并将客户端1更改为客户端2。
让我们编辑打印和套接字消息(第 8 行和第 9 行)
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<internal_vm_address>:5555") for request in range (1,10): print("Sending request Client 1 ", request,"...") socket.send_string("Hello from client 1") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy() </internal_vm_address>
要运行此示例,我们需要三个 个终端窗口,一个用于服务器,两个用于客户端。在第一个终端中运行以下命令。
- 让我们启动服务器
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
- 让我们启动第一个客户端
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<internal_vm_address>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1) </internal_vm_address>
- 让我们启动第二个客户端
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<internal_vm_address>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count <p>检查窗口的输出,我们刚刚创建了两个与一台服务器通信的客户端。您可以拥有任意数量的客户端,您将需要创建客户端,即使具有连接到一台服务器的不同功能。</p> <blockquote> <p>? <strong>备注:</strong></p> <ul> <li><p>客户端-服务器是最广泛使用的模式,当我们安装和运行 Apache HTTP 服务器时,我们已经在第 1 类中使用了它。</p></li> <li> <p><strong>停止服务器并清理 TCP 端口 5555</strong> </p> <ul> <li>杀死服务器:</li> </ul> </li> </ul> <p><br> 重击<br> $ sudo fusion -k 5555/tcp <br> <br> </p> </blockquote> <h6> 第 3 阶段:将服务器与客户端配对 </h6> <p>发布-订阅模式是一种非常常见的方法,用于控制向订阅上下文的许多客户端广播数据,服务器将数据发送到一个或多个客户端。 </p> <pre class="brush:php;toolbar:false">$ python3 pair-server.py
让我们首先创建一个简单的示例。
$ python3 pair-client.py
让我们创建一个新文件,命名为 pub_server.py。
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<internal_vm_address>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close() </internal_vm_address>
- 该命令将指示 python 以特定的方式运行服务器
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <internal_vm_address>. </internal_vm_address>
创建一个新文件 pub_client.py。
* 该脚本接受来自命令行的三个参数(即 IP 和两个端口)。
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<internal_vm_address>:5555") for request in range (1,10): print("Sending request Client 1 ", request,"...") socket.send_string("Hello from client 1") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy() </internal_vm_address>
我们已准备好运行我们的pub-sub应用程序!我们需要三个个终端窗口。在第一个终端中运行:
$ cp req-client1.py req-client2.py
- 在第二个终端中运行:
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<internal_vm_address>:5555") for request in range (1,10): print("Sending request Client 2 ", request,"...") socket.send_string("Hello from client 2") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy() </internal_vm_address>
- 每个服务器都会生成天气数据。例如:
- 邮政编码,例如:10001
- 温带,例如:-68
让我们运行客户端以通过邮政编码连接并订阅数据,例如 10001 (NYC)。请记住,客户端脚本订阅了两个服务器实例。运行下一个命令:
$ python3 rep-server.py
- 完成杀死服务器(ctrl z)并清除 TCP 端口后,运行以下命令:
$ python3 req-client1.py
$ python3 req-client2.py
第 4 阶段:推/拉:使用管道模式**
推/拉套接字可让您将消息分发给排列在管道中的多个工作人员。这对于并行运行代码非常有用。 Push 套接字会将消息均匀分发到其 Pull 客户端,客户端将响应发送到另一个称为收集器的服务器。
这相当于生产者/消费者模型,但是消费者计算的结果不会发送到上游,而是下游到另一个拉取/消费者套接字。
我们将实现以下功能。
生产者将向消费者推送 0 到 10 的随机数。
同一消费者的两个实例将拉取数字并执行繁重的任务。
任务可以是任何繁重的计算,例如矩阵乘法。
为了简单起见,我们的“繁重任务”将只返回相同的数字。
消费者会将各个结果(繁重的任务计算)推送到结果收集器,该收集器将汇总结果。
为了简单起见,结果收集器的实例将拉取结果并计算每个消费者的部分总和。如果需要,我们可以轻松地将两个部分和相加。
-
让我们看一个简单的例子。
- 生产者生成 [1,2,3,4,5]。
- 消费者1接收到[2,4],然后计算一个繁重的任务并将结果转发给结果收集器。
- 消费者2收到[1,3,5],然后计算一个繁重的任务,并将结果转发给结果收集器。
- 结果收集器计算计数和部分总和,例如:
- Consumer1[2,4],这意味着从 Consumer1 收到 2 个数字,它们的总和为 6。
- Consumer2[1,3,5],表示从该 Consumer2 收到 3 个数字,其总和为 9。
此示例演示了分布式处理并行处理的潜力。
首先,让我们创建在端口 5555 上运行的名为 Producer.py 的生产者,确保您调整了您的 .
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
然后创建consumer.py如下。不要忘记更改代码中的两个 s。
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<internal_vm_address>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1) </internal_vm_address>
最后,让我们开发collector.py,再次更改.
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<internal_vm_address>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count <p>确保没有缩进错误!</p> <pre class="brush:php;toolbar:false">$ python3 pair-server.py
首先,我们需要运行collector.py,收集器将等待数据被收集,直到我们启动生产者。
$ python3 pair-client.py
- 然后,我们将一一启动消费者,在不同的终端窗口中运行每个命令。
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
- 在另一个终端中运行相同的命令。
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<internal_vm_address>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close() </internal_vm_address>
- 最后,我们将启动生产者,开始将数据发送到我们的管道。
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <internal_vm_address>. </internal_vm_address>
干得好! ?您使用 ZeroMQ 来开发消息传递模式!
以上是使用 ZeroMQ 在分布式系统中发送消息的详细内容。更多信息请关注PHP中文网其他相关文章!

pythonuseshybridapprace,ComminingCompilationTobyTecoDeAndInterpretation.1)codeiscompiledtoplatform-Indepententbybytecode.2)bytecodeisisterpretedbybythepbybythepythonvirtualmachine,增强效率和通用性。

theKeyDifferencesBetnewpython's“ for”和“ for”和“ loopsare:1)” for“ loopsareIdealForiteringSequenceSquencesSorkNowniterations,而2)”,而“ loopsareBetterforConterContinuingUntilacTientInditionIntionismetismetistismetistwithOutpredefinedInedIterations.un

在Python中,可以通过多种方法连接列表并管理重复元素:1)使用 运算符或extend()方法可以保留所有重复元素;2)转换为集合再转回列表可以去除所有重复元素,但会丢失原有顺序;3)使用循环或列表推导式结合集合可以去除重复元素并保持原有顺序。

fasteStmethodMethodMethodConcatenationInpythondependersonListsize:1)forsmalllists,operatorseffited.2)forlargerlists,list.extend.extend()orlistComprechensionfaster,withextendEffaster,withExtendEffers,withextend()withextend()是extextend()asmoremory-ememory-emmoremory-emmoremory-emmodifyinginglistsin-place-place-place。

toInSerteLementIntoApythonList,useAppend()toaddtotheend,insert()foreSpificPosition,andextend()formultiplelements.1)useappend()foraddingsingleitemstotheend.2)useAddingsingLeitemStotheend.2)useeapecificindex,toadapecificindex,toadaSpecificIndex,toadaSpecificIndex,blyit'ssssssslorist.3 toaddextext.3

pythonlistsareimplementedasdynamicarrays,notlinkedlists.1)他们areStoredIncoNtiguulMemoryBlocks,mayrequireRealLealLocationWhenAppendingItems,EmpactingPerformance.2)LinkesedlistSwoldOfferefeRefeRefeRefeRefficeInsertions/DeletionsButslowerIndexeDexedAccess,Lestpypytypypytypypytypy

pythonoffersFourmainMethodStoreMoveElement Fromalist:1)删除(值)emovesthefirstoccurrenceofavalue,2)pop(index)emovesanderturnsanelementataSpecifiedIndex,3)delstatementremoveselemsbybybyselementbybyindexorslicebybyindexorslice,and 4)

toresolvea“ dermissionded”错误Whenrunningascript,跟随台词:1)CheckAndAdjustTheScript'Spermissions ofchmod xmyscript.shtomakeitexecutable.2)nesureThEseRethEserethescriptistriptocriptibationalocatiforecationAdirectorywherewhereyOuhaveWritePerMissionsyOuhaveWritePermissionsyYouHaveWritePermissions,susteSyAsyOURHomeRecretectory。


热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

螳螂BT
Mantis是一个易于部署的基于Web的缺陷跟踪工具,用于帮助产品缺陷跟踪。它需要PHP、MySQL和一个Web服务器。请查看我们的演示和托管服务。

MinGW - 适用于 Windows 的极简 GNU
这个项目正在迁移到osdn.net/projects/mingw的过程中,你可以继续在那里关注我们。MinGW:GNU编译器集合(GCC)的本地Windows移植版本,可自由分发的导入库和用于构建本地Windows应用程序的头文件;包括对MSVC运行时的扩展,以支持C99功能。MinGW的所有软件都可以在64位Windows平台上运行。

SublimeText3汉化版
中文版,非常好用

Dreamweaver Mac版
视觉化网页开发工具

禅工作室 13.0.1
功能强大的PHP集成开发环境