RabbitMQ:接受訊息再傳遞訊息,可以視為一個「郵局」。發送者和接受者透過佇列來進行交互,佇列的大小可以視為無限的,多個發送者可以發生給一個佇列,多個接收者也可以從一個佇列中接受訊息。
rabbitmq使用的協定是amqp,用於python的推薦客戶端是pika
pip install pika -i https://pypi.douban.com/simple/
send.py
# coding: utf8 import pika # 建立一个连接 connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) # 连接本地的RabbitMQ服务器 channel = connection.channel() # 获得channel
這裡連結的是本機的,如果想要連接其他機器上的伺服器,只要填入位址或主機名稱即可。
接下來我們開始發送訊息了,注意要確保接受訊息的佇列是存在的,否則rabbitmq就丟棄掉該訊息
channel.queue_declare(queue='hello') # 在RabbitMQ中创建hello这个队列 channel.basic_publish(exchange='', # 使用默认的exchange来发送消息到队列 routing_key='hello', # 发送到该队列 hello 中 body='Hello World!') # 消息内容 connection.close() # 关闭 同时flush
RabbitMQ預設需要1GB的空閒磁碟空間,否則發送會失敗。
這時已在本機佇列hello中存放了一個訊息,如果使用rabbitmqctl list_queues 可看到
hello 1
說明有一個hello佇列裡面存放了一個訊息
receive .py
# coding: utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel()
還是先連結到伺服器,和之前發送時相同
channel.queue_declare(queue='hello') # 此处就是声明了 来确保该队列 hello 存在 可以多次声明 这里主要是为了防止接受程序先运行时出错 def callback(ch, method, properties, body): # 用于接收到消息后的回调 print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', # 收指定队列hello的消息 no_ack=True) #在处理完消息后不发送ack给服务器 channel.start_consuming() # 启动消息接受 这会进入一个死循环
工作佇列是用來分發耗時任務給多個工作進程的。不立即做那些耗費資源的任務(需要等待這些任務完成),而是在安排這些任務之後執行。例如我們把task當作message送到佇列裡,啟動工作進程來接受並最終執行,且可啟動多個工作進程來運作。這適用於web應用,即不應在一個http請求的處理視窗內完成複雜任務。
channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # 使得消息持久化 ))
分配訊息的方式為 輪詢 即每個工作進程獲得相同的訊息數。
如果訊息分配給某個工作進程,但是該工作進程未處理完成就崩潰了,可能該訊息就遺失了,因為rabbitmq一旦把一個訊息分發給工作進程,它就把該訊息刪掉了。
為了預防訊息遺失,rabbitmq提供了ack,即工作進程在收到訊息並處理後,發送ack給rabbitmq,告知rabbitmq這時候可以把該訊息從佇列中刪除了。如果工作進程掛掉 了,rabbitmq沒有收到ack,那麼會把該訊息 重新分發給其他工作進程。不需要設定timeout,即使該任務需要很長時間也可以處理。
ack預設是開啟的,之前我們的工作進程顯示指定了no_ack=True
channel.basic_consume(callback, queue='hello') # 会启用ack
帶ack的callback:
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) # 发送ack
#但是,有時RabbitMQ重啟了,訊息也會遺失。可在建立佇列時設定持久化:
(佇列的性質一旦確定無法改變)
channel.queue_declare(queue='task_queue', durable=True)
同時在傳送訊息時也得設定該訊息的持久化屬性:
channel .basic_publish(exchange='',
routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
但是,如果在RabbitMQ剛接收到消息還沒來得及存儲,消息還是會丟失。同時,RabbitMQ也不是在接受到每個消息都進行存儲操作。如果還需要更完善的保證,需要使用publisher confirm。某些進程則會一直執行繁 重任務。 ##
channel.basic_qos(prefetch_count=1)告知RabbitMQ,這樣在一個工作進程沒回發ack情況下是不會再分配訊息給它。給一個工作進程,然後完成,有時想把一條訊息同時發送給多個進程:exchange發送者是不是直接發送訊息到佇列中的,事實上發生者根本不知道訊息會傳送到那個佇列,發送者只能把訊息送到exchange裡。時它需要做什麼,是應該把它加到一個特殊的隊列中還是放到很多的隊列中,或者丟棄。訊息時,exchange的值為'' 即使用default exchange。 綁定exchange 和佇列
channel.exchange_declare(exchange='logs', type='fanout') # 该exchange会把消息发送给所有它知道的队列中
路由
將對應routing key的訊息傳送到綁定相同routing key的佇列中
result = channel.queue_declare() # 创建一个随机队列 result = channel.queue_declare(exclusive=True) # 创建一个随机队列,同时在没有接收者连接该队列后则销毁它 queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue='hello')
接受函數中綁定對應severity的:
channel.basic_publish(exchange='logs', routing_key='', body=message)
之前使用的direct exchange 只能綁定一個routing key,可以使用這種可以拿.隔開routing key的topic exchange ,例如:
"stock.usd.nyse" "nyse.vmw"
和direct exchange一样,在接受者那边绑定的key与发送时指定的routing key相同即可,另外有些特殊的值:
* 代表1个单词 # 代表0个或多个单词
如果发送者发出的routing key都是3个部分的,如:celerity.colour.species。
Q1: *.orange.* 对应的是中间的colour都为orange的 Q2: *.*.rabbit 对应的是最后部分的species为rabbit的 lazy.# 对应的是第一部分是lazy的
qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,对于lazy.pink.rabbit虽然匹配到了Q2两次,但是只会发送一次。如果绑定时直接绑定#,则会收到所有的。
在远程机器上运行一个函数然后获得结果。
1、客户端启动 同时设置一个临时队列用于接受回调,绑定该队列
self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)
2、客户端发送rpc请求,同时附带reply_to对应回调队列,correlation_id设置为每个请求的唯一id(虽然说可以为每一次RPC请求都创建一个回调队列,但是这样效率不高,如果一个客户端只使用一个队列,则需要使用correlation_id来匹配是哪个请求),之后阻塞在回调队列直到收到回复
注意:如果收到了非法的correlation_id直接丢弃即可,因为有这种情况--服务器已经发了响应但是还没发ack就挂了,等一会服务器重启了又会重新处理该任务,又发了一遍相应,但是这时那个请求已经被处理掉了
channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) # 发出调用 while self.response is None: # 这边就相当于阻塞了 self.connection.process_data_events() # 查看回调队列 return int(self.response)
3、请求会发送到rpc_queue队列
4、RPC服务器从rpc_queue中取出,执行,发送回复
channel.basic_consume(on_request, queue='rpc_queue') # 绑定 等待请求 # 处理之后: ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) # 发送回复到回调队列 ch.basic_ack(delivery_tag = method.delivery_tag) # 发送ack
5、客户端从回调队列中取出数据,检查correlation_id,执行相应操作
if self.corr_id == props.correlation_id: self.response = body
以上是RabbitMQ快速入門python教程的詳細內容。更多資訊請關注PHP中文網其他相關文章!