Python+Pika+RabbitMQ environment deployment and implementation of work queue
The key part of the name rabbitmq is the letters mq: Message Queue. The word "rabbit" in front simply means rabbit, in the same way that the Python language is named python; a bit of naming humor. The rabbitmq service runs much like the mysql and apache services, but provides a different function: rabbitmq is a service for delivering messages, which different applications can use to communicate with each other.
Install rabbitmq
First install rabbitmq. Under ubuntu 12.04, you can install it directly through apt-get:
sudo apt-get install rabbitmq-server
Once installation finishes, the rabbitmq service is already running. Next, let's look at a Hello World! example written in python: send.py sends "Hello World!" to rabbitmq, and receive.py receives that message from rabbitmq.
In the diagram for this example, P stands for producer, which can also be called the sender; in the example it is send.py. C stands for consumer, which can also be called the receiver; in the example it is receive.py. The red box in the middle represents the queue, which is the hello queue in the example.
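The three roles can be pictured with a tiny in-memory model before any real broker is involved. This is only a sketch: `ToyQueue`, `produce` and `consume` are hypothetical names invented for illustration, not part of rabbitmq or pika, and a real broker runs as a separate service rather than inside the program.

```python
from collections import deque


class ToyQueue(object):
    """Hypothetical stand-in for the hello queue: a first-in, first-out buffer."""

    def __init__(self):
        self.messages = deque()


def produce(queue, body):
    """Plays the role of P (send.py): append a message to the queue."""
    queue.messages.append(body)


def consume(queue, callback):
    """Plays the role of C (receive.py): hand each waiting message to callback."""
    while queue.messages:
        callback(queue.messages.popleft())


hello = ToyQueue()
received = []

produce(hello, "Hello World!")
consume(hello, received.append)

print(received)  # ['Hello World!']
```

The producer and the consumer never call each other directly; they only share the queue, which is the point of putting a message queue between two applications.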
To use the rabbitmq service from python, you can use one of the ready-made libraries pika, txAMQP or py-amqplib. Here, pika is chosen.
Install pika
pika can be installed with pip, the package manager for python. If pip is not installed, you can install it through apt-get:
sudo apt-get install python-pip
Install pika via pip:
sudo pip install pika
send.py code
Connect to the rabbitmq server. Because it is tested locally, just use localhost.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
Declare the message queue to which messages will be delivered. If a message is sent to a queue that does not exist, rabbitmq simply discards it.
channel.queue_declare(queue='hello')
Send a message to the hello queue declared above. exchange names the exchange; the empty string selects the default exchange, which lets us specify exactly which queue the message should go to. routing_key is set to the name of the queue, and body is the content to be sent. We will not go into the details of delivery for now.
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
Close connection
connection.close()
Full code
#!/usr/bin/env python
#coding=utf8
import pika

# Connect to the rabbitmq server on the local machine
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Make sure the hello queue exists
channel.queue_declare(queue='hello')

# Publish through the default exchange, routed by queue name
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print " [x] Sent 'Hello World!'"

connection.close()
Let's run this program first. If it runs successfully, rabbitmq should now have a hello queue with one message in it. Use the rabbitmqctl command to check:
rabbitmqctl list_queues
The output should list the hello queue along with the number of messages in it.
receive.py code
The first two steps are the same as in send.py: connect to the server, then declare the message queue. The identical code is not repeated here.
Receiving messages is more involved: you need to define a callback function to process them. The callback here simply prints the message.
def callback(ch, method, properties, body):
    print "Received %r" % (body,)
Tell rabbitmq to use callback to receive messages from the hello queue:
channel.basic_consume(callback, queue='hello', no_ack=True)
Start receiving messages. This call blocks, and callback is invoked only when there is a message in the queue. Press ctrl+c to exit.
channel.start_consuming()
Full code
#!/usr/bin/env python
#coding=utf8
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

# Called by pika for every message delivered from the queue
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback, queue='hello', no_ack=True)

print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()
Run the program and it receives the Hello World! message waiting in the hello queue and prints it on the screen. Switch to another terminal and run send.py again; you can see that receive.py receives the new message as well.
Work queue example
1. Preparation
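Modify send.py so that the message is taken from the command-line arguments, and save it as new_task.py:
import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='', routing_key='hello', body=message)
print " [x] Sent %r" % (message,)
Modify the callback function of receive.py, and save the result as worker.py: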
import time

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep(body.count('.'))
    print " [x] Done"
Each dot in the message body makes the worker sleep for one second, which simulates a time-consuming task. First open two terminals and run worker.py in both, so that both are listening; this is equivalent to having two workers. Then open a third terminal and run new_task.py:
$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....
Observe how worker.py receives the tasks. One worker receives 3 tasks:
$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
The other worker receives 2 tasks:
$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'
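As the output above shows, the workers are assigned tasks in turn. But if a worker dies while processing a task, that task is left unfinished and should be handed to another worker. So there needs to be a mechanism by which a worker reports back when it has finished a task.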
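The alternating pattern in the two worker outputs is round-robin dispatch: each message goes to the next worker in turn. A minimal simulation, assuming two workers and the five messages sent above, reproduces the same split. `round_robin` is a hypothetical helper written for this illustration, not a pika function.

```python
def round_robin(messages, worker_count):
    """Assign messages to workers in turn, one after the other."""
    assigned = [[] for _ in range(worker_count)]
    for index, message in enumerate(messages):
        assigned[index % worker_count].append(message)
    return assigned


messages = [
    "First message.",
    "Second message..",
    "Third message...",
    "Fourth message....",
    "Fifth message.....",
]

workers = round_robin(messages, 2)
for number, tasks in enumerate(workers, 1):
    print("worker %d: %s" % (number, ", ".join(tasks)))
```

Note that the assignment depends only on the order of arrival, not on how long each task takes.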
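2. Message acknowledgment
Message acknowledgment means that a worker reports back to rabbitmq once it has finished a task. Modify the callback function in worker.py:
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep(5)
    print " [x] Done"
    ch.basic_ack(delivery_tag=method.delivery_tag)
The 5-second pause here makes it easy to exit with ctrl+c while a task is still in progress.
Then remove the no_ack=True parameter from basic_consume, or set it to False: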
channel.basic_consume(callback, queue='hello', no_ack=False)
With this code, even if one of the workers is stopped with ctrl+c, the task it was processing is not lost: rabbitmq reassigns it to another worker.
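The effect of acknowledgments can be sketched with a toy in-memory model. Everything here (`ToyQueue`, `deliver`, `ack`, `consumer_died`) is hypothetical and only mimics the behaviour described above: a delivered message stays on record as unacknowledged until the worker confirms it, and goes back to the queue if the worker disappears first.

```python
from collections import deque


class ToyQueue(object):
    """Hypothetical model of a queue that tracks unacknowledged deliveries."""

    def __init__(self, messages):
        self.ready = deque(messages)   # waiting to be delivered
        self.unacked = {}              # delivery_tag -> (worker, message)
        self.next_tag = 1

    def deliver(self, worker):
        """Hand the next ready message to worker and remember it as unacked."""
        message = self.ready.popleft()
        tag = self.next_tag
        self.next_tag += 1
        self.unacked[tag] = (worker, message)
        return tag, message

    def ack(self, tag):
        """The worker finished the task, so the message can be forgotten."""
        del self.unacked[tag]

    def consumer_died(self, worker):
        """Put every message the dead worker had not acked back in the queue."""
        lost = sorted(tag for tag, (owner, _) in self.unacked.items()
                      if owner == worker)
        for tag in reversed(lost):
            _, message = self.unacked.pop(tag)
            self.ready.appendleft(message)


queue = ToyQueue(["task A", "task B"])

tag_a, _ = queue.deliver("worker 1")
tag_b, _ = queue.deliver("worker 2")

queue.ack(tag_b)                  # worker 2 finishes and acknowledges
queue.consumer_died("worker 1")   # worker 1 is stopped with ctrl+c mid-task

# task A is ready again and can be delivered to another worker
tag_again, message = queue.deliver("worker 2")
print(message)  # task A
```

With no_ack=True there would be no unacked record at all in this model, so a task held by a stopped worker would simply be gone.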
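3. Message durability
Even with the acknowledgment mechanism in place, tasks are still lost if rabbitmq itself goes down. So the tasks need to be stored persistently. Declare the queue as durable: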
channel.queue_declare(queue='hello', durable=True)
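But this program fails with an error, because the hello queue already exists and is not durable; rabbitmq does not allow an existing queue to be redefined with different parameters. Declare a new queue instead: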
channel.queue_declare(queue='task_queue', durable=True)
When sending a task, use delivery_mode=2 to mark the task as persistent:
channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
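The rules of this section can be summarised with a toy in-memory model. `ToyBroker` and its methods are hypothetical names used only for illustration; they imitate the behaviour described above (a queue cannot be redeclared with a different durable setting, and only persistent messages in durable queues outlive a restart) and are not the pika API.

```python
class ToyBroker(object):
    """Hypothetical model of how durability settings affect a restart."""

    def __init__(self):
        self.queues = {}  # name -> {"durable": bool, "messages": list}

    def queue_declare(self, queue, durable=False):
        existing = self.queues.get(queue)
        if existing is None:
            self.queues[queue] = {"durable": durable, "messages": []}
        elif existing["durable"] != durable:
            # Mirrors rabbitmq refusing to redefine an existing queue
            raise ValueError("queue %r already exists with durable=%r"
                             % (queue, existing["durable"]))

    def publish(self, queue, body, delivery_mode=1):
        self.queues[queue]["messages"].append((body, delivery_mode))

    def restart(self):
        """Keep durable queues only, and within them persistent messages only."""
        survivors = {}
        for name, info in self.queues.items():
            if info["durable"]:
                kept = [m for m in info["messages"] if m[1] == 2]
                survivors[name] = {"durable": True, "messages": kept}
        self.queues = survivors


broker = ToyBroker()
broker.queue_declare("hello")
broker.queue_declare("task_queue", durable=True)

try:
    broker.queue_declare("hello", durable=True)
    redeclare_failed = False
except ValueError:
    redeclare_failed = True

broker.publish("hello", "lost on restart")
broker.publish("task_queue", "transient", delivery_mode=1)
broker.publish("task_queue", "persistent", delivery_mode=2)
broker.restart()

print(redeclare_failed)       # True
print(sorted(broker.queues))  # ['task_queue']
print(broker.queues["task_queue"]["messages"])  # [('persistent', 2)]
```

Both settings are needed: the durable flag protects the queue itself, and delivery_mode=2 protects each message inside it.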
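4. Fair dispatch
In the examples above, the workers are assigned tasks in turn, but the tasks are not necessarily alike. Some tasks are heavy and take a long time to run; others are light and finish quickly. It would be best to dispatch them fairly. Setting prefetch_count=1 with basic_qos makes rabbitmq stop giving a worker more than one task at a time; that is, a worker receives a new task only after it has finished the previous one.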
channel.basic_qos(prefetch_count=1)
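To see why this matters, the sketch below compares the two strategies on a made-up workload. It assumes, as in worker.py, that a task takes one second per dot in its body. `round_robin_finish` and `fair_finish` are hypothetical helpers that only compute when the last worker would finish; they do not talk to rabbitmq.

```python
def duration(body):
    """Seconds of work for a task: one per dot, as in worker.py."""
    return body.count(".")


def round_robin_finish(tasks, worker_count):
    """Each worker gets every n-th task, regardless of how busy it is."""
    busy_until = [0] * worker_count
    for index, body in enumerate(tasks):
        busy_until[index % worker_count] += duration(body)
    return max(busy_until)


def fair_finish(tasks, worker_count):
    """With prefetch_count=1 the next task goes to the worker that frees up first."""
    busy_until = [0] * worker_count
    for body in tasks:
        free_first = busy_until.index(min(busy_until))
        busy_until[free_first] += duration(body)
    return max(busy_until)


# Hypothetical workload: heavy and light tasks alternate
tasks = ["heavy.....", "light.", "heavy.....", "light.", "heavy.....", "light."]

print(round_robin_finish(tasks, 2))  # 15
print(fair_finish(tasks, 2))         # 11
```

Under round-robin one worker ends up with all three heavy tasks while the other sits idle after three seconds; with one task at a time, the idle worker picks up the next task as soon as it is free.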
Full code of new_task.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Durable queue: survives a rabbitmq restart
channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print " [x] Sent %r" % (message,)
connection.close()
Full code of worker.py
#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep(body.count('.'))  # one second of work per dot
    print " [x] Done"
    # Acknowledge only after the task has finished
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Deliver at most one unacknowledged task to this worker at a time
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='task_queue')

channel.start_consuming()