rabbitmq中文翻譯的話,主要還是mq字母上:Message Queue,也就是訊息佇列的意思。前面還有rabbit單詞,就是兔子的意思,跟python語言叫python一樣,老外還蠻幽默的。 rabbitmq服務類似mysql、apache服務,只是提供的功能不一樣。 rabbimq是用來提供發送訊息的服務,可以用在不同的應用程式之間進行通訊。
安裝rabbitmq
先來安裝下rabbitmq,在ubuntu 12.04下可以直接透過apt-get安裝:
sudo apt-get install rabbitmq-server
安裝好後,rabbitmq服務就已經啟動好了。接下來看下python編寫Hello World!的實例。實例的內容就是從send.py發送「Hello World!」到rabbitmq,receive.py從rabbitmq接收send.py發送的訊息。
其中P表示produce,生產者的意思,也可以稱為發送者,實例中表現為send.py;C表示consumer,消費者的意思,也可以稱為接收者,實例中表現為receive.py;中間紅色的表示佇列的意思,實例中表現為hello佇列。
python使用rabbitmq服務,可以使用現成的類別庫pika、txAMQP或py-amqplib,這裡選擇了pika。
安裝pika
安裝pika可以使用pip來進行安裝,pip是python的軟體管理包,如果沒有安裝,可以透過apt-get安裝
sudo apt-get install python-pip
透過pip安裝pika:
#sudo pip install pika
##send.py代碼
連線到rabbitmq伺服器,因為是在本地測試,所以就用localhost就可以了。connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel()聲明訊息佇列,訊息將在這個佇列中傳遞。如果將訊息傳送到不存在的佇列,rabbitmq將會自動清除這些訊息。
channel.queue_declare(queue='hello')發送訊息到上面宣告的hello佇列,其中exchange表示交換器,能精確指定訊息應該傳送到哪個佇列,routing_key設定為佇列的名稱,body就是發送的內容,具體發送細節暫時先不關注。
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')關閉連線
#
connection.close()完整程式碼
#!/usr/bin/env python #coding=utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print " [x] Sent 'Hello World!'" connection.close()
先來執行下這個程序,執行成功的話,rabbitmqctl應該成功增加了hello隊列,並且隊列裡應該有一條信息,用rabbitmqctl命令來查看下
rabbitmqctl list_queues
在筆者的電腦上輸出以下訊息:
##確實有一個hello佇列,並且佇列裡有一個訊息。接下來用receive.py來取得佇列裡的資訊。
receive.py程式碼和send.py的前面兩個步驟一樣,都是要先連接伺服器,然後宣告訊息的佇列,這裡就不再貼同樣代碼了。
接收訊息更為複雜一些,需要定義一個回呼函數來處理,這邊的回呼函數就是將訊息列印出來。
def callback(ch, method, properties, body): print "Received %r" % (body,)
告訴rabbitmq使用callback來接收訊息
channel.basic_consume(callback, queue='hello', no_ack=True)
開始接收訊息,並進入阻塞狀態,佇列裡有資訊才會呼叫callback進行處理。按ctrl+c退出。
channel.start_consuming()完整程式碼
#
#!/usr/bin/env python #coding=utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print " [x] Received %r" % (body,) channel.basic_consume(callback, queue='hello', no_ack=True) print ' [*] Waiting for messages. To exit press CTRL+C' channel.start_consuming()執行程序,就能夠接收到佇列hello裡的訊息Hello World!,然後印在螢幕上。換一個終端,再次執行send.py,可以看到receive.py這邊會再次接收到訊息。
工作佇列範例
1.準備工作(Preparation)
在實例程式中,用new_task .py來模擬任務分配者, worker.py來模擬工作者。
修改send.py,從命令列參數接收訊息,並發送
#import sys message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print " [x] Sent %r" % (message,)
修改receive.py的回呼函數。
import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done"###這邊先打開兩個終端,都執行worker.py,處於監聽狀態,這邊就相當於兩個工作者。開啟第三個終端,執行new_task.py#########
$ python new_task.py First message. $ python new_task.py Second message.. $ python new_task.py Third message... $ python new_task.py Fourth message.... $ python new_task.py Fifth message.....##########觀察worker.py接收到任務,其中一個工作者接收到3個任務:### ######
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'#########另外一個工作者接收到2個任務:#########
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'######
从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。
2.消息确认(Message acknowledgment)
消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep(5) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag)
这边停顿5秒,可以方便ctrl+c退出。
去除no_ack=True参数或者设置为False也可以。
channel.basic_consume(callback, queue='hello', no_ack=False)
用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。
3.消息持久化存储(Message durability)
虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:
channel.queue_declare(queue='hello', durable=True)
但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:
channel.queue_declare(queue='task_queue', durable=True)
在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
4.公平调度(Fair dispatch)
上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。
channel.basic_qos(prefetch_count=1)
new_task.py完整代码
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print " [x] Sent %r" % (message,) connection.close() worker.py完整代码 #!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
更多Python+Pika+RabbitMQ環境部署及實作工作佇列相关文章请关注PHP中文网!