上一篇介紹了rabbitmq的安裝和經典的hello world!實例。這裡將對工作隊列(Work Queues)做一個了解。因為是接上一篇說明的,所以如果沒看過上一篇,看這篇可能會比較難懂。上一篇的地址是:ubuntu安裝rabbitmq和python的使用實作
訊息也可以理解為任務,訊息發送者可以理解為任務分配者,訊息接收者可以理解為工作者,當工作者接收到一個任務,還沒完成的時候,任務分配者又發一個任務過來,那就忙不過來了,於是就需要多個工作者來共同處理這些任務,這些工作者,就稱為工作隊列。結構圖如下:
rabbitmq的python實例工作佇列
準備工作(Preparation)
準備工作(Preparation)
修改send.py,從命令列參數接收訊息,並發送
import sys message= ' '.join(sys.argv[1:])or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print " [x] Sent %r" % (message,)
修改receive.py的回呼函數。
import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done"
這邊先打開兩個終端,都運行worker.py,處於監聽狀態,這邊就相當於兩個工作者。打開第三個終端,執行new_task.py
$ python new_task.py First message. $ python new_task.py Second message.. $ python new_task.py Third message... $ python new_task.py Fourth message.... $ python new_task.py Fifth message.....
觀察worker.py接收到任務,其中一個工作者接收到3個任務:
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
另外一個工作者接收到2個任務:
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
從上面來看,每個工作者,都會依序分配到任務。那如果一個工作者,在處理任務的時候掛掉,這個任務就沒有完成,應當交由其他工作者處理。所以應當有一種機制,當一個工作者完成任務時,就會回饋訊息。
訊息確認就是當工作者完成任務後,會回饋給rabbitmq。修改worker.py中的回呼函數:
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep(5) print " [x] Done" ch.basic_ack(delivery_tag= method.delivery_tag)
這邊停頓5秒,可以方便ctrl+c退出。
去除no_ack=True參數或設定為False也可以。
channel.basic_consume(callback, queue='hello', no_ack=False)用這個程式碼運行,即使其中一個工作者ctrl+c退出後,正在執行的任務也不會遺失,rabbitmq會將任務重新分配給其他工作者。
channel.queue_declare(queue='hello', durable=True)但是這個程式會執行錯誤,因為hello這個佇列已經存在,並且是非持久化的,rabbitmq不允許使用不同的參數來重新定義存在的佇列。重新定義一個佇列:
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode= 2,# make message persistent ))
channel.basic_qos(prefetch_count=1)
new_task.py完整程式碼
#!/usr/bin/env python import pika import sys connection= pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel= connection.channel() channel.queue_declare(queue='task_queue', durable=True) message= ' '.join(sys.argv[1:])or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode= 2,# make message persistent )) print " [x] Sent %r" % (message,) connection.close()
worker.py完整程式碼
#!/usr/bin/env python import pika import time connection= pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel= connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag= method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
以上就是Python rabbitmq的使用(二)的內容,更多相關內容請關注PHPcn)!