Rumah >pembangunan bahagian belakang >Tutorial Python >Pemesejan dalam sistem teragih menggunakan ZeroMQ

Pemesejan dalam sistem teragih menggunakan ZeroMQ

Barbara Streisand
Barbara Streisandasal
2024-11-21 07:33:11494semak imbas

Pemesejan dalam sistem teragih menggunakan ZeroMQ

Mari gunakan Python untuk membangunkan corak pemesejan yang berbeza.

Anda perlu menonton video berikut untuk mengikuti arahan langkah demi langkah.

Luangkan masa anda; pastikan anda menyemak semula arahan sebelum anda menjalankannya.

  • Video berikut menunjukkan arahan yang digunakan dalam tutorial ini.

Messaging in distributed systems using ZeroMQ

Saya menjalankan tutorial ini pada VM GCP saya, tetapi berasa bebas untuk menjalankannya secara setempat ✅

Tutorial ini memperkenalkan konsep soket dalam Python3 menggunakan ZeroMQ. ZeroMQ ialah cara mudah untuk membangunkan soket untuk membolehkan proses yang diedarkan berkomunikasi antara satu sama lain dengan menghantar mesej.

  • Dalam bentuknya yang paling mudah, soket (nod) "mendengar" pada port IP tertentu, manakala soket lain mencapai sambungan. Menggunakan soket, kita boleh mempunyai corak sambungan pada-dengan-satu, satu-ke-banyak dan banyak-ke-banyak.

Corak pemesejan yang akan kita periksa hari ini adalah seperti berikut:

  • Pasangan: Eksklusif, komunikasi satu dengan satu, di mana dua rakan sebaya berkomunikasi antara satu sama lain. Komunikasi adalah dua arah dan tiada keadaan khusus yang disimpan dalam soket. Pelayan mendengar pada port tertentu dan pelanggan menyambung kepadanya.

Messaging in distributed systems using ZeroMQ

  • Pelanggan – Pelayan: Pelanggan menyambung ke satu atau lebih pelayan. Corak ini membenarkan mod REQUEST – RESPONSE. Pelanggan menghantar permintaan "zmq.REQ" dan menerima balasan.

Messaging in distributed systems using ZeroMQ

  • Terbitkan/Langgan: Corak komunikasi tradisional di mana penghantar mesej, dipanggil penerbit, menghantar mesej kepada penerima tertentu, dipanggil pelanggan. Mesej diterbitkan tanpa pengetahuan tentang apa atau jika ada pelanggan pengetahuan itu wujud. Berbilang pelanggan melanggan mesej/topik yang diterbitkan oleh penerbit atau seorang pelanggan boleh menyambung kepada berbilang penerbit.

Messaging in distributed systems using ZeroMQ

  • Soket Tekan dan Tarik (aka Talian Paip): Membenarkan anda mengedarkan mesej kepada berbilang pekerja, disusun dalam saluran paip. Soket Tekan akan mengedarkan mesej yang dihantar kepada pelanggan Tariknya secara sama rata. Ini bersamaan dengan model pengeluar/pengguna, tetapi hasil yang dikira oleh pengguna tidak dihantar ke hulu tetapi ke hilir ke soket tarik/pengguna yang lain.

Messaging in distributed systems using ZeroMQ

? Nota: Bekerja dengan Soket boleh menjadi rumit, berjalan berulang kali kod yang sama, menggunakan nombor port yang sama/soket yang sama, boleh membawa kepada sambungan yang "hang" (pelayan kelihatan seperti sedang berjalan, tetapi ia tidak boleh menerima sambungan). Ini berlaku kerana kami tidak menutup dan memusnahkan sambungan sebelumnya dengan betul.

Cara paling sesuai untuk menangani perkara ini ialah dengan menutup soket dan memusnahkan konteks ZeroMQ. Rujuk untuk mencuba – tangkap blok Fasa 2 dan Fasa 3 untuk butiran lanjut.

Dalam tutorial ini, anda mungkin mengalami masalah seperti itu, cth., menjalankan beberapa kali pelayan yang sama dalam port yang sama. Jika anda menghadapi masalah tergantung, anda dinasihatkan untuk mematikan proses Python, membersihkan nombor port TCP, dan menjalankan pelayan sekali lagi (lihat langkah 11).

Fasa 1: Memadankan pelayan kepada klien

Mari kita mulakan dengan mencipta VM baharu, kemudian kita akan memasang Python3.

  • Simpan salinan IP dalaman VM, untuk tutorial ini kami akan menggunakan alamat IP dalaman.
    1. Buka sambungan terminal baharu dan jalankan arahan berikut (satu demi satu). Perintah terakhir memasang ZeroMQ.
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq

Jenis: Y apabila digesa.

Banyak aplikasi hari ini terdiri daripada komponen yang merentangi rangkaian, jadi pemesejan adalah penting. Hari ini kami akan menggunakan TCP untuk pemindahan mesej.

Anda boleh mengakses VM anda menggunakan VSC, atau anda boleh menjalankan arahan menggunakan SSH dan mengedit fail dengan pico, dalam kes saya, saya akan menggunakan SSH.

? Pastikan anda menyalin kod dengan berhati-hati.

Kami perlu mencipta pelayan ZeroMQ pertama kami, pelayan akan membenarkan pengikatan dengan hanya satu pelanggan pada satu masa.

  • Buat fail baharu yang dipanggil pair-server.py, kemudian masukkan kod berikut.

  • Kod mencipta soket baharu menggunakan corak zmq.PAIR, kemudian mengikat pelayan ke port IP tertentu (yang telah kami buka dalam GCP). Ambil perhatian bahawa pelayan tidak akan berhenti berjalan sehingga kami menghentikannya.

  • Sila lihat ulasan untuk memahami cara ini berfungsi.

  • Pastikan anda menukar ; itu ialah alamat IP dalaman VM GCP; port klien hendaklah sama dengan pelayan.

# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)

Jangan jalankan pelayan lagi, mula-mula biar kami buat klien.

Buat pelanggan dan luangkan sedikit masa untuk meneliti ulasan. Saya akan memanggilnya pair-client.py.

Pastikan anda menukar ; port hendaklah sama seperti dalam pelayan.

$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq

Kami memerlukan dua tetingkap terminal untuk menjalankan contoh PASANGAN. Kami akan menjalankan pelayan pada satu tetingkap dan klien pada yang lain. Sekarang, jalankannya seperti berikut.

  • Jalankan pelayan
# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
  • Jalankan pelanggan
import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()

Periksa output, kami baru sahaja mencipta soket PAIR baharu.

  • Skrip akan ditamatkan apabila pelanggan melengkapkan sambungannya. Kemudian hentikan pelayan (ctrl c) dan matikan ia.

Kami perlu mengosongkan sambungan TCP sebelum kami menjalankannya semula. Untuk melakukan ini, gunakan arahan berikut.

$ python3 pair-server.py

? Nota:

  • Kita boleh menjalankan hanya satu PAIR pada satu masa, ini bermakna kita tidak boleh mempunyai berbilang pelanggan, ingat ini adalah PAIR, pelanggan pertama akan mengunci soket .

  • Jika kami menjalankan pelayan sekali, dan klien dua kali, klien kedua akan "hang", ini bermakna klien kedua akan menunggu pelayan baharu untuk menyambung.

  • Jika kami ingin menjalankan pasangan lebih daripada sekali, kami perlu mematikan pelayan dan mengosongkan sambungan TCP.

  • PASANGAN sesuai apabila pelanggan perlu mempunyai akses eksklusif kepada pelayan.

  • Kami boleh mempunyai berbilang pelayan kepada berbilang pelanggan sebagai PAIR, tetapi kami perlu menggunakan nombor PORT yang berbeza untuk sambungan.

Setiap fasa adalah bebas antara satu sama lain, jadi, hentikan pelayan, kosongkan port TCP dan beralih ke fasa seterusnya.

Fasa 2: Menggandingkan pelayan kepada berbilang pelanggan

Mari kami membuat sambungan pelayan-pelanggan, di mana berbilang pelanggan akan menyambung ke pelayan tunggal. Ini ialah corak pemesejan yang paling popular.

  • Mari buat pelayan dalam konteks corak REP-REQ (balas permintaan).
  • Kami akan memanggil pelayan rep-server.py, menggunakan port 5555.
$ python3 pair-client.py

Kini kami akan membangunkan dua Pelanggan yang akan sama dari segi fungsi mereka.

$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()

Biar kami membuat salinan pelanggan ini dan mengeditnya dengan sewajarnya. Jalankan arahan berikut untuk membuat salinan baharu.

* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.

Kemudian edit req-client2.py dan tukar klien 1 kepada klien 2.

Jom edit mesej cetak dan soket (baris 8 dan 9)

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 1 ", request,"...")
    socket.send_string("Hello from client 1")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()

Untuk menjalankan contoh ini, kami memerlukan tiga tetingkap terminal, satu untuk pelayan dan dua untuk pelanggan. Jalankan yang berikut dalam terminal pertama.

  • Mari mulakan pelayan
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq
  • Mari mulakan pelanggan pertama
# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
  • Mari mulakan pelanggan kedua
import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()

Semak output tetingkap, kami baru sahaja mencipta dua pelanggan bercakap dengan satu pelayan. Anda boleh mempunyai seberapa ramai pelanggan yang anda mahu, anda perlu membuat pelanggan, walaupun dengan fungsi berbeza yang bersambung ke satu pelayan.

? Nota:

  • Pelanggan – Pelayan ialah corak yang paling banyak digunakan, kami sudah menggunakannya dalam kelas 1 apabila kami memasang dan menjalankan pelayan HTTP Apache.

  • Hentikan pelayan dan bersihkan port TCP 5555

    • Bunuh pelayan:


bash
$ sudo fuser -k 5555/tcp

Fasa 3: Memadankan pelayan kepada klien

Pola terbitan – langgan ialah cara yang sangat biasa untuk mengawal penyiaran data kepada banyak pelanggan yang melanggan konteks, dengan cara pelayan menghantar data kepada satu atau lebih pelanggan.

$ python3 pair-server.py

Mari kita buat contoh mudah dahulu.

$ python3 pair-client.py

Biar kami cipta fail baharu, panggil ia pub_server.py.

$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
  • Arahan ini akan mengarahkan python untuk menjalankan pelayan secara spesifik dan
* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.

Buat fail baharu pub_client.py.
* Skrip menerima tiga argumen daripada baris arahan (iaitu IP dan dua port).

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 1 ", request,"...")
    socket.send_string("Hello from client 1")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()

Kami bersedia untuk menjalankan aplikasi pub-sub kami! Kami memerlukan tiga tingkap terminal. Dalam larian terminal pertama:

$ cp req-client1.py req-client2.py
  • Dalam larian terminal kedua:
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 2 ", request,"...")
        socket.send_string("Hello from client 2")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()
  • Setiap pelayan menjana data cuaca. Contohnya:
    • Poskod, cth.: 10001
    • Bersederhana, cth.: -68

Mari jalankan pelanggan untuk menyambung dan melanggan data melalui poskod cth., 10001 (NYC). Ingat skrip klien melanggan kedua-dua keadaan pelayan. Jalankan arahan seterusnya:

$ python3 rep-server.py
  • Apabila anda selesai membunuh pelayan (ctrl z) dan kosongkan port TCP yang menjalankan arahan seterusnya:
$ python3 req-client1.py
$ python3 req-client2.py
Fasa 4: Tolak/Tarik: Menggunakan corak Talian Paip**

Soket Tekan/Tarik membolehkan anda mengedarkan mesej kepada berbilang pekerja, disusun dalam saluran paip. Ini sangat berguna untuk menjalankan kod secara selari. Soket Tekan akan mengedarkan mesej kepada pelanggan Tariknya secara sama rata dan pelanggan akan menghantar respons kepada pelayan lain, dipanggil pengumpul.

Messaging in distributed systems using ZeroMQ

  • Ini bersamaan dengan model pengeluar/pengguna, tetapi hasil yang dikira oleh pengguna tidak dihantar ke hulu tetapi ke hilir ke soket tarik/pengguna yang lain.

  • Kami akan melaksanakan fungsi berikut.

  • Pengeluar akan MENOLAK nombor rawak dari 0 hingga 10 kepada pengguna.

  • Dua contoh pengguna yang sama akan MENARIK nombor dan akan melaksanakan tugas yang berat.

  • Tugas boleh jadi sebarang pengiraan berat cth., pendaraban matriks.

  • Untuk memudahkan, "tugas berat" kami hanya akan mengembalikan nombor yang sama.

  • Pengguna akan MENOLAK keputusan individu (pengiraan tugas berat) ke Pengumpul Keputusan, yang akan mengagregatkan keputusan.

  • Untuk memudahkan, contoh Pengumpul Hasil akan MENARIK keputusan dan mengira jumlah separa setiap pengguna. Kita boleh menjumlahkan dua jumlah separa dengan mudah jika perlu.

  • Mari kita lihat contoh mudah.

    • Pengeluar menjana [1,2,3,4,5].
    • Pengguna 1 menerima [2,4], kemudian mengira tugas yang berat dan memajukan keputusan kepada Pemungut Hasil.
    • Pengguna 2 menerima [1,3,5], kemudian mengira tugas yang berat dan memajukan keputusan kepada Pemungut Hasil.
    • Pengumpul hasil mengira kiraan dan jumlah separa cth.:
    • Pengguna1[2,4], ini bermakna 2 nombor diterima daripada Pengguna1 dan jumlahnya ialah 6.
    • Consumer2[1,3,5], ini bermakna 3 nombor diterima daripada Consumer2 ini dan jumlahnya ialah 9.
  • Contoh ini menunjukkan potensi pemprosesan teragih untuk pemprosesan selari.

Pertama sekali, mari kita cipta pengeluar yang dipanggil producer.py yang berjalan pada port 5555 pastikan anda menyesuaikan .

$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq

Kemudian buat consumer.py adalah seperti berikut. Jangan lupa tukar dua s dalam kod.

# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)

Akhir sekali, mari kita bangunkan collector.py, sekali lagi tukar .

import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()

Pastikan anda tidak mengalami ralat lekukan!

$ python3 pair-server.py

Pertama sekali, kita perlu menjalankan collector.py, pengumpul akan menunggu data untuk dikumpul sehingga kita memulakan pengeluar.

$ python3 pair-client.py
  • Kemudian, kami akan memulakan pengguna satu demi satu, jalankan setiap arahan dalam tetingkap terminal yang berbeza.
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
  • Jalankan arahan yang sama dalam terminal lain.
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
  • Akhir sekali, kami akan memulakan pengeluar kami yang akan mula menghantar data ke saluran paip kami.
* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.

Syabas! ? Anda menggunakan ZeroMQ untuk membangunkan corak pemesejan!

Atas ialah kandungan terperinci Pemesejan dalam sistem teragih menggunakan ZeroMQ. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn