Home  >  Article  >  Backend Development  >  Use of Python rabbitmq (2)

Use of Python rabbitmq (2)

黄舟
黄舟Original
2017-01-17 14:51:411231browse

The previous article introduced the installation of rabbitmq and the classic hello world! Example. Here we will have an understanding of Work Queues. Because it is a continuation of the previous article, if you have not read the previous article, this article may be difficult to understand. The address of the previous article is: How to install rabbitmq and python on ubuntu


Messages can also be understood as tasks, the message sender can be understood as the task assigner, and the message receiver It can be understood as a worker. When the worker receives a task and has not completed it, the task allocator sends another task, and it is too busy, so multiple workers are needed to handle these tasks together. Workers are called work queues. The structure diagram is as follows:

Use of Python rabbitmq (2)

rabbitmq’s python instance work queue


Preparation


In the example program, use new_task.py to simulate the task allocator and worker.py to simulate the worker.


Modify send.py, receive information from the command line parameters, and send

import sys
message= ' '.join(sys.argv[1:])or "Hello World!"
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
print " [x] Sent %r" % (message,)

Modify the callback function of receive.py.

import time
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"

First open two terminals here, both run worker.py, and are in the listening state. This is equivalent to two workers. Open the third terminal and run new_task.py

$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....

Observe that worker.py receives tasks. One worker receives 3 tasks:

$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'

The other worker receives 2 Task:

$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message..'
[x] Received 'Fourth message....'

From the above point of view, each worker will be assigned tasks in turn. So if a worker dies while processing a task, the task is not completed and should be handed over to other workers. So there should be a mechanism that will provide feedback when a worker completes a task.

Message acknowledgment (Message acknowledgment)


Message acknowledgment means that when the worker completes the task, it will be fed back to rabbitmq. Modify the callback function in worker.py:

def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep(5)
print " [x] Done"
ch.basic_ack(delivery_tag= method.delivery_tag)


Pause here for 5 seconds to facilitate ctrl+c exit.

You can also remove the no_ack=True parameter or set it to False.

channel.basic_consume(callback, queue='hello', no_ack=False)

Run with this code, even if one of the workers exits with ctrl+c, the task being executed will not be lost, and rabbitmq will redistribute the task to other workers.


Message durability (Message durability)


Although there is a message feedback mechanism, if rabbitmq itself hangs If you drop it, the mission will still be lost. Therefore, tasks need to be stored persistently. Declare persistent storage:

channel.queue_declare(queue='hello', durable=True)

But this program will execute an error because the hello queue already exists and is non-persistent. RabbitMQ does not allow the use of different parameters to redefine existing queues. Redefine a queue:

channel.queue_declare(queue='task_queue', durable=True)

When sending a task, use delivery_mode=2 to mark the task as persistent storage:


channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode= 2,# make message persistent
))

Fair dispatch


In the above example, although each worker is assigned to tasks in turn, each The tasks are not necessarily the same. Some tasks may be heavier and take a longer time to execute; some tasks may be lighter and take a shorter time to execute. It would be best if it can be scheduled fairly. Use basic_qos to set prefetch_count=1 so that rabbitmq will not assign multiple tasks to workers at the same time. That is, only after the worker completes the task, will it receive the task again.

channel.basic_qos(prefetch_count=1)

new_task.py complete code

#!/usr/bin/env python
import pika
import sys
connection= pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel= connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message= ' '.join(sys.argv[1:])or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode= 2,# make message persistent
))
print " [x] Sent %r" % (message,)
connection.close()

worker.py complete code

#!/usr/bin/env python
import pika
import time
connection= pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel= connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"
ch.basic_ack(delivery_tag= method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()

The above is the content of using Python rabbitmq (2). For more related content, please pay attention to PHP Chinese website (www.php.cn)!


Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn