>백엔드 개발 >파이썬 튜토리얼 >ZeroMQ를 사용한 분산 시스템에서의 메시징

ZeroMQ를 사용한 분산 시스템에서의 메시징

Barbara Streisand
Barbara Streisand원래의
2024-11-21 07:33:11500검색

ZeroMQ를 사용한 분산 시스템에서의 메시징

Python을 사용하여 다양한 메시징 패턴을 개발해 보겠습니다.

단계별 명령을 따르려면 다음 동영상을 시청해야 합니다.

시간을 가지세요. 명령을 실행하기 전에 다시 확인하세요.

  • 다음 비디오는 이 튜토리얼에서 사용되는 명령을 보여줍니다.

Messaging in distributed systems using ZeroMQ

이 튜토리얼을 GCP VM에서 실행하지만 로컬에서도 자유롭게 실행할 수 있습니다. ✅

이 튜토리얼에서는 ZeroMQ를 사용하여 Python3의 소켓 개념을 소개합니다. ZeroMQ는 분산 프로세스가 메시지를 보내 서로 통신할 수 있도록 소켓을 개발하는 쉬운 방법입니다.

  • 가장 간단한 형태로, 소켓(노드)은 특정 IP 포트를 "수신"하고 다른 소켓은 연결을 형성합니다. 소켓을 사용하면 일대일, 일대다, 다대다 연결 패턴을 가질 수 있습니다.

오늘 살펴볼 메시지 패턴은 다음과 같습니다.

  • 페어: 두 피어가 서로 통신하는 독점적인 일대일 통신입니다. 통신은 양방향이며 소켓에 저장된 특정 상태가 없습니다. 서버는 특정 포트를 수신하고 클라이언트는 해당 포트에 연결합니다.

Messaging in distributed systems using ZeroMQ

  • 클라이언트 – 서버: 클라이언트는 하나 이상의 서버에 연결됩니다. 이 패턴은 REQUEST – RESPONSE 모드를 허용합니다. 클라이언트는 "zmq.REQ" 요청을 보내고 응답을 받습니다.

Messaging in distributed systems using ZeroMQ

  • 게시/구독: 게시자라고 하는 메시지 발신자가 구독자라고 하는 특정 수신자에게 메시지를 보내는 전통적인 통신 패턴입니다. 메시지는 해당 지식의 구독자가 존재하는지, 존재하는지 여부에 대한 지식 없이 게시됩니다. 여러 구독자가 게시자가 게시하는 메시지/주제를 구독하거나 한 명의 구독자가 여러 게시자에 연결할 수 있습니다.

Messaging in distributed systems using ZeroMQ

  • 푸시 앤 풀 소켓(일명 파이프라인): 메시지를 파이프라인에 배열하여 여러 작업자에게 배포할 수 있습니다. 푸시 소켓은 보낸 메시지를 풀 클라이언트에 균등하게 배포합니다. 이는 생산자/소비자 모델과 동일하지만 소비자가 계산한 결과는 업스트림으로 전송되지 않고 다른 풀/소비자 소켓으로 다운스트림으로 전송됩니다.

Messaging in distributed systems using ZeroMQ

? 참고: 소켓 작업은 까다로울 수 있습니다. 동일한 포트 번호/동일 소켓을 사용하여 동일한 코드를 계속해서 실행하면 연결이 "중단"될 수 있습니다(서버가 실행 중인 것처럼 보이지만, 그러나 연결은 허용되지 않습니다.) 이는 이전 연결을 올바르게 닫고 파괴하지 않았기 때문에 발생합니다.

이 문제를 해결하는 가장 적절한 방법은 소켓을 닫고 ZeroMQ 컨텍스트를 파괴하는 것입니다. 자세한 내용은 2단계 및 3단계의 try - catch 블록을 참조하세요.

이 튜토리얼에서는 동일한 포트에서 동일한 서버를 여러 번 실행하는 등의 문제가 발생할 수 있습니다. 정지 문제가 발생하는 경우 Python 프로세스를 종료하고 TCP 포트 번호를 정리한 후 서버를 한 번 더 실행하는 것이 좋습니다(11단계 참조).

1단계: 서버를 클라이언트에 페어링

새 VM을 만드는 것부터 시작하고 Python3을 설치하겠습니다.

  • VM 내부 IP의 복사본을 보관하세요. 이 튜토리얼에서는 내부 IP 주소를 사용합니다.
    1. 새 터미널 연결을 열고 다음 명령을 차례로 실행합니다. 마지막 명령은 ZeroMQ를 설치합니다.
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq

메시지가 나타나면 Y를 입력하세요.

요즘 많은 애플리케이션이 네트워크 전체에 걸쳐 있는 구성 요소로 구성되어 있으므로 메시징이 필수적입니다. 오늘은 메시지 전송에 TCP를 사용해보겠습니다.

VSC를 사용하여 VM에 액세스하거나 SSH를 사용하여 명령을 실행하고 pico로 파일을 편집할 수 있습니다. 제 경우에는 SSH를 사용하겠습니다.

? 코드를 주의해서 복사하세요.

첫 번째 ZeroMQ 서버를 만들어야 합니다. 서버는 한 번에 하나의 클라이언트와의 바인딩을 허용합니다.

  • pair-server.py라는 새 파일을 만들고 다음 코드를 입력하세요.

  • 이 코드는 zmq.PAIR 패턴을 사용하여 새 소켓을 생성한 다음 서버를 특정 IP 포트(이미 GCP에서 열었음)에 바인딩합니다. 서버를 중지할 때까지 서버 실행이 중지되지 않습니다.

  • 이것이 어떻게 작동하는지 이해하려면 댓글을 살펴보십시오.

  • ; 이는 GCP VM의 내부 IP 주소입니다. 클라이언트 포트는 서버와 동일해야 합니다.

# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)

아직 서버를 실행하지 말고 먼저 클라이언트를 만들어 보겠습니다.

클라이언트를 생성하고 잠시 시간을 내어 댓글을 검토해 보세요. pair-client.py라고 부르겠습니다.

; 포트는 서버와 동일해야 합니다.

$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq

PAIR 예제를 실행하려면 두 개의 터미널 창이 필요합니다. 한 창에서는 서버를 실행하고 다른 창에서는 클라이언트를 실행하겠습니다. 이제 다음과 같이 실행해 보세요.

  • 서버 실행
# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
  • 클라이언트 실행
import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()

출력을 살펴보세요. 방금 새 PAIR 소켓을 만들었습니다.

  • 클라이언트가 연결을 완료하면 스크립트가 종료됩니다. 그런 다음 서버를 중지(ctrl c)하고 종료합니다.

다시 실행하기 전에 TCP 연결을 지워야 합니다. 이렇게 하려면 다음 명령을 사용하세요.

$ python3 pair-server.py

? 참고:

  • 한 번에 하나의 PAIR만 실행할 수 있습니다. 이는 여러 클라이언트를 가질 수 없다는 것을 의미합니다. 이것이 PAIR임을 기억하세요. 첫 번째 클라이언트가 소켓을 잠급니다. .

  • 서버를 한 번 실행하고 클라이언트를 두 번 실행하면 두 번째 클라이언트가 "중단"됩니다. 이는 두 번째 클라이언트가 새 서버가 연결될 때까지 기다린다는 의미입니다.

  • 쌍을 두 번 이상 실행하려면 서버를 종료하고 TCP 연결을 지워야 합니다.

  • PAIR는 클라이언트가 서버에 독점적으로 액세스해야 하는 경우에 이상적입니다.

  • 여러 서버와 여러 클라이언트를 PAIR로 구성할 수 있지만 연결에는 서로 다른 포트 번호를 사용해야 합니다.

각 단계는 서로 독립적이므로 서버를 중지하고 TCP 포트를 Clear한 후 다음 단계로 넘어갑니다.

2단계: 서버를 여러 클라이언트에 페어링

여러 클라이언트가 단일 서버에 연결되는 클라이언트-서버 연결을 만들어 보겠습니다. 이는 가장 인기 있는 메시지 패턴입니다.

  • REP-REQ(요청에 응답) 패턴으로 서버를 만들어 보겠습니다.
  • 포트 5555를 사용하여 서버 rep-server.py를 호출합니다.
$ python3 pair-client.py

이제 기능면에서 동일한 두 개의 클라이언트를 개발하겠습니다.

$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()

이 클라이언트의 복사본을 만들고 그에 따라 편집해 보겠습니다. 새 복사본을 만들려면 다음 명령을 실행하세요.

* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.

그런 다음 req-client2.py를 편집하고 클라이언트 1을 클라이언트 2로 변경합니다.

인쇄 및 소켓 메시지를 편집해 보겠습니다(8행과 9행)

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 1 ", request,"...")
    socket.send_string("Hello from client 1")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()

이 예제를 실행하려면 3개 터미널 창이 필요합니다. 하나는 서버용이고 두 개는 클라이언트용입니다. 첫 번째 터미널에서 다음을 실행하세요.

  • 서버를 시작해보자
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq
  • 첫 번째 클라이언트를 시작해 보겠습니다.
# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
  • 두 번째 클라이언트를 시작해 보겠습니다.
import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()

창 출력을 확인하세요. 방금 하나의 서버와 통신하는 두 개의 클라이언트를 만들었습니다. 원하는 만큼 클라이언트를 가질 수 있으며, 하나의 서버에 연결되는 다양한 기능을 사용하더라도 클라이언트를 만들어야 합니다.

? 참고:

  • 클라이언트 – 서버는 가장 널리 사용되는 패턴으로, 이미 Apache HTTP 서버를 설치하고 실행할 때 클래스 1에서 사용했습니다.

  • 서버를 중지하고 TCP 포트 5555를 정리하세요

    • 서버 종료:


배쉬
$ sudo 퓨저 -k 5555/tcp

3단계: 서버를 클라이언트에 페어링

게시-구독 패턴은 서버가 하나 이상의 클라이언트에 데이터를 보내는 방식으로 컨텍스트를 구독하는 많은 클라이언트에 대한 데이터 브로드캐스팅을 제어하는 ​​매우 일반적인 방법입니다.

$ python3 pair-server.py

먼저 간단한 예시를 만들어 보겠습니다.

$ python3 pair-client.py

새 파일을 만들어서 이름을 pub_server.py로 지정하겠습니다.

$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
  • 이 명령은 Python에게 특정 위치에서 서버를 실행하도록 지시합니다.
* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.

새 파일 pub_client.py를 만듭니다.
* 스크립트는 명령줄에서 3개의 인수(IP와 2개의 포트)를 허용합니다.

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 1 ", request,"...")
    socket.send_string("Hello from client 1")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()

pub-sub 애플리케이션을 실행할 준비가 되었습니다! 터미널 창이 세 개 필요합니다. 첫 번째 터미널에서 다음을 실행합니다:

$ cp req-client1.py req-client2.py
  • 두 번째 터미널에서 다음을 실행합니다.
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 2 ", request,"...")
        socket.send_string("Hello from client 2")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()
  • 각 서버는 날씨 데이터를 생성합니다. 예를 들어:
    • 우편번호(예: 10001)
    • 온대, 예: -68

우편번호(예: 10001(NYC))로 데이터를 연결하고 구독하도록 클라이언트를 실행해 보겠습니다. 클라이언트 스크립트가 두 서버 인스턴스를 모두 구독한다는 것을 기억하세요. 다음 명령을 실행하세요:

$ python3 rep-server.py
  • 완료되면 서버를 종료하고(ctrl z) 다음 명령을 실행하여 TCP 포트를 지웁니다.
$ python3 req-client1.py
$ python3 req-client2.py
4단계: 푸시/풀: 파이프라인 패턴 사용**

푸시/풀 소켓을 사용하면 파이프라인에 배열된 여러 작업자에게 메시지를 배포할 수 있습니다. 이는 코드를 병렬로 실행하는 데 매우 유용합니다. 푸시 소켓은 풀 클라이언트에 메시지를 고르게 배포하고 클라이언트는 수집기라고 하는 다른 서버에 응답을 보냅니다.

Messaging in distributed systems using ZeroMQ

  • 이것은 생산자/소비자 모델과 동일하지만 소비자가 계산한 결과는 업스트림으로 전송되지 않고 다운스트림으로 다른 풀/소비자 소켓으로 전송됩니다.

  • 다음 기능을 구현하겠습니다.

  • 생산자는 0부터 10까지의 임의의 숫자를 소비자에게 푸시합니다.

  • 동일한 소비자의 두 인스턴스가 숫자를 가져오고 무거운 작업을 수행합니다.

  • 이 작업은 행렬 곱셈과 같은 무거운 계산일 수 있습니다.

  • 단순화를 위해 "과중한 작업"은 동일한 숫자만 반환합니다.

  • 소비자는 개별 결과(과중한 작업 계산)를 결과 수집기에 푸시하여 결과를 집계합니다.

  • 단순화를 위해 결과 수집기의 인스턴스는 결과를 가져오고 각 소비자의 부분 합계를 계산합니다. 필요한 경우 두 부분합을 쉽게 합산할 수 있습니다.

  • 간단한 예를 살펴보겠습니다.

    • 생산자는 [1,2,3,4,5]를 생성합니다.
    • 소비자 1은 [2,4]를 받은 다음 무거운 작업을 계산하고 결과를 결과 수집기에 전달합니다.
    • 소비자 2는 [1,3,5]를 받은 후 무거운 작업을 계산하고 결과를 결과 수집기에 전달합니다.
    • 결과 수집기는 개수와 부분합을 계산합니다. 예:
    • Consumer1[2,4], 이는 Consumer1으로부터 수신된 2개의 숫자를 의미하며 그 합계는 6입니다.
    • Consumer2[1,3,5]는 이 Consumer2로부터 수신된 3개의 숫자를 의미하며 그 합계는 9입니다.
  • 이 예는 병렬 처리에 대한 분산 처리의 잠재력을 보여줍니다.

먼저 포트 5555에서 실행되는 producer.py라는 프로듀서를 만들어서 .

$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq

다음과 같이 Consumer.py를 생성합니다. 코드에서 두 개의 s를 변경하는 것을 잊지 마세요.

# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)

마지막으로 Collector.py를 개발하고 .
를 다시 변경해 보겠습니다.

import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()

들여쓰기 오류가 없는지 확인하세요!

$ python3 pair-server.py

먼저, Collector.py를 실행해야 합니다. Collector는 생산자를 시작할 때까지 데이터가 수집되기를 기다립니다.

$ python3 pair-client.py
  • 그런 다음 소비자를 하나씩 시작하고 다른 터미널 창에서 각 명령을 실행합니다.
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
  • 다른 터미널에서도 동일한 명령을 실행하세요.
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
  • 마지막으로 파이프라인으로 데이터 전송을 시작하는 생산자를 시작합니다.
* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.

잘했어요! ? ZeroMQ를 사용하여 메시징 패턴을 개발하셨습니다!

위 내용은 ZeroMQ를 사용한 분산 시스템에서의 메시징의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.