이 글은 Python의 pika 모듈(코드 포함)에 대한 관련 문제를 소개합니다. 도움이 필요한 친구들이 참고할 수 있기를 바랍니다.
Rabbitmq는 내 작업에서 자주 사용되며 사용되는 언어는 주로 Python이므로 Python의 pika 모듈을 자주 사용합니다. 그러나 이 모듈을 사용하면 다음과 같이 이 모듈에 대한 요약이 나옵니다. 프로세스를 사용하는 변경 과정에서 몇 가지 문제에 대한 해결책을 찾았습니다
저는 이제 막 코드를 작성하기 시작한 초보자입니다
처음 Rabbitmq를 사용하기 시작했을 때 비즈니스 요구로 인해 프로그램을 시작해야 했습니다. from Rabbitmq에서 메시지를 사용하려면 Rabbitmq에 메시지를 게시해야 합니다. 코드의 논리 다이어그램은 다음과 같습니다.
다음은 내 시뮬레이션 코드입니다.
#! /usr/bin/env python3 # .-*- coding:utf-8 .-*- import pika import time import threading import os import json import datetime from multiprocessing import Process # rabbitmq 配置信息 MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice" } class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self, recv_serverid, send_serverid): # self.serverid = MQ_CONFIG.get("serverid") self.exchange = MQ_CONFIG.get("exchange") self.channel = None self.connection = None self.recv_serverid = recv_serverid self.send_serverid = send_serverid def reconnect(self): if self.connection and not self.connection.is_closed(): self.connection.close() credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd")) parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"), credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct") result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True) queue_name = result.method.queue self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid) self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False) def consumer_callback(self, channel, method, properties, body): """ 消费消息 :param channel: :param method: :param properties: :param body: :return: """ channel.basic_ack(delivery_tag=method.delivery_tag) process_id = os.getpid() print("current process id is {0} body is {1}".format(process_id, body)) def publish_message(self, to_serverid, message): """ 发布消息 :param to_serverid: :param message: :return: """ message = dict_to_json(message) self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) def run(self): while True: self.channel.start_consuming() @classmethod def get_instance(cls, *args, **kwargs): """ 单例模式 :return: """ if not hasattr(cls, "_instance"): with cls._instance_lock: if not hasattr(cls, "_instance"): cls._instance = cls(*args, **kwargs) return cls._instance def process1(recv_serverid, send_serverid): """ 用于测试同时订阅和发布消息 :return: """ # 线程1 用于去 从rabbitmq消费消息 rabbitmq_server = RabbitMQServer.get_instance(recv_serverid, send_serverid) rabbitmq_server.reconnect() recv_threading = threading.Thread(target=rabbitmq_server.run) recv_threading.start() i = 1 while True: # 主线程去发布消息 message = {"value": i} rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message) i += 1 time.sleep(0.01) class CJsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj) def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstr def json_to_dict(jsonstr): if isinstance(jsonstr, bytes): jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return d if __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 进程1 用于模拟模拟程序1 p = Process(target=process1, args=(recv_serverid, send_serverid, )) p.start() # 主进程用于模拟程序2 process1(send_serverid, recv_serverid)
위는 내 테스트 모듈입니다. 실제로 실제 비즈니스를 시뮬레이션하기 위한 것입니다. 내 RabbitMQ 모듈은 메시지를 구독할 뿐만 아니라 메시지를 게시하는 동시에 메시지 구독과 메시지 게시는 동일한 RabbitMQ 연결과 채널을 사용합니다.
하지만 이 코드를 실행한 후 기본적으로 오랫동안 실행하지 않으면 다음과 같은 오류 메시지가 표시됩니다.Traceback (most recent call last): File "/app/python3/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/app/python3/lib/python3.6/multiprocessing/process.py", line 93, in run self._target(*self._args, **self._kwargs) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 109, in process1 rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 76, in publish_message self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2120, in basic_publish mandatory, immediate) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2206, in publish immediate=immediate) File "/app/python3/lib/python3.6/site-packages/pika/channel.py", line 415, in basic_publish raise exceptions.ChannelClosed() pika.exceptions.ChannelClosed Traceback (most recent call last): File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 144, in <module> process1(send_serverid, recv_serverid) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 109, in process1 rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 76, in publish_message self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2120, in basic_publish mandatory, immediate) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2206, in publish immediate=immediate) File "/app/python3/lib/python3.6/site-packages/pika/channel.py", line 415, in basic_publish raise exceptions.ChannelClosed() pika.exceptions.ChannelClosed Exception in thread Thread-1: Traceback (most recent call last): File "/app/python3/lib/python3.6/threading.py", line 916, in _bootstrap_inner self.run() File "/app/python3/lib/python3.6/threading.py", line 864, in run self._target(*self._args, **self._kwargs) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 80, in run self.channel.start_consuming() File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1822, in start_consuming self.connection.process_data_events(time_limit=None) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 749, in process_data_events self._flush_output(common_terminator) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 477, in _flush_output result.reason_text) pika.exceptions.ConnectionClosed: (505, 'UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead')이 때, Rabbitmq 서비스의 로그 정보를 확인하면 오류가 표시됩니다. Case 1:
=INFO REPORT==== 12-Oct-2018::18:32:37 === accepting AMQP connection <0.19439.2> (192.168.90.11:42942 -> 192.168.90.11:5672) =INFO REPORT==== 12-Oct-2018::18:32:37 === accepting AMQP connection <0.19446.2> (192.168.90.11:42946 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::18:32:38 === AMQP connection <0.19446.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected content header for class 60, got non content header frame instead", 'basic.publish'} =INFO REPORT==== 12-Oct-2018::18:32:38 === closing AMQP connection <0.19446.2> (192.168.90.11:42946 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::18:33:59 === AMQP connection <0.19439.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected content header for class 60, got non content header frame instead", 'basic.publish'} =INFO REPORT==== 12-Oct-2018::18:33:59 === closing AMQP connection <0.19439.2> (192.168.90.11:42942 -> 192.168.90.11:5672)Case 2:
=INFO REPORT==== 12-Oct-2018::17:41:28 === accepting AMQP connection <0.19045.2> (192.168.90.11:33004 -> 192.168.90.11:5672) =INFO REPORT==== 12-Oct-2018::17:41:28 === accepting AMQP connection <0.19052.2> (192.168.90.11:33008 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::17:41:29 === AMQP connection <0.19045.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected content body, got non content body frame instead", 'basic.publish'} =INFO REPORT==== 12-Oct-2018::17:41:29 === closing AMQP connection <0.19045.2> (192.168.90.11:33004 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::17:42:23 === AMQP connection <0.19052.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected method frame, got non method frame instead",none} =INFO REPORT==== 12-Oct-2018::17:42:23 === closing AMQP connection <0.19052.2> (192.168.90.11:33008 -> 192.168.90.11:5672)이 상황에 대해 많은 정보와 문서를 참조했지만 이에 대해 찾은 링크를 찾지 못했습니다. 문제는 다음과 같습니다: https://stackoverflow.com/questions/49154404/pika-threaded-execution -gets-error-505-unexpected-framehttp://rabbitmq.1065348.n5.nabble.com/UNEXPECTED- FRAME-expected-content-header-for-class-60-got-non-content-header-frame-instead-td34981.html 많은 사람들이 이 문제에 직면했지만 확인 후 최종 해결책은 기본적으로 두 개를 생성하는 것입니다. RabbitMQ 연결, 메시지 구독용 연결 하나, 메시지 게시용 연결 하나. 이 경우 위의 문제는 발생하지 않습니다.이 솔루션 전에는 동일한 연결과 다른 채널을 사용하여 메시지 구독용 채널과 다른 채널을 사용하여 테스트했습니다. 메시지 게시용 채널이지만 테스트 중에 위의 오류가 프로세스 중에 계속 발생합니다. 저는 코딩 기술이 있습니다결국 위의 문제를 해결하기 위해 두 개의 연결을 사용하기로 결정했습니다. 이제 테스트 코드 예가 있습니다.
#! /usr/bin/env python3 # .-*- coding:utf-8 .-*- import pika import threading import json import datetime import os from pika.exceptions import ChannelClosed from pika.exceptions import ConnectionClosed # rabbitmq 配置信息 MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice" } class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self): self.recv_serverid = "" self.send_serverid = "" self.exchange = MQ_CONFIG.get("exchange") self.connection = None self.channel = None def reconnect(self): if self.connection and not self.connection.is_closed: self.connection.close() credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd")) parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"), credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct") if isinstance(self, RabbitComsumer): result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True) queue_name = result.method.queue self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid) self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False) class RabbitComsumer(RabbitMQServer): def __init__(self): super(RabbitComsumer, self).__init__() def consumer_callback(self, ch, method, properties, body): """ :param ch: :param method: :param properties: :param body: :return: """ ch.basic_ack(delivery_tag=method.delivery_tag) process_id = threading.current_thread() print("current process id is {0} body is {1}".format(process_id, body)) def start_consumer(self): while True: self.reconnect() self.channel.start_consuming() @classmethod def run(cls, recv_serverid): consumer = cls() consumer.recv_serverid = recv_serverid consumer.start_consumer() class RabbitPublisher(RabbitMQServer): def __init__(self): super(RabbitPublisher, self).__init__() def start_publish(self): self.reconnect() i = 1 while True: message = {"value": i} message = dict_to_json(message) self.channel.basic_publish(exchange=self.exchange, routing_key=self.send_serverid, body=message) i += 1 @classmethod def run(cls, send_serverid): publish = cls() publish.send_serverid = send_serverid publish.start_publish() class CJsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj) def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstr def json_to_dict(jsonstr): if isinstance(jsonstr, bytes): jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return d if __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 这里分别用两个线程去连接和发送 threading.Thread(target=RabbitComsumer.run, args=(recv_serverid,)).start() threading.Thread(target=RabbitPublisher.run, args=(send_serverid,)).start() # 这里也是用两个连接去连接和发送, threading.Thread(target=RabbitComsumer.run, args=(send_serverid,)).start() RabbitPublisher.run(recv_serverid)위의 코드에서는 두 개의 연결을 사용하여 각각 구독하고 게시했습니다. .메시지와 또 다른 구독 및 게시 쌍도 두 개의 연결을 사용하여 구독 및 게시를 수행하므로 프로그램을 다시 실행할 때 이전 문제는 발생하지 않습니다연결 끊김 및 재연결에 대해위 코드는 그렇지 않지만 이전 오류가 발생했지만 이 프로그램은 매우 취약합니다. 또는 rabbitmq에서 예외가 발생하면 우리 프로그램은 다시 연결 메커니즘을 수행할 수도 있습니다
#! /usr/bin/env python3 # .-*- coding:utf-8 .-*- import pika import threading import json import datetime import time from pika.exceptions import ChannelClosed from pika.exceptions import ConnectionClosed # rabbitmq 配置信息 MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice" } class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self): self.recv_serverid = "" self.send_serverid = "" self.exchange = MQ_CONFIG.get("exchange") self.connection = None self.channel = None def reconnect(self): try: if self.connection and not self.connection.is_closed: self.connection.close() credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd")) parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"), credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct") if isinstance(self, RabbitComsumer): result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True) queue_name = result.method.queue self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid) self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False) except Exception as e: print(e) class RabbitComsumer(RabbitMQServer): def __init__(self): super(RabbitComsumer, self).__init__() def consumer_callback(self, ch, method, properties, body): """ :param ch: :param method: :param properties: :param body: :return: """ ch.basic_ack(delivery_tag=method.delivery_tag) process_id = threading.current_thread() print("current process id is {0} body is {1}".format(process_id, body)) def start_consumer(self): while True: try: self.reconnect() self.channel.start_consuming() except ConnectionClosed as e: self.reconnect() time.sleep(2) except ChannelClosed as e: self.reconnect() time.sleep(2) except Exception as e: self.reconnect() time.sleep(2) @classmethod def run(cls, recv_serverid): consumer = cls() consumer.recv_serverid = recv_serverid consumer.start_consumer() class RabbitPublisher(RabbitMQServer): def __init__(self): super(RabbitPublisher, self).__init__() def start_publish(self): self.reconnect() i = 1 while True: message = {"value": i} message = dict_to_json(message) try: self.channel.basic_publish(exchange=self.exchange, routing_key=self.send_serverid, body=message) i += 1 except ConnectionClosed as e: self.reconnect() time.sleep(2) except ChannelClosed as e: self.reconnect() time.sleep(2) except Exception as e: self.reconnect() time.sleep(2) @classmethod def run(cls, send_serverid): publish = cls() publish.send_serverid = send_serverid publish.start_publish() class CJsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj) def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstr def json_to_dict(jsonstr): if isinstance(jsonstr, bytes): jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return d if __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 这里分别用两个线程去连接和发送 threading.Thread(target=RabbitComsumer.run, args=(recv_serverid,)).start() threading.Thread(target=RabbitPublisher.run, args=(send_serverid,)).start() # 这里也是用两个连接去连接和发送, threading.Thread(target=RabbitComsumer.run, args=(send_serverid,)).start() RabbitPublisher.run(recv_serverid)위 코드가 실행된 후,rabbitmq 서비스에 문제가 있더라도,rabbitmq 서비스가 복원되면 우리 프로그램은 다음을 수행할 수 있습니다. 여전히 다시 연결되지만 위의 구현 방법이 일정 시간 동안 실행된 후 메시지가 게시되는 실제 메시지는 다른 스레드나 프로세스에서 얻은 데이터이므로 이때 대기열을 통해 구현할 수 있습니다. 오랫동안 대기열에 메시지가 없습니다. 데이터는 일정 시간이 지난 후 해제되어야 합니다. 이 때 프로그램은 RabbitMQ 서버에 의해 연결이 끊어졌다는 메시지를 표시하지만 이후에는 물론 다시 연결할 수도 있지만 여기에서 왜 이런 일이 발생하는지 생각해 보세요. 이때 Rabbitmq의 로그를 확인하면 다음과 같은 오류가 표시됩니다.
=ERROR REPORT==== 8-Oct-2018::15:34:19 === closing AMQP connection <0.30112.1> (192.168.90.11:54960 -> 192.168.90.11:5672): {heartbeat_timeout,running}이는 로그를 가로채는 것입니다. 이전 테스트 환경에서 발생한 오류임을 알 수 있습니다. Rabbitmq 에 연결하는 pika의 연결 매개변수에 이러한 매개변수가 있으므로 이 매개변수는 기본적으로 설정되어 있지 않습니다. 이 히트비트의 하트비트 시간은 기본적으로 설정되어 있지 않습니다. 설정하지 않으면 서버에서 완전히 설정하게 됩니다. 왜냐하면 이 하트비트 시간은 서버와의 협상 결과이기 때문입니다.이 매개변수를 0으로 설정하면 하트비트가 전송되지 않고 서버가 연결을 끊지 않는다는 의미이므로 여기서는 편의상 메시지를 게시하는 스레드의 하트비트를 0으로 설정하고 여기서는 패킷 캡처를 정리하고 협상 프로세스를 살펴보았습니다. 서버와 클라이언트 사이 패킷 캡처 분석을 통해 서버와 클라이언트가 처음 580초 동안 협상한 것을 볼 수 있으며 클라이언트는 다음과 같이 응답했습니다.
이렇게 하면 절대 연결이 끊어지지 않으나, heartbeat 값을 설정하지 않으면 패킷을 다시 캡처할 때 다음과 같은 내용이 나옵니다
위 그림에서 삭제하면 됩니다. 서버와 클라이언트 간의 마지막 협상 결과는 580이므로 시간이 다 되어 데이터 교환이 없으면 서버에 의해 연결이 끊어집니다
특별한 주의
특별한 주의가 필요한 것은 그 이후입니다 실제로 Python의 pika를 테스트했습니다. = =0.11.2 이하 버전에서는 하트비트 설정이 적용되지 않습니다. 0.12.0 이상 버전에서만 설정이 적용됩니다
위 내용은 Python의 pika 모듈과 관련된 문제 소개(코드 포함)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!