Home >Backend Development >Python Tutorial >Python+Pika+RabbitMQ environment deployment and implementation of work queue

Python+Pika+RabbitMQ environment deployment and implementation of work queue

高洛峰
高洛峰Original
2017-03-01 14:04:501600browse

The Chinese translation of rabbitmq mainly refers to the letters mq: Message Queue, which means message queue. There is also the word "rabbit" in front of it, which means rabbit. It is the same as the python language called python. Foreigners are quite humorous. The rabbitmq service is similar to the mysql and apache services, but the functions provided are different. rabbimq is used to provide a service for sending messages, which can be used to communicate between different applications.

Install rabbitmq
First install rabbitmq. Under ubuntu 12.04, you can install it directly through apt-get:

sudo apt-get install rabbitmq-server

After installation, the rabbitmq service has been started. Next, let’s look at an example of writing Hello World! in python. The content of the example is to send "Hello World!" from send.py to rabbitmq, and receive.py receives the information sent by send.py from rabbitmq.

Python+Pika+RabbitMQ environment deployment and implementation of work queue

P stands for produce, which means producer, and can also be called sender. In the example, it is shown as send.py; C stands for consumer, which means consumer. Meaning, it can also be called the receiver, which is represented by receive.py in the example; the red one in the middle represents the meaning of the queue, which is represented by the hello queue in the example.

Python uses the rabbitmq service. You can use the ready-made class libraries pika, txAMQP or py-amqplib. Here, pika is chosen.

Install pika

To install pika, you can use pip to install it. pip is the software management package of python. If it is not installed, you can install it through apt-get

sudo apt-get install python-pip

Install pika via pip:

sudo pip install pika

send.py code

Connect to the rabbitmq server. Because it is tested locally, just use localhost.

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()

Declare the message queue in which messages will be delivered. If messages are sent to a queue that does not exist, rabbitmq will automatically clear these messages.

channel.queue_declare(queue='hello')

Send a message to the hello queue declared above, where exchange represents the exchanger, which can accurately specify which queue the message should be sent to, and routing_key is set to the name of the queue , the body is the content to be sent, and we will not pay attention to the specific sending details for now.

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

Close connection

connection.close()

Full code

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

Let’s execute this program first. If the execution is successful, rabbitmqctl should successfully add the hello queue, and there should be a message in the queue. Use the rabbitmqctl command to check it

rabbitmqctl list_queues

Output the following information on the author's computer:

Python+Pika+RabbitMQ environment deployment and implementation of work queue


## Indeed There is a hello queue, and there is a message in the queue. Next, use receive.py to obtain the information in the queue.

receive.py code

is the same as the previous two steps of send.py, both of which require connecting to the server first and then declaring the message queue, which will not be discussed here. Posted the same code.

Receiving messages is more complicated, and you need to define a callback function to process it. The callback function here is to print out the information.

def callback(ch, method, properties, body):
  print "Received %r" % (body,)

Tell rabbitmq to use callback to receive information

channel.basic_consume(callback, queue='hello', no_ack=True)

Start receiving information, and Entering the blocking state, callback will be called for processing only when there is information in the queue. Press ctrl+c to exit.

channel.start_consuming()

Complete code

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue='hello', no_ack=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

Execute the program and you will be able to receive the hello in the queue The message Hello World! is then printed on the screen. Change a terminal and execute send.py again. You can see that receive.py will receive information again.

Work queue example

1. Preparation

In the example program, use new_task .py to simulate the task allocator and worker.py to simulate the worker.

Modify send.py, receive information from the command line parameters, and send

import sys
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='hello',
           body=message)
print " [x] Sent %r" % (message,)

Modify the callback function of receive.py.

import time
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"

First open two terminals here, both run worker.py, and are in the listening state. This is equivalent to two workers. Open the third terminal and run new_task.py

$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....

Observe that worker.py receives tasks, and one worker receives 3 tasks:

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'

Another worker received 2 tasks:

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。

2.消息确认(Message acknowledgment)

消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:

def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep(5)
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)

这边停顿5秒,可以方便ctrl+c退出。

去除no_ack=True参数或者设置为False也可以。

channel.basic_consume(callback, queue='hello', no_ack=False)

用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。

3.消息持久化存储(Message durability)

虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:

channel.queue_declare(queue='hello', durable=True)

但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:

channel.queue_declare(queue='task_queue', durable=True)

在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:

channel.basic_publish(exchange='',
           routing_key="task_queue",
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))

4.公平调度(Fair dispatch)

上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。

channel.basic_qos(prefetch_count=1)

new_task.py完整代码

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='task_queue',
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
print " [x] Sent %r" % (message,)
connection.close()
worker.py完整代码

#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
           queue='task_queue')
 
channel.start_consuming()


更多Python+Pika+RabbitMQ environment deployment and implementation of work queue相关文章请关注PHP中文网!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn