HelloWorld
簡介
RabbitMQ:接受訊息再傳遞訊息,可以視為一個「郵局」。發送者和接受者透過佇列來進行交互,佇列的大小可以視為無限的,多個發送者可以發生給一個佇列,多個接收者也可以從一個佇列中接受訊息。
code
rabbitmq使用的協定是amqp,用於python的推薦客戶端是pika
pip install pika -i https://pypi.douban.com/simple/
send.py
# coding: utf8 import pika # 建立一个连接 connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) # 连接本地的RabbitMQ服务器 channel = connection.channel() # 获得channel
這裡連結的是本機的,如果想要連接其他機器上的伺服器,只要填入位址或主機名稱即可。
接下來我們開始發送訊息了,注意要確保接受訊息的佇列是存在的,否則rabbitmq就丟棄掉該訊息
channel.queue_declare(queue='hello') # 在RabbitMQ中创建hello这个队列 channel.basic_publish(exchange='', # 使用默认的exchange来发送消息到队列 routing_key='hello', # 发送到该队列 hello 中 body='Hello World!') # 消息内容 connection.close() # 关闭 同时flush
RabbitMQ預設需要1GB的空閒磁碟空間,否則發送會失敗。
這時已在本機佇列hello中存放了一個訊息,如果使用rabbitmqctl list_queues 可看到
hello 1
說明有一個hello佇列裡面存放了一個訊息
receive .py
# coding: utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel()
還是先連結到伺服器,和之前發送時相同
channel.queue_declare(queue='hello') # 此处就是声明了 来确保该队列 hello 存在 可以多次声明 这里主要是为了防止接受程序先运行时出错 def callback(ch, method, properties, body): # 用于接收到消息后的回调 print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', # 收指定队列hello的消息 no_ack=True) #在处理完消息后不发送ack给服务器 channel.start_consuming() # 启动消息接受 这会进入一个死循环
工作佇列(任務佇列)
工作佇列是用來分發耗時任務給多個工作進程的。不立即做那些耗費資源的任務(需要等待這些任務完成),而是在安排這些任務之後執行。例如我們把task當作message送到佇列裡,啟動工作進程來接受並最終執行,且可啟動多個工作進程來運作。這適用於web應用,即不應在一個http請求的處理視窗內完成複雜任務。
channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # 使得消息持久化 ))
分配訊息的方式為 輪詢 即每個工作進程獲得相同的訊息數。
訊息ack
如果訊息分配給某個工作進程,但是該工作進程未處理完成就崩潰了,可能該訊息就遺失了,因為rabbitmq一旦把一個訊息分發給工作進程,它就把該訊息刪掉了。
為了預防訊息遺失,rabbitmq提供了ack,即工作進程在收到訊息並處理後,發送ack給rabbitmq,告知rabbitmq這時候可以把該訊息從佇列中刪除了。如果工作進程掛掉 了,rabbitmq沒有收到ack,那麼會把該訊息 重新分發給其他工作進程。不需要設定timeout,即使該任務需要很長時間也可以處理。
ack預設是開啟的,之前我們的工作進程顯示指定了no_ack=True
channel.basic_consume(callback, queue='hello') # 会启用ack
帶ack的callback:
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) # 发送ack
訊息持久化
#但是,有時RabbitMQ重啟了,訊息也會遺失。可在建立佇列時設定持久化:
(佇列的性質一旦確定無法改變)
channel.queue_declare(queue='task_queue', durable=True)
同時在傳送訊息時也得設定該訊息的持久化屬性:
channel .basic_publish(exchange='',
routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
但是,如果在RabbitMQ剛接收到消息還沒來得及存儲,消息還是會丟失。同時,RabbitMQ也不是在接受到每個消息都進行存儲操作。如果還需要更完善的保證,需要使用publisher confirm。某些進程則會一直執行繁 重任務。 ##
channel.basic_qos(prefetch_count=1)告知RabbitMQ,這樣在一個工作進程沒回發ack情況下是不會再分配訊息給它。給一個工作進程,然後完成,有時想把一條訊息同時發送給多個進程:exchange發送者是不是直接發送訊息到佇列中的,事實上發生者根本不知道訊息會傳送到那個佇列,發送者只能把訊息送到exchange裡。時它需要做什麼,是應該把它加到一個特殊的隊列中還是放到很多的隊列中,或者丟棄。訊息時,exchange的值為'' 即使用default exchange。 綁定exchange 和佇列
channel.exchange_declare(exchange='logs', type='fanout') # 该exchange会把消息发送给所有它知道的队列中
logs在傳送訊息時也寄一份給hello。 路由
之前已經使用過bind,即建立exchange和queue的關係(此佇列對來自該exchange的訊息有興趣),bind時可另外指定routing_key選項。使用direct exchange
將對應routing key的訊息傳送到綁定相同routing key的佇列中
result = channel.queue_declare() # 创建一个随机队列 result = channel.queue_declare(exclusive=True) # 创建一个随机队列,同时在没有接收者连接该队列后则销毁它 queue_name = result.method.queue
傳送函數,發布不同severity的訊息:
channel.queue_bind(exchange='logs', queue='hello')
接受函數中綁定對應severity的:
channel.basic_publish(exchange='logs', routing_key='', body=message)
使用topic exchange
之前使用的direct exchange 只能綁定一個routing key,可以使用這種可以拿.隔開routing key的topic exchange ,例如:
"stock.usd.nyse" "nyse.vmw"
和direct exchange一样,在接受者那边绑定的key与发送时指定的routing key相同即可,另外有些特殊的值:
* 代表1个单词 # 代表0个或多个单词
如果发送者发出的routing key都是3个部分的,如:celerity.colour.species。
Q1: *.orange.* 对应的是中间的colour都为orange的 Q2: *.*.rabbit 对应的是最后部分的species为rabbit的 lazy.# 对应的是第一部分是lazy的
qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,对于lazy.pink.rabbit虽然匹配到了Q2两次,但是只会发送一次。如果绑定时直接绑定#,则会收到所有的。
RPC
在远程机器上运行一个函数然后获得结果。
1、客户端启动 同时设置一个临时队列用于接受回调,绑定该队列
self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)
2、客户端发送rpc请求,同时附带reply_to对应回调队列,correlation_id设置为每个请求的唯一id(虽然说可以为每一次RPC请求都创建一个回调队列,但是这样效率不高,如果一个客户端只使用一个队列,则需要使用correlation_id来匹配是哪个请求),之后阻塞在回调队列直到收到回复
注意:如果收到了非法的correlation_id直接丢弃即可,因为有这种情况--服务器已经发了响应但是还没发ack就挂了,等一会服务器重启了又会重新处理该任务,又发了一遍相应,但是这时那个请求已经被处理掉了
channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) # 发出调用 while self.response is None: # 这边就相当于阻塞了 self.connection.process_data_events() # 查看回调队列 return int(self.response)
3、请求会发送到rpc_queue队列
4、RPC服务器从rpc_queue中取出,执行,发送回复
channel.basic_consume(on_request, queue='rpc_queue') # 绑定 等待请求 # 处理之后: ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) # 发送回复到回调队列 ch.basic_ack(delivery_tag = method.delivery_tag) # 发送ack
5、客户端从回调队列中取出数据,检查correlation_id,执行相应操作
if self.corr_id == props.correlation_id: self.response = body
以上是RabbitMQ快速入門python教程的詳細內容。更多資訊請關注PHP中文網其他相關文章!

Python适合数据科学、Web开发和自动化任务,而C 适用于系统编程、游戏开发和嵌入式系统。Python以简洁和强大的生态系统著称,C 则以高性能和底层控制能力闻名。

2小時內可以學會Python的基本編程概念和技能。 1.學習變量和數據類型,2.掌握控制流(條件語句和循環),3.理解函數的定義和使用,4.通過簡單示例和代碼片段快速上手Python編程。

Python在web開發、數據科學、機器學習、自動化和腳本編寫等領域有廣泛應用。 1)在web開發中,Django和Flask框架簡化了開發過程。 2)數據科學和機器學習領域,NumPy、Pandas、Scikit-learn和TensorFlow庫提供了強大支持。 3)自動化和腳本編寫方面,Python適用於自動化測試和系統管理等任務。

兩小時內可以學到Python的基礎知識。 1.學習變量和數據類型,2.掌握控制結構如if語句和循環,3.了解函數的定義和使用。這些將幫助你開始編寫簡單的Python程序。

如何在10小時內教計算機小白編程基礎?如果你只有10個小時來教計算機小白一些編程知識,你會選擇教些什麼�...

使用FiddlerEverywhere進行中間人讀取時如何避免被檢測到當你使用FiddlerEverywhere...

Python3.6環境下加載Pickle文件報錯:ModuleNotFoundError:Nomodulenamed...

如何解決jieba分詞在景區評論分析中的問題?當我們在進行景區評論分析時,往往會使用jieba分詞工具來處理文�...


熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

AI Hentai Generator
免費產生 AI 無盡。

熱門文章

熱工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

DVWA
Damn Vulnerable Web App (DVWA) 是一個PHP/MySQL的Web應用程序,非常容易受到攻擊。它的主要目標是成為安全專業人員在合法環境中測試自己的技能和工具的輔助工具,幫助Web開發人員更好地理解保護網路應用程式的過程,並幫助教師/學生在課堂環境中教授/學習Web應用程式安全性。 DVWA的目標是透過簡單直接的介面練習一些最常見的Web漏洞,難度各不相同。請注意,該軟體中

SublimeText3漢化版
中文版,非常好用

mPDF
mPDF是一個PHP庫,可以從UTF-8編碼的HTML產生PDF檔案。原作者Ian Back編寫mPDF以從他的網站上「即時」輸出PDF文件,並處理不同的語言。與原始腳本如HTML2FPDF相比,它的速度較慢,並且在使用Unicode字體時產生的檔案較大,但支援CSS樣式等,並進行了大量增強。支援幾乎所有語言,包括RTL(阿拉伯語和希伯來語)和CJK(中日韓)。支援嵌套的區塊級元素(如P、DIV),

EditPlus 中文破解版
體積小,語法高亮,不支援程式碼提示功能