搜尋
首頁後端開發Python教學RabbitMQ快速入門python教程

RabbitMQ快速入門python教程

Mar 09, 2017 am 09:28 AM
pikapythonrabbitmq

HelloWorld

簡介

RabbitMQ:接受訊息再傳遞訊息,可以視為一個「郵局」。發送者和接受者透過佇列來進行交互,佇列的大小可以視為無限的,多個發送者可以發生給一個佇列,多個接收者也可以從一個佇列中接受訊息。

code

rabbitmq使用的協定是amqp,用於python的推薦客戶端是pika

pip install pika -i https://pypi.douban.com/simple/

send.py

# coding: utf8
import pika

# 建立一个连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
           'localhost'))  # 连接本地的RabbitMQ服务器
channel = connection.channel()  # 获得channel

這裡連結的是本機的,如果想要連接其他機器上的伺服器,只要填入位址或主機名稱即可。

接下來我們開始發送訊息了,注意要確保接受訊息的佇列是存在的,否則rabbitmq就丟棄掉該訊息

channel.queue_declare(queue='hello')  # 在RabbitMQ中创建hello这个队列
channel.basic_publish(exchange='',  # 使用默认的exchange来发送消息到队列
                  routing_key='hello',  # 发送到该队列 hello 中
                  body='Hello World!')  # 消息内容

connection.close()  # 关闭 同时flush

RabbitMQ預設需要1GB的空閒磁碟空間,否則發送會失敗。

這時已在本機佇列hello中存放了一個訊息,如果使用rabbitmqctl list_queues 可看到

hello 1

說明有一個hello佇列裡面存放了一個訊息

receive .py

# coding: utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()

還是先連結到伺服器,和之前發送時相同

channel.queue_declare(queue='hello')  # 此处就是声明了 来确保该队列 hello 存在 可以多次声明 这里主要是为了防止接受程序先运行时出错

def callback(ch, method, properties, body):  # 用于接收到消息后的回调
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',  # 收指定队列hello的消息
                      no_ack=True)  #在处理完消息后不发送ack给服务器
channel.start_consuming()  # 启动消息接受 这会进入一个死循环

工作佇列(任務佇列)

工作佇列是用來分發耗時任務給多個工作進程的。不立即做那些耗費資源的任務(需要等待這些任務完成),而是在安排這些任務之後執行。例如我們把task當作message送到佇列裡,啟動工作進程來接受並最終執行,且可啟動多個工作進程來運作。這適用於web應用,即不應在一個http請求的處理視窗內完成複雜任務。

channel.basic_publish(exchange='',
                  routing_key='task_queue',
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # 使得消息持久化
                  ))

分配訊息的方式為 輪詢 即每個工作進程獲得相同的訊息數。

訊息ack

如果訊息分配給某個工作進程,但是該工作進程未處理完成就崩潰了,可能該訊息就遺失了,因為rabbitmq一旦把一個訊息分發給工作進程,它就把該訊息刪掉了。

為了預防訊息遺失,rabbitmq提供了ack,即工作進程在收到訊息並處理後,發送ack給rabbitmq,告知rabbitmq這時候可以把該訊息從佇列中刪除了。如果工作進程掛掉 了,rabbitmq沒有收到ack,那麼會把該訊息 重新分發給其他工作進程。不需要設定timeout,即使該任務需要很長時間也可以處理。

ack預設是開啟的,之前我們的工作進程顯示指定了no_ack=True

channel.basic_consume(callback, queue='hello')  # 会启用ack

帶ack的callback:

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack

訊息持久化

#但是,有時RabbitMQ重啟了,訊息也會遺失。可在建立佇列時設定持久化:
(佇列的性質一旦確定無法改變)

channel.queue_declare(queue='task_queue', durable=True)

同時在傳送訊息時也得設定該訊息的持久化屬性:

channel .basic_publish(exchange='',

                  routing_key="task_queue",
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # make message persistent
                  ))

但是,如果在RabbitMQ剛接收到消息還沒來得及存儲,消息還是會丟失。同時,RabbitMQ也不是在接受到每個消息都進行存儲操作。如果還需要更完善的保證,需要使用publisher confirm。某些進程則會一直執行繁 重任務。 ##

channel.basic_qos(prefetch_count=1)
告知RabbitMQ,這樣在一個工作進程沒回發ack情況下是不會再分配訊息給它。給一個工作進程,然後完成,有時想把一條訊息同時發送給多個進程:

exchange

發送者是不是直接發送訊息到佇列中的,事實上發生者根本不知道訊息會傳送到那個佇列,發送者只能把訊息送到exchange裡。時它需要做什麼,是應該把它加到一個特殊的隊列中還是放到很多的隊列中,或者丟棄。訊息時,exchange的值為'' 即使用default exchange。

綁定exchange 和佇列

channel.exchange_declare(exchange='logs', type='fanout')  # 该exchange会把消息发送给所有它知道的队列中

logs在傳送訊息時也寄一份給hello。

路由

之前已經使用過bind,即建立exchange和queue的關係(此佇列對來自該exchange的訊息有興趣),bind時可另外指定routing_key選項。使用direct exchange

將對應routing key的訊息傳送到綁定相同routing key的佇列中

result = channel.queue_declare()  # 创建一个随机队列
result = channel.queue_declare(exclusive=True)  # 创建一个随机队列,同时在没有接收者连接该队列后则销毁它
queue_name = result.method.queue

傳送函數,發布不同severity的訊息:

channel.queue_bind(exchange='logs',
               queue='hello')

接受函數中綁定對應severity的:

   channel.basic_publish(exchange='logs',
                  routing_key='',
                  body=message)

使用topic exchange

之前使用的direct exchange 只能綁定一個routing key,可以使用這種可以拿.隔開routing key的topic exchange ,例如:

"stock.usd.nyse" "nyse.vmw"

和direct exchange一样,在接受者那边绑定的key与发送时指定的routing key相同即可,另外有些特殊的值:

* 代表1个单词
# 代表0个或多个单词

如果发送者发出的routing key都是3个部分的,如:celerity.colour.species。

Q1:
*.orange.*  对应的是中间的colour都为orange的

Q2:
*.*.rabbit  对应的是最后部分的species为rabbit的
lazy.#      对应的是第一部分是lazy的

qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,对于lazy.pink.rabbit虽然匹配到了Q2两次,但是只会发送一次。如果绑定时直接绑定#,则会收到所有的。

 RPC

在远程机器上运行一个函数然后获得结果。

1、客户端启动 同时设置一个临时队列用于接受回调,绑定该队列

    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response, no_ack=True,
                               queue=self.callback_queue)

2、客户端发送rpc请求,同时附带reply_to对应回调队列,correlation_id设置为每个请求的唯一id(虽然说可以为每一次RPC请求都创建一个回调队列,但是这样效率不高,如果一个客户端只使用一个队列,则需要使用correlation_id来匹配是哪个请求),之后阻塞在回调队列直到收到回复

注意:如果收到了非法的correlation_id直接丢弃即可,因为有这种情况--服务器已经发了响应但是还没发ack就挂了,等一会服务器重启了又会重新处理该任务,又发了一遍相应,但是这时那个请求已经被处理掉了

channel.basic_publish(exchange='',
                       routing_key='rpc_queue',
                       properties=pika.BasicProperties(
                             reply_to = self.callback_queue,
                             correlation_id = self.corr_id,
                             ),
                       body=str(n))  # 发出调用

while self.response is None:  # 这边就相当于阻塞了
    self.connection.process_data_events()  # 查看回调队列
return int(self.response)

3、请求会发送到rpc_queue队列
4、RPC服务器从rpc_queue中取出,执行,发送回复

channel.basic_consume(on_request, queue='rpc_queue')  # 绑定 等待请求

# 处理之后:
ch.basic_publish(exchange='',
                 routing_key=props.reply_to,
                 properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                 body=str(response))  # 发送回复到回调队列
ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack

5、客户端从回调队列中取出数据,检查correlation_id,执行相应操作

if self.corr_id == props.correlation_id:
        self.response = body

                                               

以上是RabbitMQ快速入門python教程的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
Python vs.C:申請和用例Python vs.C:申請和用例Apr 12, 2025 am 12:01 AM

Python适合数据科学、Web开发和自动化任务,而C 适用于系统编程、游戏开发和嵌入式系统。Python以简洁和强大的生态系统著称,C 则以高性能和底层控制能力闻名。

2小時的Python計劃:一種現實的方法2小時的Python計劃:一種現實的方法Apr 11, 2025 am 12:04 AM

2小時內可以學會Python的基本編程概念和技能。 1.學習變量和數據類型,2.掌握控制流(條件語句和循環),3.理解函數的定義和使用,4.通過簡單示例和代碼片段快速上手Python編程。

Python:探索其主要應用程序Python:探索其主要應用程序Apr 10, 2025 am 09:41 AM

Python在web開發、數據科學、機器學習、自動化和腳本編寫等領域有廣泛應用。 1)在web開發中,Django和Flask框架簡化了開發過程。 2)數據科學和機器學習領域,NumPy、Pandas、Scikit-learn和TensorFlow庫提供了強大支持。 3)自動化和腳本編寫方面,Python適用於自動化測試和系統管理等任務。

您可以在2小時內學到多少python?您可以在2小時內學到多少python?Apr 09, 2025 pm 04:33 PM

兩小時內可以學到Python的基礎知識。 1.學習變量和數據類型,2.掌握控制結構如if語句和循環,3.了解函數的定義和使用。這些將幫助你開始編寫簡單的Python程序。

如何在10小時內通過項目和問題驅動的方式教計算機小白編程基礎?如何在10小時內通過項目和問題驅動的方式教計算機小白編程基礎?Apr 02, 2025 am 07:18 AM

如何在10小時內教計算機小白編程基礎?如果你只有10個小時來教計算機小白一些編程知識,你會選擇教些什麼�...

如何在使用 Fiddler Everywhere 進行中間人讀取時避免被瀏覽器檢測到?如何在使用 Fiddler Everywhere 進行中間人讀取時避免被瀏覽器檢測到?Apr 02, 2025 am 07:15 AM

使用FiddlerEverywhere進行中間人讀取時如何避免被檢測到當你使用FiddlerEverywhere...

Python 3.6加載Pickle文件報錯"__builtin__"模塊未找到怎麼辦?Python 3.6加載Pickle文件報錯"__builtin__"模塊未找到怎麼辦?Apr 02, 2025 am 07:12 AM

Python3.6環境下加載Pickle文件報錯:ModuleNotFoundError:Nomodulenamed...

如何提高jieba分詞在景區評論分析中的準確性?如何提高jieba分詞在景區評論分析中的準確性?Apr 02, 2025 am 07:09 AM

如何解決jieba分詞在景區評論分析中的問題?當我們在進行景區評論分析時,往往會使用jieba分詞工具來處理文�...

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

AI Hentai Generator

AI Hentai Generator

免費產生 AI 無盡。

熱門文章

R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
3 週前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳圖形設置
3 週前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您聽不到任何人,如何修復音頻
3 週前By尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解鎖Myrise中的所有內容
4 週前By尊渡假赌尊渡假赌尊渡假赌

熱工具

VSCode Windows 64位元 下載

VSCode Windows 64位元 下載

微軟推出的免費、功能強大的一款IDE編輯器

mPDF

mPDF

mPDF是一個PHP庫,可以從UTF-8編碼的HTML產生PDF檔案。原作者Ian Back編寫mPDF以從他的網站上「即時」輸出PDF文件,並處理不同的語言。與原始腳本如HTML2FPDF相比,它的速度較慢,並且在使用Unicode字體時產生的檔案較大,但支援CSS樣式等,並進行了大量增強。支援幾乎所有語言,包括RTL(阿拉伯語和希伯來語)和CJK(中日韓)。支援嵌套的區塊級元素(如P、DIV),

MantisBT

MantisBT

Mantis是一個易於部署的基於Web的缺陷追蹤工具,用於幫助產品缺陷追蹤。它需要PHP、MySQL和一個Web伺服器。請查看我們的演示和託管服務。

WebStorm Mac版

WebStorm Mac版

好用的JavaScript開發工具

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

強大的PHP整合開發環境