The "mq" in RabbitMQ stands for Message Queue. The "rabbit" in front of it simply means rabbit, much as the Python language is called python. Foreigners are quite humorous. RabbitMQ is a service in the same way that MySQL and Apache are services, but it provides a different function: it delivers messages, which lets different applications communicate with each other.
Install RabbitMQ
First install RabbitMQ. On Ubuntu 12.04 it can be installed directly with apt-get:
sudo apt-get install rabbitmq-server
After installation, the RabbitMQ service is already running. Next, let's look at a Hello World! example written in Python. In this example, send.py sends "Hello World!" to RabbitMQ, and receive.py receives from RabbitMQ the message that send.py sent.
In the diagram, P stands for producer, which can also be called the sender; in this example it is send.py. C stands for consumer, which can also be called the receiver; in this example it is receive.py. The red box in the middle is the queue, which in this example is the hello queue.
To use the RabbitMQ service from Python, you can use one of the existing libraries: pika, txAMQP or py-amqplib. pika is used here.
Install pika
pika can be installed with pip, Python's package manager. If pip is not installed, install it through apt-get:
sudo apt-get install python-pip
Install pika via pip:
sudo pip install pika
send.py code
Connect to the RabbitMQ server. Since we are testing locally, localhost is enough.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
Declare the message queue that messages will be delivered to. If a message is sent to a queue that does not exist, RabbitMQ simply discards it.
channel.queue_declare(queue='hello')
Send a message to the hello queue declared above. The exchange parameter names the exchange, which determines exactly which queue the message goes to; the empty string selects the default exchange, so routing_key is set to the name of the queue. body is the content to send. We will not go into the details of sending for now.
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
Close the connection:
connection.close()
Full code
#!/usr/bin/env python
#coding=utf8
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
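The routing rule described for send.py (with the default exchange, routing_key names the target queue, and a message for a queue that does not exist is dropped) can be tried out without a server. The following is a minimal in-memory sketch, not how RabbitMQ is implemented; ToyBroker and its attributes are hypothetical names, and only the method names mirror the pika calls used in send.py.

```python
from collections import deque


class ToyBroker:
    """In-memory stand-in for a broker (hypothetical, for illustration only)."""

    def __init__(self):
        self.queues = {}

    def queue_declare(self, queue):
        # Declaring is idempotent: an existing queue keeps its messages.
        self.queues.setdefault(queue, deque())

    def basic_publish(self, exchange, routing_key, body):
        # Only the default exchange (empty string) is modelled here:
        # it routes to the queue whose name equals routing_key.
        if exchange != "":
            raise ValueError("only the default exchange is modelled")
        target = self.queues.get(routing_key)
        if target is None:
            return False  # no such queue: the message is dropped
        target.append(body)
        return True


broker = ToyBroker()
# Publishing before the queue exists: the message is lost.
first_ok = broker.basic_publish(exchange="", routing_key="hello", body="too early")
broker.queue_declare(queue="hello")
second_ok = broker.basic_publish(exchange="", routing_key="hello", body="Hello World!")
print(first_ok, second_ok, list(broker.queues["hello"]))
```

This is why send.py declares the queue before publishing: only the second message ends up in the hello queue.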
Run send.py first. If it succeeds, RabbitMQ should now have the hello queue, with one message in it. Use the rabbitmqctl command to check:
rabbitmqctl list_queues
Output the following information on the author's computer:
receive.py code
The first two steps are the same as in send.py: connect to the server, then declare the message queue. That code is not repeated here. Receiving messages is more involved: you need to define a callback function to process them. The callback here simply prints the message:
def callback(ch, method, properties, body):
    print "Received %r" % (body,)
Tell RabbitMQ to deliver messages from the hello queue to callback:
channel.basic_consume(callback, queue='hello', no_ack=True)
Start receiving messages. This call blocks, and callback is invoked only when there is a message in the queue. Press Ctrl+C to exit.
channel.start_consuming()
Complete code
#!/usr/bin/env python
#coding=utf8
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback, queue='hello', no_ack=True)

print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()
Run the program and it receives the Hello World! message waiting in the hello queue and prints it on the screen. Switch to another terminal and run send.py again, and you will see receive.py receive a message again.
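The receive.py code above uses Python 2 print statements and the older pika call style. For readers on Python 3 with pika 1.0 or later, a rough equivalent might look like the sketch below. It assumes the pika 1.x signature, in which basic_consume takes the callback as on_message_callback and no_ack is renamed auto_ack. The helper names format_received and consume_hello are made up for this illustration.

```python
def format_received(body):
    # Under Python 3, pika passes the message body to the callback as bytes.
    return " [x] Received %r" % (body,)


def callback(ch, method, properties, body):
    print(format_received(body))


def consume_hello(host="localhost"):
    # Imported here so the helpers above can be used without pika installed.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    channel = connection.channel()
    channel.queue_declare(queue="hello")
    # pika 1.x: the callback is a keyword argument and no_ack became auto_ack.
    channel.basic_consume(queue="hello",
                          on_message_callback=callback,
                          auto_ack=True)
    print(" [*] Waiting for messages. To exit press CTRL+C")
    channel.start_consuming()
```

Calling consume_hello() needs a RabbitMQ server running on localhost. Because the body arrives as bytes, the printed text shows a b prefix, for example b'Hello World!'.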
Work queue example
1. Preparation
Modify send.py so that the message is taken from the command line, and save it as new_task.py:
import sys
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='', routing_key='hello', body=message)
print " [x] Sent %r" % (message,)
Modify the callback function of receive.py so that it sleeps one second for each dot in the message, and save it as worker.py:
import time
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
First open two terminals and run worker.py in both, so that both are listening. This is equivalent to two workers. Then open a third terminal and run new_task.py:
$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....
Watch the workers receive the tasks. One worker receives 3 tasks:
$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
The other worker receives 2 tasks:
$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'
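The split seen above (first, third and fifth message to one worker, second and fourth to the other) is what round-robin dispatch produces, which is RabbitMQ's default way of sharing a queue among consumers. The following sketch reproduces the pattern in plain Python with no broker involved; round_robin and simulated_seconds are hypothetical helper names.

```python
from itertools import cycle


def round_robin(messages, worker_names):
    """Hand each message to the next worker in turn."""
    assignments = {name: [] for name in worker_names}
    for name, message in zip(cycle(worker_names), messages):
        assignments[name].append(message)
    return assignments


def simulated_seconds(body):
    # Same rule as the worker callback: one second of "work" per dot.
    return body.count(".")


messages = [
    "First message.",
    "Second message..",
    "Third message...",
    "Fourth message....",
    "Fifth message.....",
]
result = round_robin(messages, ["worker 1", "worker 2"])
for name, received in result.items():
    total = sum(simulated_seconds(m) for m in received)
    print(name, received, total, "seconds")
```

In this run worker 1 ends up with 9 seconds of simulated work and worker 2 with 6, because round-robin counts messages and ignores how long each one takes.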
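2. Message acknowledgment
The callback acknowledges each message once it has finished processing it:
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep(5)
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)
Consume with no_ack=False so that RabbitMQ waits for that acknowledgment:
channel.basic_consume(callback, queue='hello', no_ack=False)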
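With manual acknowledgments, a message that was delivered but never acknowledged is put back in the queue when the worker's connection is lost, so another worker can pick it up. The toy model below illustrates that idea. It is a simplified sketch with a hypothetical ToyAckQueue class, it treats all unacknowledged messages as belonging to a single worker, and it is not RabbitMQ's actual bookkeeping.

```python
from collections import deque


class ToyAckQueue:
    """Toy model of manual acknowledgments (hypothetical, illustration only)."""

    def __init__(self, messages):
        self.ready = deque(messages)
        self.unacked = {}
        self.next_tag = 1

    def deliver(self):
        # Hand out one message and remember it until it is acknowledged.
        body = self.ready.popleft()
        tag = self.next_tag
        self.next_tag += 1
        self.unacked[tag] = body
        return tag, body

    def basic_ack(self, delivery_tag):
        # The worker finished: the message can now be forgotten.
        del self.unacked[delivery_tag]

    def consumer_lost(self):
        # The worker died without acking: requeue its messages in order.
        for tag in sorted(self.unacked, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag))


queue = ToyAckQueue(["task A", "task B"])
tag1, body1 = queue.deliver()   # worker 1 receives "task A"
queue.consumer_lost()           # worker 1 crashes before calling basic_ack
tag2, body2 = queue.deliver()   # "task A" is delivered again
queue.basic_ack(tag2)           # this time the work completes
print(body1, body2, list(queue.ready), queue.unacked)
```

With no_ack=True there would be no unacked bookkeeping at all, so a message handed to a worker that then crashed would simply be gone.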
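3. Message durability
Declare the queue as durable so that it survives a RabbitMQ restart:
channel.queue_declare(queue='hello', durable=True)
RabbitMQ does not allow an existing queue to be redeclared with different parameters, and hello has already been declared as non-durable, so declare a durable queue under a new name, task_queue:
channel.queue_declare(queue='task_queue', durable=True)
Mark each message as persistent by setting delivery_mode to 2 when publishing:
channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode = 2, # make message persistent
                      ))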
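Both settings are needed: a message survives a broker restart only if it is persistent (delivery_mode 2) and sits in a durable queue. The sketch below models that rule, along with the refusal to redeclare a queue with a different durable flag. ToyDurableBroker is a hypothetical class written for this illustration and its defaults are assumptions; real RabbitMQ writes to disk and offers further guarantees that this toy ignores.

```python
class ToyDurableBroker:
    """Toy model of what survives a restart (hypothetical, illustration only)."""

    def __init__(self):
        self.queues = {}  # name -> {"durable": bool, "messages": list}

    def queue_declare(self, queue, durable=False):
        existing = self.queues.get(queue)
        if existing is None:
            self.queues[queue] = {"durable": durable, "messages": []}
        elif existing["durable"] != durable:
            # Redeclaring with different parameters is refused.
            raise ValueError("queue %r already exists with durable=%r"
                             % (queue, existing["durable"]))

    def basic_publish(self, routing_key, body, delivery_mode=1):
        # In this toy, 1 means transient and 2 means persistent.
        self.queues[routing_key]["messages"].append((body, delivery_mode))

    def restart(self):
        # Only durable queues survive, and within them only persistent
        # (delivery_mode == 2) messages.
        survivors = {}
        for name, q in self.queues.items():
            if q["durable"]:
                kept = [m for m in q["messages"] if m[1] == 2]
                survivors[name] = {"durable": True, "messages": kept}
        self.queues = survivors


broker = ToyDurableBroker()
broker.queue_declare("hello")
broker.queue_declare("task_queue", durable=True)
broker.basic_publish("hello", "lost with its queue", delivery_mode=2)
broker.basic_publish("task_queue", "transient")
broker.basic_publish("task_queue", "persistent", delivery_mode=2)
try:
    broker.queue_declare("hello", durable=True)
    redeclare_ok = True
except ValueError:
    redeclare_ok = False
broker.restart()
remaining = {name: [body for body, _ in q["messages"]]
             for name, q in broker.queues.items()}
print(redeclare_ok, remaining)
```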
4. Fair dispatch
Calling basic_qos with prefetch_count=1 before basic_consume tells RabbitMQ not to give a worker a new message until it has acknowledged the previous one.
Complete new_task.py code
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode = 2, # make message persistent
                      ))
print " [x] Sent %r" % (message,)
connection.close()
Complete worker.py code
#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='task_queue')

channel.start_consuming()
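To see why prefetch_count=1 matters, compare plain round-robin with a dispatcher that hands the next message to whichever worker becomes free first. The simulation below is only a sketch: the task lengths are made up, the function names are hypothetical, and real delivery timing depends on the broker and the network.

```python
import heapq


def round_robin_finish_times(durations, workers):
    # Message n goes to worker n % workers, regardless of how busy it is.
    busy = [0] * workers
    for n, seconds in enumerate(durations):
        busy[n % workers] += seconds
    return busy


def fair_finish_times(durations, workers):
    # With prefetch_count=1 a worker holds at most one unacknowledged
    # message, so the next message goes to whichever worker frees up first.
    free_at = [(0, w) for w in range(workers)]
    heapq.heapify(free_at)
    finish = [0] * workers
    for seconds in durations:
        t, w = heapq.heappop(free_at)
        finish[w] = t + seconds
        heapq.heappush(free_at, (finish[w], w))
    return finish


durations = [5, 1, 5, 1, 5, 1]  # hypothetical task lengths in seconds
print(round_robin_finish_times(durations, 2))
print(fair_finish_times(durations, 2))
```

With these numbers, round-robin leaves one worker busy for 15 seconds while the other finishes after 3, whereas the fair variant finishes at 11 and 7 seconds.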
