Rumah >pembangunan bahagian belakang >Tutorial Python >Pemesejan dalam sistem teragih menggunakan ZeroMQ
Mari gunakan Python untuk membangunkan corak pemesejan yang berbeza.
Anda perlu menonton video berikut untuk mengikuti arahan langkah demi langkah.
Luangkan masa anda; pastikan anda menyemak semula arahan sebelum anda menjalankannya.
Saya menjalankan tutorial ini pada VM GCP saya, tetapi berasa bebas untuk menjalankannya secara setempat ✅
Tutorial ini memperkenalkan konsep soket dalam Python3 menggunakan ZeroMQ. ZeroMQ ialah cara mudah untuk membangunkan soket untuk membolehkan proses yang diedarkan berkomunikasi antara satu sama lain dengan menghantar mesej.
Corak pemesejan yang akan kita periksa hari ini adalah seperti berikut:
? Nota: Bekerja dengan Soket boleh menjadi rumit, berjalan berulang kali kod yang sama, menggunakan nombor port yang sama/soket yang sama, boleh membawa kepada sambungan yang "hang" (pelayan kelihatan seperti sedang berjalan, tetapi ia tidak boleh menerima sambungan). Ini berlaku kerana kami tidak menutup dan memusnahkan sambungan sebelumnya dengan betul.
Cara paling sesuai untuk menangani perkara ini ialah dengan menutup soket dan memusnahkan konteks ZeroMQ. Rujuk untuk mencuba – tangkap blok Fasa 2 dan Fasa 3 untuk butiran lanjut.
Dalam tutorial ini, anda mungkin mengalami masalah seperti itu, cth., menjalankan beberapa kali pelayan yang sama dalam port yang sama. Jika anda menghadapi masalah tergantung, anda dinasihatkan untuk mematikan proses Python, membersihkan nombor port TCP, dan menjalankan pelayan sekali lagi (lihat langkah 11).
Mari kita mulakan dengan mencipta VM baharu, kemudian kita akan memasang Python3.
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
Jenis: Y apabila digesa.
Banyak aplikasi hari ini terdiri daripada komponen yang merentangi rangkaian, jadi pemesejan adalah penting. Hari ini kami akan menggunakan TCP untuk pemindahan mesej.
Anda boleh mengakses VM anda menggunakan VSC, atau anda boleh menjalankan arahan menggunakan SSH dan mengedit fail dengan pico, dalam kes saya, saya akan menggunakan SSH.
? Pastikan anda menyalin kod dengan berhati-hati.
Kami perlu mencipta pelayan ZeroMQ pertama kami, pelayan akan membenarkan pengikatan dengan hanya satu pelanggan pada satu masa.
Buat fail baharu yang dipanggil pair-server.py, kemudian masukkan kod berikut.
Kod mencipta soket baharu menggunakan corak zmq.PAIR, kemudian mengikat pelayan ke port IP tertentu (yang telah kami buka dalam GCP). Ambil perhatian bahawa pelayan tidak akan berhenti berjalan sehingga kami menghentikannya.
Sila lihat ulasan untuk memahami cara ini berfungsi.
Pastikan anda menukar ; itu ialah alamat IP dalaman VM GCP; port klien hendaklah sama dengan pelayan.
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
Jangan jalankan pelayan lagi, mula-mula biar kami buat klien.
Buat pelanggan dan luangkan sedikit masa untuk meneliti ulasan. Saya akan memanggilnya pair-client.py.
Pastikan anda menukar ; port hendaklah sama seperti dalam pelayan.
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
Kami memerlukan dua tetingkap terminal untuk menjalankan contoh PASANGAN. Kami akan menjalankan pelayan pada satu tetingkap dan klien pada yang lain. Sekarang, jalankannya seperti berikut.
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
Periksa output, kami baru sahaja mencipta soket PAIR baharu.
Kami perlu mengosongkan sambungan TCP sebelum kami menjalankannya semula. Untuk melakukan ini, gunakan arahan berikut.
$ python3 pair-server.py
? Nota:
Kita boleh menjalankan hanya satu PAIR pada satu masa, ini bermakna kita tidak boleh mempunyai berbilang pelanggan, ingat ini adalah PAIR, pelanggan pertama akan mengunci soket .
Jika kami menjalankan pelayan sekali, dan klien dua kali, klien kedua akan "hang", ini bermakna klien kedua akan menunggu pelayan baharu untuk menyambung.
Jika kami ingin menjalankan pasangan lebih daripada sekali, kami perlu mematikan pelayan dan mengosongkan sambungan TCP.
PASANGAN sesuai apabila pelanggan perlu mempunyai akses eksklusif kepada pelayan.
Kami boleh mempunyai berbilang pelayan kepada berbilang pelanggan sebagai PAIR, tetapi kami perlu menggunakan nombor PORT yang berbeza untuk sambungan.
Setiap fasa adalah bebas antara satu sama lain, jadi, hentikan pelayan, kosongkan port TCP dan beralih ke fasa seterusnya.
Mari kami membuat sambungan pelayan-pelanggan, di mana berbilang pelanggan akan menyambung ke pelayan tunggal. Ini ialah corak pemesejan yang paling popular.
$ python3 pair-client.py
Kini kami akan membangunkan dua Pelanggan yang akan sama dari segi fungsi mereka.
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
Biar kami membuat salinan pelanggan ini dan mengeditnya dengan sewajarnya. Jalankan arahan berikut untuk membuat salinan baharu.
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
Kemudian edit req-client2.py dan tukar klien 1 kepada klien 2.
Jom edit mesej cetak dan soket (baris 8 dan 9)
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 1 ", request,"...") socket.send_string("Hello from client 1") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
Untuk menjalankan contoh ini, kami memerlukan tiga tetingkap terminal, satu untuk pelayan dan dua untuk pelanggan. Jalankan yang berikut dalam terminal pertama.
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
Semak output tetingkap, kami baru sahaja mencipta dua pelanggan bercakap dengan satu pelayan. Anda boleh mempunyai seberapa ramai pelanggan yang anda mahu, anda perlu membuat pelanggan, walaupun dengan fungsi berbeza yang bersambung ke satu pelayan.
? Nota:
Pelanggan – Pelayan ialah corak yang paling banyak digunakan, kami sudah menggunakannya dalam kelas 1 apabila kami memasang dan menjalankan pelayan HTTP Apache.
Hentikan pelayan dan bersihkan port TCP 5555
- Bunuh pelayan:
bash
$ sudo fuser -k 5555/tcp
Pola terbitan – langgan ialah cara yang sangat biasa untuk mengawal penyiaran data kepada banyak pelanggan yang melanggan konteks, dengan cara pelayan menghantar data kepada satu atau lebih pelanggan.
$ python3 pair-server.py
Mari kita buat contoh mudah dahulu.
$ python3 pair-client.py
Biar kami cipta fail baharu, panggil ia pub_server.py.
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
Buat fail baharu pub_client.py.
* Skrip menerima tiga argumen daripada baris arahan (iaitu IP dan dua port).
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 1 ", request,"...") socket.send_string("Hello from client 1") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
Kami bersedia untuk menjalankan aplikasi pub-sub kami! Kami memerlukan tiga tingkap terminal. Dalam larian terminal pertama:
$ cp req-client1.py req-client2.py
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 2 ", request,"...") socket.send_string("Hello from client 2") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
Mari jalankan pelanggan untuk menyambung dan melanggan data melalui poskod cth., 10001 (NYC). Ingat skrip klien melanggan kedua-dua keadaan pelayan. Jalankan arahan seterusnya:
$ python3 rep-server.py
$ python3 req-client1.py
$ python3 req-client2.py
Soket Tekan/Tarik membolehkan anda mengedarkan mesej kepada berbilang pekerja, disusun dalam saluran paip. Ini sangat berguna untuk menjalankan kod secara selari. Soket Tekan akan mengedarkan mesej kepada pelanggan Tariknya secara sama rata dan pelanggan akan menghantar respons kepada pelayan lain, dipanggil pengumpul.
Ini bersamaan dengan model pengeluar/pengguna, tetapi hasil yang dikira oleh pengguna tidak dihantar ke hulu tetapi ke hilir ke soket tarik/pengguna yang lain.
Kami akan melaksanakan fungsi berikut.
Pengeluar akan MENOLAK nombor rawak dari 0 hingga 10 kepada pengguna.
Dua contoh pengguna yang sama akan MENARIK nombor dan akan melaksanakan tugas yang berat.
Tugas boleh jadi sebarang pengiraan berat cth., pendaraban matriks.
Untuk memudahkan, "tugas berat" kami hanya akan mengembalikan nombor yang sama.
Pengguna akan MENOLAK keputusan individu (pengiraan tugas berat) ke Pengumpul Keputusan, yang akan mengagregatkan keputusan.
Untuk memudahkan, contoh Pengumpul Hasil akan MENARIK keputusan dan mengira jumlah separa setiap pengguna. Kita boleh menjumlahkan dua jumlah separa dengan mudah jika perlu.
Mari kita lihat contoh mudah.
Contoh ini menunjukkan potensi pemprosesan teragih untuk pemprosesan selari.
Pertama sekali, mari kita cipta pengeluar yang dipanggil producer.py yang berjalan pada port 5555 pastikan anda menyesuaikan .
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
Kemudian buat consumer.py adalah seperti berikut. Jangan lupa tukar dua s dalam kod.
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
Akhir sekali, mari kita bangunkan collector.py, sekali lagi tukar .
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
Pastikan anda tidak mengalami ralat lekukan!
$ python3 pair-server.py
Pertama sekali, kita perlu menjalankan collector.py, pengumpul akan menunggu data untuk dikumpul sehingga kita memulakan pengeluar.
$ python3 pair-client.py
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
Syabas! ? Anda menggunakan ZeroMQ untuk membangunkan corak pemesejan!
Atas ialah kandungan terperinci Pemesejan dalam sistem teragih menggunakan ZeroMQ. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!