ホームページ >バックエンド開発 >Python チュートリアル >Python の pika モジュールに関連する問題の紹介 (コード付き)
この記事では、Python の pika モジュールに関する問題を紹介します (コード付き)。一定の参考価値があります。困っている友人は参照してください。お役に立てれば幸いです。
私の仕事ではRabbitmqがよく使われており、使用言語も主にPythonなのでPythonのpikaモジュールがよく使われますが、このモジュールを使用すると色々な問題も発生するのでまとめます。このモジュールを使用する変更プロセス中に遭遇したいくつかの問題の解決策について話しましょう
コードを書き始めたばかりの初心者
最初にこれを使用してくださいRabbitmq を使用する場合、独自のビジネス ニーズにより、プログラムは Rabbitmq からのメッセージを消費し、rabbitmq にメッセージをパブリッシュする必要があります。コードの論理図は次のとおりです:
以下は私のシミュレーション コードです:
#! /usr/bin/env python3 # .-*- coding:utf-8 .-*- import pika import time import threading import os import json import datetime from multiprocessing import Process # rabbitmq 配置信息 MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice" } class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self, recv_serverid, send_serverid): # self.serverid = MQ_CONFIG.get("serverid") self.exchange = MQ_CONFIG.get("exchange") self.channel = None self.connection = None self.recv_serverid = recv_serverid self.send_serverid = send_serverid def reconnect(self): if self.connection and not self.connection.is_closed(): self.connection.close() credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd")) parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"), credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct") result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True) queue_name = result.method.queue self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid) self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False) def consumer_callback(self, channel, method, properties, body): """ 消费消息 :param channel: :param method: :param properties: :param body: :return: """ channel.basic_ack(delivery_tag=method.delivery_tag) process_id = os.getpid() print("current process id is {0} body is {1}".format(process_id, body)) def publish_message(self, to_serverid, message): """ 发布消息 :param to_serverid: :param message: :return: """ message = dict_to_json(message) self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) def run(self): while True: self.channel.start_consuming() @classmethod def get_instance(cls, *args, **kwargs): """ 单例模式 :return: """ if not hasattr(cls, "_instance"): with cls._instance_lock: if not hasattr(cls, "_instance"): cls._instance = cls(*args, **kwargs) return cls._instance def process1(recv_serverid, send_serverid): """ 用于测试同时订阅和发布消息 :return: """ # 线程1 用于去 从rabbitmq消费消息 rabbitmq_server = RabbitMQServer.get_instance(recv_serverid, send_serverid) rabbitmq_server.reconnect() recv_threading = threading.Thread(target=rabbitmq_server.run) recv_threading.start() i = 1 while True: # 主线程去发布消息 message = {"value": i} rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message) i += 1 time.sleep(0.01) class CJsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj) def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstr def json_to_dict(jsonstr): if isinstance(jsonstr, bytes): jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return d if __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 进程1 用于模拟模拟程序1 p = Process(target=process1, args=(recv_serverid, send_serverid, )) p.start() # 主进程用于模拟程序2 process1(send_serverid, recv_serverid)
上記は、実際のコードを変更するテスト モジュールです。実際、実際のビジネスをシミュレートしています。私の Rabbitmq モジュールにはすでにサブスクリプション メッセージがあります。 . 、メッセージがパブリッシュされるとき、同時に、 同じ Rabbitmq 接続を使用してメッセージをサブスクライブし、同じチャネルにメッセージをパブリッシュします
しかし、このコードが実行されると、実行には時間がかかりません。
Traceback (most recent call last): File "/app/python3/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/app/python3/lib/python3.6/multiprocessing/process.py", line 93, in run self._target(*self._args, **self._kwargs) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 109, in process1 rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 76, in publish_message self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2120, in basic_publish mandatory, immediate) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2206, in publish immediate=immediate) File "/app/python3/lib/python3.6/site-packages/pika/channel.py", line 415, in basic_publish raise exceptions.ChannelClosed() pika.exceptions.ChannelClosed Traceback (most recent call last): File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 144, in <module> process1(send_serverid, recv_serverid) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 109, in process1 rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 76, in publish_message self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2120, in basic_publish mandatory, immediate) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2206, in publish immediate=immediate) File "/app/python3/lib/python3.6/site-packages/pika/channel.py", line 415, in basic_publish raise exceptions.ChannelClosed() pika.exceptions.ChannelClosed Exception in thread Thread-1: Traceback (most recent call last): File "/app/python3/lib/python3.6/threading.py", line 916, in _bootstrap_inner self.run() File "/app/python3/lib/python3.6/threading.py", line 864, in run self._target(*self._args, **self._kwargs) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 80, in run self.channel.start_consuming() File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1822, in start_consuming self.connection.process_data_events(time_limit=None) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 749, in process_data_events self._flush_output(common_terminator) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 477, in _flush_output result.reason_text) pika.exceptions.ConnectionClosed: (505, 'UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead')
このとき、rabbitmq サービスのログ情報を確認すると、次の 2 つの状況でエラー ログが表示されます。
状況 1:=INFO REPORT==== 12-Oct-2018::18:32:37 === accepting AMQP connection <0.19439.2> (192.168.90.11:42942 -> 192.168.90.11:5672) =INFO REPORT==== 12-Oct-2018::18:32:37 === accepting AMQP connection <0.19446.2> (192.168.90.11:42946 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::18:32:38 === AMQP connection <0.19446.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected content header for class 60, got non content header frame instead", 'basic.publish'} =INFO REPORT==== 12-Oct-2018::18:32:38 === closing AMQP connection <0.19446.2> (192.168.90.11:42946 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::18:33:59 === AMQP connection <0.19439.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected content header for class 60, got non content header frame instead", 'basic.publish'} =INFO REPORT==== 12-Oct-2018::18:33:59 === closing AMQP connection <0.19439.2> (192.168.90.11:42942 -> 192.168.90.11:5672)状況 2:
=INFO REPORT==== 12-Oct-2018::17:41:28 === accepting AMQP connection <0.19045.2> (192.168.90.11:33004 -> 192.168.90.11:5672) =INFO REPORT==== 12-Oct-2018::17:41:28 === accepting AMQP connection <0.19052.2> (192.168.90.11:33008 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::17:41:29 === AMQP connection <0.19045.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected content body, got non content body frame instead", 'basic.publish'} =INFO REPORT==== 12-Oct-2018::17:41:29 === closing AMQP connection <0.19045.2> (192.168.90.11:33004 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::17:42:23 === AMQP connection <0.19052.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected method frame, got non method frame instead",none} =INFO REPORT==== 12-Oct-2018::17:42:23 === closing AMQP connection <0.19052.2> (192.168.90.11:33008 -> 192.168.90.11:5672)この状況について多くの情報やドキュメントを検索しましたが、適切な答えが見つかりませんでした。この問題は次のとおりです: https:///stackoverflow.com/questions/49154404/pika-threaded-execution-gets-error-505-unexpected-framehttp://rabbitmq.1065348 .n5.nabble.com/UNEXPECTED-FRAME-expected-content-header-for-class-60-got-non-content-header-frame-instead-td34981.html 他にも多くの人がこの問題に遭遇しています。問題はありますが、確認後の最終的な解決策は、基本的に 2 つの Rabbitmq 接続が作成され、1 つの接続はメッセージのサブスクライブに使用され、1 つの接続はメッセージのパブリッシュに使用されます。この場合、上記の問題は発生しませんこの解決策の前に、同じ接続で異なるチャネルを使用し、1 つのチャネルを使用してメッセージをサブスクライブし、別のチャネルを使用してメッセージをパブリッシュすることをテストしましたが、テスト プロセス中に上記のエラーが引き続き発生します。 私にはコードを書く能力がある程度あります最終的に、上記の問題を解決するために 2 つの接続を使用することも選択しました。次に、テスト コードの例を示します:
#! /usr/bin/env python3 # .-*- coding:utf-8 .-*- import pika import threading import json import datetime import os from pika.exceptions import ChannelClosed from pika.exceptions import ConnectionClosed # rabbitmq 配置信息 MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice" } class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self): self.recv_serverid = "" self.send_serverid = "" self.exchange = MQ_CONFIG.get("exchange") self.connection = None self.channel = None def reconnect(self): if self.connection and not self.connection.is_closed: self.connection.close() credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd")) parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"), credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct") if isinstance(self, RabbitComsumer): result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True) queue_name = result.method.queue self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid) self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False) class RabbitComsumer(RabbitMQServer): def __init__(self): super(RabbitComsumer, self).__init__() def consumer_callback(self, ch, method, properties, body): """ :param ch: :param method: :param properties: :param body: :return: """ ch.basic_ack(delivery_tag=method.delivery_tag) process_id = threading.current_thread() print("current process id is {0} body is {1}".format(process_id, body)) def start_consumer(self): while True: self.reconnect() self.channel.start_consuming() @classmethod def run(cls, recv_serverid): consumer = cls() consumer.recv_serverid = recv_serverid consumer.start_consumer() class RabbitPublisher(RabbitMQServer): def __init__(self): super(RabbitPublisher, self).__init__() def start_publish(self): self.reconnect() i = 1 while True: message = {"value": i} message = dict_to_json(message) self.channel.basic_publish(exchange=self.exchange, routing_key=self.send_serverid, body=message) i += 1 @classmethod def run(cls, send_serverid): publish = cls() publish.send_serverid = send_serverid publish.start_publish() class CJsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj) def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstr def json_to_dict(jsonstr): if isinstance(jsonstr, bytes): jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return d if __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 这里分别用两个线程去连接和发送 threading.Thread(target=RabbitComsumer.run, args=(recv_serverid,)).start() threading.Thread(target=RabbitPublisher.run, args=(send_serverid,)).start() # 这里也是用两个连接去连接和发送, threading.Thread(target=RabbitComsumer.run, args=(send_serverid,)).start() RabbitPublisher.run(recv_serverid)上記のコードでは、メッセージのサブスクライブとパブリッシュにそれぞれ 2 つの接続を使用しましたが、同時に、別のサブスクリプションとパブリッシングのペアも 2 つの接続を使用してサブスクリプションとパブリッシュを実行するため、プログラムを再度実行するときに、以前の問題は発生しません。
切断と再接続について上記のコードでは前述のエラーは発生しませんが、プログラムは非常に脆弱なので、rabbitmq サービスが再起動または切断されると、プログラムには再接続メカニズムがありません。したがって、rabbitmq サービスが再起動されたり、rabbitmq に例外が発生した場合でも、プログラムが引き続き再接続メカニズムを実行できるように、コードに再接続メカニズムを追加する必要があります
#! /usr/bin/env python3 # .-*- coding:utf-8 .-*- import pika import threading import json import datetime import time from pika.exceptions import ChannelClosed from pika.exceptions import ConnectionClosed # rabbitmq 配置信息 MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice" } class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self): self.recv_serverid = "" self.send_serverid = "" self.exchange = MQ_CONFIG.get("exchange") self.connection = None self.channel = None def reconnect(self): try: if self.connection and not self.connection.is_closed: self.connection.close() credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd")) parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"), credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct") if isinstance(self, RabbitComsumer): result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True) queue_name = result.method.queue self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid) self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False) except Exception as e: print(e) class RabbitComsumer(RabbitMQServer): def __init__(self): super(RabbitComsumer, self).__init__() def consumer_callback(self, ch, method, properties, body): """ :param ch: :param method: :param properties: :param body: :return: """ ch.basic_ack(delivery_tag=method.delivery_tag) process_id = threading.current_thread() print("current process id is {0} body is {1}".format(process_id, body)) def start_consumer(self): while True: try: self.reconnect() self.channel.start_consuming() except ConnectionClosed as e: self.reconnect() time.sleep(2) except ChannelClosed as e: self.reconnect() time.sleep(2) except Exception as e: self.reconnect() time.sleep(2) @classmethod def run(cls, recv_serverid): consumer = cls() consumer.recv_serverid = recv_serverid consumer.start_consumer() class RabbitPublisher(RabbitMQServer): def __init__(self): super(RabbitPublisher, self).__init__() def start_publish(self): self.reconnect() i = 1 while True: message = {"value": i} message = dict_to_json(message) try: self.channel.basic_publish(exchange=self.exchange, routing_key=self.send_serverid, body=message) i += 1 except ConnectionClosed as e: self.reconnect() time.sleep(2) except ChannelClosed as e: self.reconnect() time.sleep(2) except Exception as e: self.reconnect() time.sleep(2) @classmethod def run(cls, send_serverid): publish = cls() publish.send_serverid = send_serverid publish.start_publish() class CJsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj) def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstr def json_to_dict(jsonstr): if isinstance(jsonstr, bytes): jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return d if __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 这里分别用两个线程去连接和发送 threading.Thread(target=RabbitComsumer.run, args=(recv_serverid,)).start() threading.Thread(target=RabbitPublisher.run, args=(send_serverid,)).start() # 这里也是用两个连接去连接和发送, threading.Thread(target=RabbitComsumer.run, args=(send_serverid,)).start() RabbitPublisher.run(recv_serverid)上記のコードを実行した後でも、rabbitmq のサービスに問題がありますが、rabbitmq のサービスが復元されると、プログラムは引き続き再接続できます。ただし、上記の実装メソッドが一定期間実行されると、実際のメッセージの発行が行われないため、場所は他のスレッドからのもの、またはプロセスで取得したデータです。このとき、キューを介して実装することもできます。このとき、キューに長時間データがなかった場合、一定時間後にデータが来ます。この時点で、プログラムは、rabbitmq サーバーによって接続が切断されたことを示すプロンプトを表示しますが、結局のところ、再接続メカニズムがセットアップされています。もちろん、再接続することもできます。このとき、rabbitmq のログを確認すると、次のようなメッセージが表示されます。エラー:
=ERROR REPORT==== 8-Oct-2018::15:34:19 === closing AMQP connection <0.30112.1> (192.168.90.11:54960 -> 192.168.90.11:5672): {heartbeat_timeout,running}これは、以前のテスト環境のログのインターセプトです。このエラーが原因で、pika から Rabbitmq への接続パラメータを確認したところ、そのようなパラメータがありました このパラメータはデフォルトでは設定されていないため、ハートビート時間がこのヒートビートの時間はデフォルトでは設定されていません。設定されていない場合、このハートビート時間はサーバーとネゴシエートされるため、サーバーによって設定されます。結果このパラメータが 0 に設定されている場合、それは意味します。ハートビートは送信されず、サーバーは接続を決して切断しないため、ここでは便宜上、メッセージを発行するスレッドのハートビートを 0 に設定します。ここでは、パケット キャプチャを分類して、サーバーとクライアント間のネゴシエーション プロセスを見てください パケット キャプチャ分析から、サーバーとクライアントが最初に 580 秒ネゴシエートし、クライアントが次のように応答したことがわかります。
この方法では、接続が切断されることはありませんが、ハートビート値を設定しない場合、再度パケットをキャプチャすると次のようになります。
上の図から、サーバーとクライアント間のネゴシエーションの最終結果である 580 を削除できます。このようにして、データ交換がない場合、時間切れになった場合、接続はサーバーによって切断されます。
特別な注意
特に注意が必要なのは、実際のテスト後、Python の pika==0.11 のハートビート設定であることです。 2 以前のバージョンでは有効になりません。0.12.0 以下のみ上記のバージョンの設定のみが有効になります。
以上がPython の pika モジュールに関連する問題の紹介 (コード付き)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。