Python+Pika+RabbitMQ environment deployment and implementation of work queue

The "MQ" in RabbitMQ stands for Message Queue. The "Rabbit" in front is simply the animal, a playful name in the same spirit as the Python language being called Python. RabbitMQ is a server, much like MySQL or Apache, but it provides a different function: it delivers messages, so that different applications can communicate with each other.

Install rabbitmq
First install RabbitMQ. On Ubuntu 12.04 it can be installed directly with apt-get:

sudo apt-get install rabbitmq-server

Once installed, the rabbitmq service is already running. Next comes a Hello World! example written in Python: send.py sends "Hello World!" to RabbitMQ, and receive.py receives that message from RabbitMQ.

[Figure: producer P sends messages into the hello queue; consumer C receives them from it]

In the figure, P stands for producer, also called the sender; in the example this is send.py. C stands for consumer, also called the receiver; in the example this is receive.py. The red box in the middle is the queue, which in the example is the hello queue.

Python programs can talk to RabbitMQ through an existing client library such as pika, txAMQP, or py-amqplib. This article uses pika.

Install pika

pika can be installed with pip, Python's package manager. If pip is not installed, install it with apt-get:

sudo apt-get install python-pip

Install pika via pip:

sudo pip install pika

send.py code

Connect to the RabbitMQ server. Because the test runs locally, localhost is used.

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()

Declare the queue that messages will be delivered to. If a message is sent to a queue that does not exist, RabbitMQ simply discards it.

channel.queue_declare(queue='hello')

Send a message to the hello queue declared above. exchange names the exchange, which decides which queue a message goes to; the empty string selects the default exchange, which delivers to the queue whose name matches routing_key. routing_key is therefore set to the queue name, and body is the content to send. The finer details of publishing are not covered for now.

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
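The routing rule described above can be pictured with a small pure-Python model. This is only a sketch: the ToyBroker class and its attributes are hypothetical names invented for illustration and are not part of pika or RabbitMQ. It mimics the two behaviours mentioned so far: the default exchange delivers to the queue whose name equals routing_key, and a message addressed to an undeclared queue is dropped.

```python
from collections import deque


class ToyBroker:
    """Hypothetical in-memory stand-in for the default exchange (not pika)."""

    def __init__(self):
        self.queues = {}

    def queue_declare(self, queue):
        # Declaring is idempotent: an existing queue is left untouched.
        self.queues.setdefault(queue, deque())

    def basic_publish(self, exchange, routing_key, body):
        if exchange != '':
            raise ValueError("this sketch only models the default exchange")
        target = self.queues.get(routing_key)
        if target is None:
            return False  # no such queue: the message is silently dropped
        target.append(body)
        return True


broker = ToyBroker()
broker.queue_declare(queue='hello')
delivered = broker.basic_publish(exchange='', routing_key='hello', body='Hello World!')
dropped = broker.basic_publish(exchange='', routing_key='missing', body='lost')
print(delivered, dropped, list(broker.queues['hello']))
```

The boolean return value exists only so the sketch can show what happened to each message.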

Close connection

connection.close()

Full code

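#!/usr/bin/env python
#coding=utf8
import pika

# Connect to the RabbitMQ server running on this machine.
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()

# Make sure the destination queue exists before publishing.
channel.queue_declare(queue='hello')

# The default exchange ('') routes to the queue named by routing_key.
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()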

Run this program first. If it succeeds, RabbitMQ now has a hello queue holding one message. Check this with the rabbitmqctl command:

rabbitmqctl list_queues

On the author's computer, the output (a screenshot in the original article) confirms that there is indeed a hello queue and that it holds one message. Next, use receive.py to fetch the message from the queue.

receive.py code

The first two steps are the same as in send.py: connect to the server, then declare the message queue. That code is not repeated here.

Receiving a message is a little more involved: a callback function has to be defined to process it. The callback here simply prints the message.

def callback(ch, method, properties, body):
    print("Received %r" % (body,))

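Tell RabbitMQ to deliver messages from the hello queue to callback. (The call below uses the pika signature from before version 1.0; since pika 1.0 it is written basic_consume(queue='hello', on_message_callback=callback, auto_ack=True).)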

channel.basic_consume(callback, queue='hello', no_ack=True)

Start receiving. The program blocks, and callback is invoked only when a message arrives in the queue. Press Ctrl+C to exit.

channel.start_consuming()

Complete code

#!/usr/bin/env python
#coding=utf8
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()

# Declaring the queue again is harmless and lets receive.py start first.
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))

# no_ack=True: messages count as handled as soon as they are delivered.
channel.basic_consume(callback, queue='hello', no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

Run the program and it receives the Hello World! message waiting in the hello queue and prints it on the screen. Switch to another terminal and run send.py again: receive.py receives the new message too.

Work queue example

1. Preparation

The example programs use new_task.py to simulate the task dispatcher and worker.py to simulate a worker.

Modify send.py (saved as new_task.py) so that it takes the message from the command-line arguments and sends it:

import sys

# Everything after the script name becomes the message text.
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print(" [x] Sent %r" % (message,))
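The message-building line can be tried on its own, without a broker. In the sketch below, build_message is a hypothetical helper name used only for illustration. The shell splits the command line into words, so the words are joined back with spaces; when no arguments are given the join yields an empty string, which is falsy, so the default text is used instead.

```python
def build_message(argv):
    """Join every argument after the script name; fall back to a default."""
    return ' '.join(argv[1:]) or "Hello World!"


print(build_message(['new_task.py', 'First', 'message.']))
print(build_message(['new_task.py']))
```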

Modify the callback function of receive.py (saved as worker.py):

import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))
    # One second of simulated work per dot; b'.' also works when body is bytes (Python 3).
    time.sleep(body.count(b'.'))
    print(" [x] Done")
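The callback fakes a workload by sleeping one second for each dot in the message. A minimal sketch of that calculation follows; work_seconds is a hypothetical helper name, not something the tutorial defines. It accepts both text and bytes, because under Python 3 pika hands the body to the callback as bytes.

```python
def work_seconds(body):
    """Return one second of simulated work per dot in the message body."""
    if isinstance(body, bytes):
        return body.count(b'.')
    return body.count('.')


for body in ('First message.', b'Fifth message.....', 'no dots here'):
    print(body, work_seconds(body))
```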

Open two terminals and run worker.py in each; both wait for messages, which gives two workers. Open a third terminal and run new_task.py:

$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....

Watch the workers receive the tasks. One worker receives three tasks:

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'

The other worker receives two tasks:

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'
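The split seen in the two terminals is what round-robin dispatch produces: messages are handed to the consumers in turn. The sketch below reproduces that assignment in plain Python; round_robin and the worker names are hypothetical, chosen only for illustration.

```python
from itertools import cycle


def round_robin(messages, worker_names):
    """Assign messages to workers in turn, as the two terminals above show."""
    assignments = {name: [] for name in worker_names}
    for message, name in zip(messages, cycle(worker_names)):
        assignments[name].append(message)
    return assignments


messages = [
    'First message.',
    'Second message..',
    'Third message...',
    'Fourth message....',
    'Fifth message.....',
]
result = round_robin(messages, ['worker-1', 'worker-2'])
for name in ('worker-1', 'worker-2'):
    print(name, result[name])
```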

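As the output above shows, tasks are handed to the workers in turn. But if a worker dies while processing a task, that task is never completed and should be handed to another worker. There should therefore be a mechanism by which a worker reports back when it has finished a task.

2. Message acknowledgment

Message acknowledgment means that a worker reports back to RabbitMQ once it has finished a task. Modify the callback function in worker.py: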

def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))
    time.sleep(5)
    print(" [x] Done")
    # Tell RabbitMQ the task is finished so it can be removed from the queue.
    ch.basic_ack(delivery_tag=method.delivery_tag)

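The 5-second pause leaves enough time to exit with Ctrl+C.

Remove the no_ack=True argument, or set it to False (in pika 1.0 and later the parameter is named auto_ack):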

channel.basic_consume(callback, queue='hello', no_ack=False)

With this code, even if one of the workers exits with Ctrl+C, the task it was processing is not lost: RabbitMQ reassigns it to another worker.
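The bookkeeping behind this behaviour can be sketched without a broker. The AckQueue class below is a hypothetical, much simplified model and not RabbitMQ's actual implementation: a delivered message stays in an "unacknowledged" table until basic_ack arrives, and if its worker disappears first, the message goes back to the front of the queue.

```python
from collections import deque


class AckQueue:
    """Hypothetical model of a queue with manual acknowledgements."""

    def __init__(self, messages):
        self.ready = deque(messages)   # waiting to be delivered
        self.unacked = {}              # delivery_tag -> (worker, body)
        self.next_tag = 1

    def deliver(self, worker):
        body = self.ready.popleft()
        tag = self.next_tag
        self.next_tag += 1
        self.unacked[tag] = (worker, body)
        return tag, body

    def basic_ack(self, delivery_tag):
        # The worker finished: the broker may now forget the message.
        del self.unacked[delivery_tag]

    def worker_died(self, worker):
        # Every message the dead worker never acknowledged goes back to ready.
        for tag in sorted(self.unacked, reverse=True):
            owner, body = self.unacked[tag]
            if owner == worker:
                del self.unacked[tag]
                self.ready.appendleft(body)


queue = AckQueue(['task A', 'task B'])
tag_a, _ = queue.deliver('worker-1')
tag_b, _ = queue.deliver('worker-2')
queue.basic_ack(tag_b)          # worker-2 completes task B
queue.worker_died('worker-1')   # worker-1 exits before acknowledging task A
redelivered = queue.deliver('worker-2')[1]
print(redelivered)
```

With no_ack=True there would be no unacknowledged table at all, which is why an interrupted task would simply be gone.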

3. Message durability

Even with acknowledgments, tasks are still lost if RabbitMQ itself goes down. Tasks therefore need to be stored persistently. Declare the queue as durable:

channel.queue_declare(queue='hello', durable=True)

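But this program fails, because the hello queue already exists and is not durable; RabbitMQ does not allow an existing queue to be redefined with different parameters. Declare a new queue instead: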

channel.queue_declare(queue='task_queue', durable=True)

When sending a task, mark it as persistent with delivery_mode=2:

channel.basic_publish(exchange='',
           routing_key="task_queue",
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
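Both settings are needed together: a durable queue and persistent messages. The sketch below models this under simplifying assumptions. RestartableBroker is a hypothetical class, not RabbitMQ itself, and its restart() method only imitates the rule that a restart keeps durable queues and, inside them, messages published with delivery_mode=2.

```python
PERSISTENT = 2  # the delivery_mode value used in the tutorial


class RestartableBroker:
    """Hypothetical model of what a broker restart keeps (not RabbitMQ itself)."""

    def __init__(self):
        self.queues = {}  # name -> {'durable': bool, 'messages': [(body, mode)]}

    def queue_declare(self, queue, durable=False):
        existing = self.queues.get(queue)
        if existing is None:
            self.queues[queue] = {'durable': durable, 'messages': []}
        elif existing['durable'] != durable:
            # Mirrors the error described above for the 'hello' queue.
            raise ValueError("queue %r already exists with different settings" % queue)

    def basic_publish(self, routing_key, body, delivery_mode=1):
        self.queues[routing_key]['messages'].append((body, delivery_mode))

    def restart(self):
        survivors = {}
        for name, q in self.queues.items():
            if q['durable']:
                kept = [m for m in q['messages'] if m[1] == PERSISTENT]
                survivors[name] = {'durable': True, 'messages': kept}
        self.queues = survivors


broker = RestartableBroker()
broker.queue_declare('hello')
broker.queue_declare('task_queue', durable=True)
broker.basic_publish('hello', 'lost with its queue')
broker.basic_publish('task_queue', 'transient')
broker.basic_publish('task_queue', 'persistent', delivery_mode=PERSISTENT)
broker.restart()
print(sorted(broker.queues), broker.queues['task_queue']['messages'])
```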

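4. Fair dispatch

In the example above, each worker is handed tasks in turn, but the tasks are not necessarily equal. Some are heavy and take a long time; others are light and finish quickly. Fair dispatch would be better: call basic_qos with prefetch_count=1 so that RabbitMQ does not hand a worker more than one task at a time, that is, a worker receives its next task only after it has finished the current one.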

channel.basic_qos(prefetch_count=1)
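The effect of this setting can be estimated with a small simulation. This is a sketch under simplifying assumptions: the function names and task durations are made up, and network and acknowledgment delays are ignored. It compares handing tasks out strictly in turn with handing each task to whichever worker becomes free first, which is roughly what prefetch_count=1 achieves.

```python
import heapq


def round_robin_finish_times(durations, workers):
    """Tasks are handed out in turn, regardless of how busy a worker is."""
    finish = [0] * workers
    for index, seconds in enumerate(durations):
        finish[index % workers] += seconds
    return finish


def prefetch_one_finish_times(durations, workers):
    """Each task goes to whichever worker becomes free first."""
    free_at = [(0, worker) for worker in range(workers)]
    heapq.heapify(free_at)
    finish = [0] * workers
    for seconds in durations:
        now, worker = heapq.heappop(free_at)
        finish[worker] = now + seconds
        heapq.heappush(free_at, (finish[worker], worker))
    return finish


durations = [5, 1, 5, 1, 5, 1]  # alternating heavy and light tasks (made-up numbers)
print(round_robin_finish_times(durations, 2))
print(prefetch_one_finish_times(durations, 2))
```

Each list holds the time at which a worker finishes its last task, so the larger entry is when the whole batch is done.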

Complete code for new_task.py

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='task_queue',
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
print(" [x] Sent %r" % (message,))
connection.close()

Complete code for worker.py

#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
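print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    # Acknowledge only after the work is done, so an interrupted task is redelivered.
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Hand this worker at most one unacknowledged task at a time.
channel.basic_qos(prefetch_count=1)
# pika before 1.0; from 1.0 on: basic_consume(queue='task_queue', on_message_callback=callback)
channel.basic_consume(callback,
                      queue='task_queue')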
 
channel.start_consuming()

