Home >Backend Development >Python Tutorial >Introduction to issues related to the pika module in python (with code)
This article brings you an introduction to issues related to the pika module in Python (with code). It has certain reference value. Friends in need can refer to it. I hope it will be helpful to you.
Rabbitmq is often used in my work, and the language used is mainly python, so the pika module in python is often used. However, the use of this module also brings me a lot of problems, which are summarized here. Let’s talk about the solutions to some problems I encountered during the change process of using this module
A newbie who just started writing code
Use this at the beginning When using rabbitmq, due to its own business needs, my program needs to consume messages from rabbitmq and publish messages to rabbitmq. The logic diagram of the code is as follows:
The following is My simulation code:
#! /usr/bin/env python3 # .-*- coding:utf-8 .-*- import pika import time import threading import os import json import datetime from multiprocessing import Process # rabbitmq 配置信息 MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice" } class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self, recv_serverid, send_serverid): # self.serverid = MQ_CONFIG.get("serverid") self.exchange = MQ_CONFIG.get("exchange") self.channel = None self.connection = None self.recv_serverid = recv_serverid self.send_serverid = send_serverid def reconnect(self): if self.connection and not self.connection.is_closed(): self.connection.close() credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd")) parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"), credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct") result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True) queue_name = result.method.queue self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid) self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False) def consumer_callback(self, channel, method, properties, body): """ 消费消息 :param channel: :param method: :param properties: :param body: :return: """ channel.basic_ack(delivery_tag=method.delivery_tag) process_id = os.getpid() print("current process id is {0} body is {1}".format(process_id, body)) def publish_message(self, to_serverid, message): """ 发布消息 :param to_serverid: :param message: :return: """ message = dict_to_json(message) self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) def run(self): while True: self.channel.start_consuming() @classmethod def get_instance(cls, *args, **kwargs): """ 单例模式 :return: """ if not hasattr(cls, "_instance"): with cls._instance_lock: if not hasattr(cls, "_instance"): cls._instance = cls(*args, **kwargs) return cls._instance def process1(recv_serverid, send_serverid): """ 用于测试同时订阅和发布消息 :return: """ # 线程1 用于去 从rabbitmq消费消息 rabbitmq_server = RabbitMQServer.get_instance(recv_serverid, send_serverid) rabbitmq_server.reconnect() recv_threading = threading.Thread(target=rabbitmq_server.run) recv_threading.start() i = 1 while True: # 主线程去发布消息 message = {"value": i} rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message) i += 1 time.sleep(0.01) class CJsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj) def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstr def json_to_dict(jsonstr): if isinstance(jsonstr, bytes): jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return d if __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 进程1 用于模拟模拟程序1 p = Process(target=process1, args=(recv_serverid, send_serverid, )) p.start() # 主进程用于模拟程序2 process1(send_serverid, recv_serverid)
The above is my test module that changes my actual code. In fact, it simulates actual business. My rabbitmq module already has subscription messages. , and when messages are published, at the same time, The same rabbitmq connection is used to subscribe to messages and publish messages to the same channel
But after this code is run, it will not take long to run. to the following error message:
Traceback (most recent call last): File "/app/python3/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/app/python3/lib/python3.6/multiprocessing/process.py", line 93, in run self._target(*self._args, **self._kwargs) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 109, in process1 rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 76, in publish_message self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2120, in basic_publish mandatory, immediate) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2206, in publish immediate=immediate) File "/app/python3/lib/python3.6/site-packages/pika/channel.py", line 415, in basic_publish raise exceptions.ChannelClosed() pika.exceptions.ChannelClosed Traceback (most recent call last): File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 144, in <module> process1(send_serverid, recv_serverid) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 109, in process1 rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 76, in publish_message self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2120, in basic_publish mandatory, immediate) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2206, in publish immediate=immediate) File "/app/python3/lib/python3.6/site-packages/pika/channel.py", line 415, in basic_publish raise exceptions.ChannelClosed() pika.exceptions.ChannelClosed Exception in thread Thread-1: Traceback (most recent call last): File "/app/python3/lib/python3.6/threading.py", line 916, in _bootstrap_inner self.run() File "/app/python3/lib/python3.6/threading.py", line 864, in run self._target(*self._args, **self._kwargs) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 80, in run self.channel.start_consuming() File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1822, in start_consuming self.connection.process_data_events(time_limit=None) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 749, in process_data_events self._flush_output(common_terminator) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 477, in _flush_output result.reason_text) pika.exceptions.ConnectionClosed: (505, 'UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead')
At this time, if you check the log information of the rabbitmq service, you will see the error logs in two situations as follows:
Situation 1:
=INFO REPORT==== 12-Oct-2018::18:32:37 === accepting AMQP connection <0.19439.2> (192.168.90.11:42942 -> 192.168.90.11:5672) =INFO REPORT==== 12-Oct-2018::18:32:37 === accepting AMQP connection <0.19446.2> (192.168.90.11:42946 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::18:32:38 === AMQP connection <0.19446.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected content header for class 60, got non content header frame instead", 'basic.publish'} =INFO REPORT==== 12-Oct-2018::18:32:38 === closing AMQP connection <0.19446.2> (192.168.90.11:42946 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::18:33:59 === AMQP connection <0.19439.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected content header for class 60, got non content header frame instead", 'basic.publish'} =INFO REPORT==== 12-Oct-2018::18:33:59 === closing AMQP connection <0.19439.2> (192.168.90.11:42942 -> 192.168.90.11:5672)
Situation 2:
=INFO REPORT==== 12-Oct-2018::17:41:28 === accepting AMQP connection <0.19045.2> (192.168.90.11:33004 -> 192.168.90.11:5672) =INFO REPORT==== 12-Oct-2018::17:41:28 === accepting AMQP connection <0.19052.2> (192.168.90.11:33008 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::17:41:29 === AMQP connection <0.19045.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected content body, got non content body frame instead", 'basic.publish'} =INFO REPORT==== 12-Oct-2018::17:41:29 === closing AMQP connection <0.19045.2> (192.168.90.11:33004 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::17:42:23 === AMQP connection <0.19052.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected method frame, got non method frame instead",none} =INFO REPORT==== 12-Oct-2018::17:42:23 === closing AMQP connection <0.19052.2> (192.168.90.11:33008 -> 192.168.90.11:5672)
I have searched a lot of information and documents for this situation, but I have not found a good answer. The links I found about this problem are:
https:/ /stackoverflow.com/questions/49154404/pika-threaded-execution-gets-error-505-unexpected-frame
http://rabbitmq.1065348.n5.nabble.com/UNEXPECTED-FRAME-expected- content-header-for-class-60-got-non-content-header-frame-instead-td34981.html
Many other people have encountered this problem, but after checking, the final solution is basically Two rabbitmq connections are created, one connection is used to subscribe to messages, and one connection is used to publish messages. In this case, the above problems will not occur
Before this solution, I tested Use the same connection but different channels, use one channel to subscribe to messages, and use another channel to publish messages, but the above error will still occur during the test process.
In the end, I also chose to use two connections to solve the above problems. Now is a test code example:
#! /usr/bin/env python3 # .-*- coding:utf-8 .-*- import pika import threading import json import datetime import os from pika.exceptions import ChannelClosed from pika.exceptions import ConnectionClosed # rabbitmq 配置信息 MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice" } class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self): self.recv_serverid = "" self.send_serverid = "" self.exchange = MQ_CONFIG.get("exchange") self.connection = None self.channel = None def reconnect(self): if self.connection and not self.connection.is_closed: self.connection.close() credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd")) parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"), credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct") if isinstance(self, RabbitComsumer): result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True) queue_name = result.method.queue self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid) self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False) class RabbitComsumer(RabbitMQServer): def __init__(self): super(RabbitComsumer, self).__init__() def consumer_callback(self, ch, method, properties, body): """ :param ch: :param method: :param properties: :param body: :return: """ ch.basic_ack(delivery_tag=method.delivery_tag) process_id = threading.current_thread() print("current process id is {0} body is {1}".format(process_id, body)) def start_consumer(self): while True: self.reconnect() self.channel.start_consuming() @classmethod def run(cls, recv_serverid): consumer = cls() consumer.recv_serverid = recv_serverid consumer.start_consumer() class RabbitPublisher(RabbitMQServer): def __init__(self): super(RabbitPublisher, self).__init__() def start_publish(self): self.reconnect() i = 1 while True: message = {"value": i} message = dict_to_json(message) self.channel.basic_publish(exchange=self.exchange, routing_key=self.send_serverid, body=message) i += 1 @classmethod def run(cls, send_serverid): publish = cls() publish.send_serverid = send_serverid publish.start_publish() class CJsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj) def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstr def json_to_dict(jsonstr): if isinstance(jsonstr, bytes): jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return d if __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 这里分别用两个线程去连接和发送 threading.Thread(target=RabbitComsumer.run, args=(recv_serverid,)).start() threading.Thread(target=RabbitPublisher.run, args=(send_serverid,)).start() # 这里也是用两个连接去连接和发送, threading.Thread(target=RabbitComsumer.run, args=(send_serverid,)).start() RabbitPublisher.run(recv_serverid)
The above code I used two connections to subscribe and publish messages respectively. At the same time, another pair of subscription and publishing also used two connections to perform subscription and publishing, so that when the program is run again, the previous problems will not occur
Although the above code will not cause the previous error, the program is very fragile. When the rabbitmq service is restarted or disconnected, the program will not have a reconnection mechanism. , so we need to add a reconnection mechanism to the code, so that even if the rabbitmq service is restarted or
rabbitmq has an exception, our program can still perform the reconnection mechanism
#! /usr/bin/env python3 # .-*- coding:utf-8 .-*- import pika import threading import json import datetime import time from pika.exceptions import ChannelClosed from pika.exceptions import ConnectionClosed # rabbitmq 配置信息 MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice" } class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self): self.recv_serverid = "" self.send_serverid = "" self.exchange = MQ_CONFIG.get("exchange") self.connection = None self.channel = None def reconnect(self): try: if self.connection and not self.connection.is_closed: self.connection.close() credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd")) parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"), credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct") if isinstance(self, RabbitComsumer): result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True) queue_name = result.method.queue self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid) self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False) except Exception as e: print(e) class RabbitComsumer(RabbitMQServer): def __init__(self): super(RabbitComsumer, self).__init__() def consumer_callback(self, ch, method, properties, body): """ :param ch: :param method: :param properties: :param body: :return: """ ch.basic_ack(delivery_tag=method.delivery_tag) process_id = threading.current_thread() print("current process id is {0} body is {1}".format(process_id, body)) def start_consumer(self): while True: try: self.reconnect() self.channel.start_consuming() except ConnectionClosed as e: self.reconnect() time.sleep(2) except ChannelClosed as e: self.reconnect() time.sleep(2) except Exception as e: self.reconnect() time.sleep(2) @classmethod def run(cls, recv_serverid): consumer = cls() consumer.recv_serverid = recv_serverid consumer.start_consumer() class RabbitPublisher(RabbitMQServer): def __init__(self): super(RabbitPublisher, self).__init__() def start_publish(self): self.reconnect() i = 1 while True: message = {"value": i} message = dict_to_json(message) try: self.channel.basic_publish(exchange=self.exchange, routing_key=self.send_serverid, body=message) i += 1 except ConnectionClosed as e: self.reconnect() time.sleep(2) except ChannelClosed as e: self.reconnect() time.sleep(2) except Exception as e: self.reconnect() time.sleep(2) @classmethod def run(cls, send_serverid): publish = cls() publish.send_serverid = send_serverid publish.start_publish() class CJsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj) def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstr def json_to_dict(jsonstr): if isinstance(jsonstr, bytes): jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return d if __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 这里分别用两个线程去连接和发送 threading.Thread(target=RabbitComsumer.run, args=(recv_serverid,)).start() threading.Thread(target=RabbitPublisher.run, args=(send_serverid,)).start() # 这里也是用两个连接去连接和发送, threading.Thread(target=RabbitComsumer.run, args=(send_serverid,)).start() RabbitPublisher.run(recv_serverid)
Even after the above code is run, There is a problem with rabbitmq's service, but when rabbitmq's service is restored, our program can still reconnect. However, after the above implementation method has been running for a period of time, because the actual message publishing place is from other threads Or the data obtained in the process. At this time, you may implement it through the queue. At this time, if there is no data in your queue for a long time, the data comes after a certain period of time and needs to be released. At this time, you will find that your program will It prompts that the connection was disconnected by the rabbitmq server, but after all, you have set up a reconnection mechanism. Of course, you can also reconnect, but think about why this happens. At this time, check the rabbitmq log and you will find the following. Error:
=ERROR REPORT==== 8-Oct-2018::15:34:19 === closing AMQP connection <0.30112.1> (192.168.90.11:54960 -> 192.168.90.11:5672): {heartbeat_timeout,running}
This is an interception of the log of my previous test environment. It can be seen that it is caused by this error. Later, I checked the connection parameters of pika to rabbitmq and there is such a parameter
This parameter is not set by default, so the heartbeat time of this heatbeat is not set by default. If it is not set, it will be set by the server, because this heartbeat time is negotiated with the server. Result
When this parameter is set to 0, it means that the heartbeat will not be sent, and the server will never disconnect the connection, so here for convenience, I set the heartbeat of the thread that publishes the message to 0, and I Here, I will sort through the packet capture and take a look at the negotiation process between the server and the client
From the packet capture analysis, we can see that the server and client first negotiated 580 seconds, and the client replied:
In this way, the connection will never be disconnected, but if we do not set the heartbeat value, we will see the following when capturing the packet again
From the picture above, we can delete the final result of the negotiation between the server and the client, which is 580. In this way, when the time is up, if there is no data exchange, the connection will be disconnected by the server.
Special Attention
What needs special attention is that after my actual test, the heartbeat setting of python's pika==0.11.2 version and below does not take effect. Only 0.12.0 and below Only the above version settings can take effect
The above is the detailed content of Introduction to issues related to the pika module in python (with code). For more information, please follow other related articles on the PHP Chinese website!