Heim > Artikel > Backend-Entwicklung > Einführung in verwandte Probleme des Pika-Moduls in Python (mit Code)
Dieser Artikel bietet Ihnen eine Einführung in Probleme im Zusammenhang mit dem Pika-Modul in Python. Ich hoffe, dass er Ihnen als Referenz dienen wird.
Rabbitmq wird in meiner Arbeit häufig verwendet, und die verwendete Sprache ist hauptsächlich Python, daher wird häufig das Pika-Modul in Python verwendet. Die Verwendung dieses Moduls bereitet mir jedoch auch viele Probleme, die zusammengefasst werden hier Lassen Sie uns über die Lösungen für einige Probleme sprechen, auf die ich während des Änderungsprozesses bei der Verwendung dieses Moduls gestoßen bin
Ein Neuling, der gerade mit dem Schreiben von Code begonnen hat
Verwenden Sie dies am Anfang Wann Bei Verwendung von Rabbitmq muss mein Programm aufgrund seiner eigenen Geschäftsanforderungen Nachrichten von Rabbitmq konsumieren und Nachrichten an Rabbitmq veröffentlichen. Das Logikdiagramm des Codes lautet wie folgt:
Das Das Folgende ist mein Simulationscode:
#! /usr/bin/env python3 # .-*- coding:utf-8 .-*- import pika import time import threading import os import json import datetime from multiprocessing import Process # rabbitmq 配置信息 MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice" } class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self, recv_serverid, send_serverid): # self.serverid = MQ_CONFIG.get("serverid") self.exchange = MQ_CONFIG.get("exchange") self.channel = None self.connection = None self.recv_serverid = recv_serverid self.send_serverid = send_serverid def reconnect(self): if self.connection and not self.connection.is_closed(): self.connection.close() credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd")) parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"), credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct") result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True) queue_name = result.method.queue self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid) self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False) def consumer_callback(self, channel, method, properties, body): """ 消费消息 :param channel: :param method: :param properties: :param body: :return: """ channel.basic_ack(delivery_tag=method.delivery_tag) process_id = os.getpid() print("current process id is {0} body is {1}".format(process_id, body)) def publish_message(self, to_serverid, message): """ 发布消息 :param to_serverid: :param message: :return: """ message = dict_to_json(message) self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) def run(self): while True: self.channel.start_consuming() @classmethod def get_instance(cls, *args, **kwargs): """ 单例模式 :return: """ if not hasattr(cls, "_instance"): with cls._instance_lock: if not hasattr(cls, "_instance"): cls._instance = cls(*args, **kwargs) return cls._instance def process1(recv_serverid, send_serverid): """ 用于测试同时订阅和发布消息 :return: """ # 线程1 用于去 从rabbitmq消费消息 rabbitmq_server = RabbitMQServer.get_instance(recv_serverid, send_serverid) rabbitmq_server.reconnect() recv_threading = threading.Thread(target=rabbitmq_server.run) recv_threading.start() i = 1 while True: # 主线程去发布消息 message = {"value": i} rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message) i += 1 time.sleep(0.01) class CJsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj) def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstr def json_to_dict(jsonstr): if isinstance(jsonstr, bytes): jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return d if __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 进程1 用于模拟模拟程序1 p = Process(target=process1, args=(recv_serverid, send_serverid, )) p.start() # 主进程用于模拟程序2 process1(send_serverid, recv_serverid)
Das obige ist mein Testmodul, das meinen tatsächlichen Code simuliert. und wenn Nachrichten gleichzeitig veröffentlicht werden, Die gleiche Rabbitmq-Verbindung wird zum Abonnieren von Nachrichten und zum Veröffentlichen von Nachrichten auf demselben Kanal verwendet
Aber nachdem dieser Code ausgeführt wurde, wird dies nicht mehr der Fall sein Es dauert lange, bis die folgende Fehlermeldung angezeigt wird:
Traceback (most recent call last): File "/app/python3/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/app/python3/lib/python3.6/multiprocessing/process.py", line 93, in run self._target(*self._args, **self._kwargs) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 109, in process1 rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 76, in publish_message self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2120, in basic_publish mandatory, immediate) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2206, in publish immediate=immediate) File "/app/python3/lib/python3.6/site-packages/pika/channel.py", line 415, in basic_publish raise exceptions.ChannelClosed() pika.exceptions.ChannelClosed Traceback (most recent call last): File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 144, in <module> process1(send_serverid, recv_serverid) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 109, in process1 rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 76, in publish_message self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2120, in basic_publish mandatory, immediate) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2206, in publish immediate=immediate) File "/app/python3/lib/python3.6/site-packages/pika/channel.py", line 415, in basic_publish raise exceptions.ChannelClosed() pika.exceptions.ChannelClosed Exception in thread Thread-1: Traceback (most recent call last): File "/app/python3/lib/python3.6/threading.py", line 916, in _bootstrap_inner self.run() File "/app/python3/lib/python3.6/threading.py", line 864, in run self._target(*self._args, **self._kwargs) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 80, in run self.channel.start_consuming() File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1822, in start_consuming self.connection.process_data_events(time_limit=None) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 749, in process_data_events self._flush_output(common_terminator) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 477, in _flush_output result.reason_text) pika.exceptions.ConnectionClosed: (505, 'UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead')
Wenn Sie zu diesem Zeitpunkt die Protokollinformationen des Rabbitmq-Dienstes überprüfen, werden Ihnen die Fehlerprotokolle der beiden Situationen wie folgt angezeigt:
Fall 1:
=INFO REPORT==== 12-Oct-2018::18:32:37 === accepting AMQP connection <0.19439.2> (192.168.90.11:42942 -> 192.168.90.11:5672) =INFO REPORT==== 12-Oct-2018::18:32:37 === accepting AMQP connection <0.19446.2> (192.168.90.11:42946 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::18:32:38 === AMQP connection <0.19446.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected content header for class 60, got non content header frame instead", 'basic.publish'} =INFO REPORT==== 12-Oct-2018::18:32:38 === closing AMQP connection <0.19446.2> (192.168.90.11:42946 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::18:33:59 === AMQP connection <0.19439.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected content header for class 60, got non content header frame instead", 'basic.publish'} =INFO REPORT==== 12-Oct-2018::18:33:59 === closing AMQP connection <0.19439.2> (192.168.90.11:42942 -> 192.168.90.11:5672)
Szenario 2:
=INFO REPORT==== 12-Oct-2018::17:41:28 === accepting AMQP connection <0.19045.2> (192.168.90.11:33004 -> 192.168.90.11:5672) =INFO REPORT==== 12-Oct-2018::17:41:28 === accepting AMQP connection <0.19052.2> (192.168.90.11:33008 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::17:41:29 === AMQP connection <0.19045.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected content body, got non content body frame instead", 'basic.publish'} =INFO REPORT==== 12-Oct-2018::17:41:29 === closing AMQP connection <0.19045.2> (192.168.90.11:33004 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::17:42:23 === AMQP connection <0.19052.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected method frame, got non method frame instead",none} =INFO REPORT==== 12-Oct-2018::17:42:23 === closing AMQP connection <0.19052.2> (192.168.90.11:33008 -> 192.168.90.11:5672)
Ich habe viele Informationen und Dokumente zu dieser Situation durchsucht, aber keine gute Antwort auf die Links gefunden, die ich zu diesem Problem gefunden habe sind:
https://stackoverflow.com/questions/49154404/pika-threaded-execution-gets-error-505-unexpected-frame
http://rabbitmq.1065348.n5 .nabble.com/UNEXPECTED-FRAME-expected- content-header-for-class-60-got-non-content-header-frame-instead-td34981.html
Viele andere Leute sind darauf gestoßen Problem, aber nach der Überprüfung besteht die endgültige Lösung im Wesentlichen darin, dass zwei Rabbitmq-Verbindungen erstellt werden, eine Verbindung zum Abonnieren von Nachrichten und eine Verbindung zum Veröffentlichen von Nachrichten. In diesem Fall treten die oben genannten Probleme nicht auf
Vor dieser Lösung habe ich die Verwendung derselben Verbindung, aber verschiedener Kanäle, einen Kanal zum Abonnieren von Nachrichten und einen anderen Kanal zum Veröffentlichen von Nachrichten getestet. Der obige Fehler tritt jedoch während des Testvorgangs weiterhin auf.
Am Ende habe ich mich auch dafür entschieden, zwei Verbindungen zu verwenden, um die oben genannten Probleme zu lösen:
#! /usr/bin/env python3 # .-*- coding:utf-8 .-*- import pika import threading import json import datetime import os from pika.exceptions import ChannelClosed from pika.exceptions import ConnectionClosed # rabbitmq 配置信息 MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice" } class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self): self.recv_serverid = "" self.send_serverid = "" self.exchange = MQ_CONFIG.get("exchange") self.connection = None self.channel = None def reconnect(self): if self.connection and not self.connection.is_closed: self.connection.close() credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd")) parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"), credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct") if isinstance(self, RabbitComsumer): result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True) queue_name = result.method.queue self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid) self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False) class RabbitComsumer(RabbitMQServer): def __init__(self): super(RabbitComsumer, self).__init__() def consumer_callback(self, ch, method, properties, body): """ :param ch: :param method: :param properties: :param body: :return: """ ch.basic_ack(delivery_tag=method.delivery_tag) process_id = threading.current_thread() print("current process id is {0} body is {1}".format(process_id, body)) def start_consumer(self): while True: self.reconnect() self.channel.start_consuming() @classmethod def run(cls, recv_serverid): consumer = cls() consumer.recv_serverid = recv_serverid consumer.start_consumer() class RabbitPublisher(RabbitMQServer): def __init__(self): super(RabbitPublisher, self).__init__() def start_publish(self): self.reconnect() i = 1 while True: message = {"value": i} message = dict_to_json(message) self.channel.basic_publish(exchange=self.exchange, routing_key=self.send_serverid, body=message) i += 1 @classmethod def run(cls, send_serverid): publish = cls() publish.send_serverid = send_serverid publish.start_publish() class CJsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj) def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstr def json_to_dict(jsonstr): if isinstance(jsonstr, bytes): jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return d if __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 这里分别用两个线程去连接和发送 threading.Thread(target=RabbitComsumer.run, args=(recv_serverid,)).start() threading.Thread(target=RabbitPublisher.run, args=(send_serverid,)).start() # 这里也是用两个连接去连接和发送, threading.Thread(target=RabbitComsumer.run, args=(send_serverid,)).start() RabbitPublisher.run(recv_serverid)
Das Obige Code In diesem Beispiel habe ich zwei Verbindungen zum Abonnieren bzw. Veröffentlichen von Nachrichten verwendet. Gleichzeitig verwendete ein anderes Paar aus Abonnement und Veröffentlichung auch zwei Verbindungen zum Abonnieren und Veröffentlichen, sodass die vorherigen Probleme auftreten, wenn das Programm erneut ausgeführt wird tritt nicht auf
Obwohl der obige Code den vorherigen Fehler nicht verursacht, ist das Programm sehr anfällig. Wenn der RabbitMQ-Dienst neu gestartet oder getrennt wird, wird das Programm nicht angezeigt Daher müssen wir dem Code einen Wiederverbindungsmechanismus hinzufügen, damit unser Programm den Wiederverbindungsmechanismus
#! /usr/bin/env python3 # .-*- coding:utf-8 .-*- import pika import threading import json import datetime import time from pika.exceptions import ChannelClosed from pika.exceptions import ConnectionClosed # rabbitmq 配置信息 MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice" } class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self): self.recv_serverid = "" self.send_serverid = "" self.exchange = MQ_CONFIG.get("exchange") self.connection = None self.channel = None def reconnect(self): try: if self.connection and not self.connection.is_closed: self.connection.close() credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd")) parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"), credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct") if isinstance(self, RabbitComsumer): result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True) queue_name = result.method.queue self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid) self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False) except Exception as e: print(e) class RabbitComsumer(RabbitMQServer): def __init__(self): super(RabbitComsumer, self).__init__() def consumer_callback(self, ch, method, properties, body): """ :param ch: :param method: :param properties: :param body: :return: """ ch.basic_ack(delivery_tag=method.delivery_tag) process_id = threading.current_thread() print("current process id is {0} body is {1}".format(process_id, body)) def start_consumer(self): while True: try: self.reconnect() self.channel.start_consuming() except ConnectionClosed as e: self.reconnect() time.sleep(2) except ChannelClosed as e: self.reconnect() time.sleep(2) except Exception as e: self.reconnect() time.sleep(2) @classmethod def run(cls, recv_serverid): consumer = cls() consumer.recv_serverid = recv_serverid consumer.start_consumer() class RabbitPublisher(RabbitMQServer): def __init__(self): super(RabbitPublisher, self).__init__() def start_publish(self): self.reconnect() i = 1 while True: message = {"value": i} message = dict_to_json(message) try: self.channel.basic_publish(exchange=self.exchange, routing_key=self.send_serverid, body=message) i += 1 except ConnectionClosed as e: self.reconnect() time.sleep(2) except ChannelClosed as e: self.reconnect() time.sleep(2) except Exception as e: self.reconnect() time.sleep(2) @classmethod def run(cls, send_serverid): publish = cls() publish.send_serverid = send_serverid publish.start_publish() class CJsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj) def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstr def json_to_dict(jsonstr): if isinstance(jsonstr, bytes): jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return d if __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 这里分别用两个线程去连接和发送 threading.Thread(target=RabbitComsumer.run, args=(recv_serverid,)).start() threading.Thread(target=RabbitPublisher.run, args=(send_serverid,)).start() # 这里也是用两个连接去连接和发送, threading.Thread(target=RabbitComsumer.run, args=(send_serverid,)).start() RabbitPublisher.run(recv_serverid)rabbitmq eine Ausnahme hat 🎜> Selbst nachdem der obige Code ausgeführt wurde, liegt ein Problem mit dem Dienst von RabbitMQ vor. Wenn der Dienst von RabbitMQ jedoch wiederhergestellt wird, kann unser Programm die Verbindung erneut herstellen Der Ort, an dem die Nachricht veröffentlicht wird, stammt von anderen Threads. Zu diesem Zeitpunkt können Sie sie über die Warteschlange implementieren. Wenn sich zu diesem Zeitpunkt längere Zeit keine Daten in Ihrer Warteschlange befinden, kommen die Daten nach einer bestimmten Zeit Zu diesem Zeitpunkt werden Sie feststellen, dass die Verbindung vom Rabbitmq-Server getrennt wurde, aber Sie haben natürlich auch einen Wiederverbindungsmechanismus eingerichtet Stellen Sie die Verbindung wieder her, aber denken Sie darüber nach, warum dies passiert. Überprüfen Sie zu diesem Zeitpunkt das Rabbitmq-Protokoll und Sie werden den folgenden Fehler finden:
=ERROR REPORT==== 8-Oct-2018::15:34:19 === closing AMQP connection <0.30112.1> (192.168.90.11:54960 -> 192.168.90.11:5672): {heartbeat_timeout,running}Dies ist ein Abfangen des Protokolls meiner vorherigen Testumgebung dass es durch diesen Fehler verursacht wurde, als ich später die Verbindungsparameter von pika zu Rabbitmq überprüfte, gab es einen solchen Parameter Dieser Parameter ist nicht standardmäßig festgelegt Die Heartbeat-Zeit dieses Heatbeats ist nicht standardmäßig festgelegt. Wenn sie nicht festgelegt ist, wird sie vom Server festgelegt, da diese Heartbeat-Zeit mit dem Server ausgehandelt wird. ErgebnisWenn dieser Parameter auf 0 gesetzt ist Dies bedeutet, dass kein Heartbeat gesendet wird und der Server die Verbindung niemals trennt. Daher habe ich hier der Einfachheit halber den Heartbeat des Threads, der die Nachricht veröffentlicht, auf 0 gesetzt und hier habe ich die Paketerfassung geklärt und einen Blick darauf geworfen der Aushandlungsprozess zwischen dem Server und dem Client Aus der Paketerfassungsanalyse können wir ersehen, dass der Server und der Client zunächst 580 Sekunden ausgehandelt haben und der Client geantwortet hat:
Auf diese Weise wird die Verbindung nie getrennt, aber wenn wir den Heartbeat-Wert nicht festlegen, sehen wir beim erneuten Erfassen von Paketen Folgendes:
Aus dem Bild oben können wir löschen, dass das Endergebnis der Aushandlung zwischen Server und Client 580 ist, sodass die Verbindung nach Ablauf der Zeit hergestellt wird, wenn kein Datenaustausch stattfindet vom Server getrennt werden
Besondere Aufmerksamkeit
Es ist wichtig zu beachten, dass nach meinem eigentlichen Test die Einstellung von Heartbeat in Pythons Pika==0.11.2-Version und unten wird nicht wirksam. Nur 0.12.0 und niedriger. Nur die oben genannten Versionseinstellungen können wirksam werden
Das obige ist der detaillierte Inhalt vonEinführung in verwandte Probleme des Pika-Moduls in Python (mit Code). Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!