搜尋
首頁後端開發Python教學Python+Pika+RabbitMQ環境部署及實作工作佇列

rabbitmq中文翻譯的話,主要還是mq字母上:Message Queue,也就是訊息佇列的意思。前面還有rabbit單詞,就是兔子的意思,跟python語言叫python一樣,老外還蠻幽默的。 rabbitmq服務類似mysql、apache服務,只是提供的功能不一樣。 rabbimq是用來提供發送訊息的服務,可以用在不同的應用程式之間進行通訊。

安裝rabbitmq
先來安裝下rabbitmq,在ubuntu 12.04下可以直接透過apt-get安裝:

#
sudo apt-get install rabbitmq-server

安裝好後,rabbitmq服務就已經啟動好了。接下來看下python編寫Hello World!的實例。實例的內容就是從send.py發送「Hello World!」到rabbitmq,receive.py從rabbitmq接收send.py發送的訊息。

Python+Pika+RabbitMQ環境部署及實作工作佇列

其中P表示produce,生產者的意思,也可以稱為發送者,實例中表現為send.py;C表示consumer,消費者的意思,也可以稱為接收者,實例中表現為receive.py;中間紅色的表示佇列的意思,實例中表現為hello佇列。

python使用rabbitmq服務,可以使用現成的類別庫pika、txAMQP或py-amqplib,這裡選擇了pika。

安裝pika

安裝pika可以使用pip來進行安裝,pip是python的軟體管理包,如果沒有安裝,可以透過apt-get安裝

sudo apt-get install python-pip

透過pip安裝pika:

#
sudo pip install pika

##send.py代碼

連線到rabbitmq伺服器,因為是在本地測試,所以就用localhost就可以了。

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()

聲明訊息佇列,訊息將在這個佇列中傳遞。如果將訊息傳送到不存在的佇列,rabbitmq將會自動清除這些訊息。

channel.queue_declare(queue='hello')

發送訊息到上面宣告的hello佇列,其中exchange表示交換器,能精確指定訊息應該傳送到哪個佇列,routing_key設定為佇列的名稱,body就是發送的內容,具體發送細節暫時先不關注。

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

關閉連線

#

connection.close()

完整程式碼


Python+Pika+RabbitMQ環境部署及實作工作佇列


#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

先來執行下這個程序,執行成功的話,rabbitmqctl應該成功增加了hello隊列,並且隊列裡應該有一條信息,用rabbitmqctl命令來查看下

rabbitmqctl list_queues

在筆者的電腦上輸出以下訊息:

##確實有一個hello佇列,並且佇列裡有一個訊息。接下來用receive.py來取得佇列裡的資訊。

receive.py程式碼

和send.py的前面兩個步驟一樣,都是要先連接伺服器,然後宣告訊息的佇列,這裡就不再貼同樣代碼了。

接收訊息更為複雜一些,需要定義一個回呼函數來處理,這邊的回呼函數就是將訊息列印出來。

def callback(ch, method, properties, body):
  print "Received %r" % (body,)

告訴rabbitmq使用callback來接收訊息

channel.basic_consume(callback, queue='hello', no_ack=True)

開始接收訊息,並進入阻塞狀態,佇列裡有資訊才會呼叫callback進行處理。按ctrl+c退出。

channel.start_consuming()

完整程式碼

#

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue='hello', no_ack=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

執行程序,就能夠接收到佇列hello裡的訊息Hello World!,然後印在螢幕上。換一個終端,再次執行send.py,可以看到receive.py這邊會再次接收到訊息。

工作佇列範例

1.準備工作(Preparation)

在實例程式中,用new_task .py來模擬任務分配者, worker.py來模擬工作者。

修改send.py,從命令列參數接收訊息,並發送

#
import sys
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='hello',
           body=message)
print " [x] Sent %r" % (message,)

修改receive.py的回呼函數。

import time
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"

###這邊先打開兩個終端,都執行worker.py,處於監聽狀態,這邊就相當於兩個工作者。開啟第三個終端,執行new_task.py#########
$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....
##########觀察worker.py接收到任務,其中一個工作者接收到3個任務:### ######
$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
#########另外一個工作者接收到2個任務:#########
$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'
######

从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。

2.消息确认(Message acknowledgment)

消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:

def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep(5)
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)

这边停顿5秒,可以方便ctrl+c退出。

去除no_ack=True参数或者设置为False也可以。

channel.basic_consume(callback, queue='hello', no_ack=False)

用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。

3.消息持久化存储(Message durability)

虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:

channel.queue_declare(queue='hello', durable=True)

但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:

channel.queue_declare(queue='task_queue', durable=True)

在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:

channel.basic_publish(exchange='',
           routing_key="task_queue",
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))

4.公平调度(Fair dispatch)

上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。

channel.basic_qos(prefetch_count=1)

new_task.py完整代码

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='task_queue',
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
print " [x] Sent %r" % (message,)
connection.close()
worker.py完整代码

#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
           queue='task_queue')
 
channel.start_consuming()


更多Python+Pika+RabbitMQ環境部署及實作工作佇列相关文章请关注PHP中文网!

陳述
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
Python的科學計算中如何使用陣列?Python的科學計算中如何使用陣列?Apr 25, 2025 am 12:28 AM

Arraysinpython,尤其是Vianumpy,ArecrucialInsCientificComputingfortheireftheireffertheireffertheirefferthe.1)Heasuedfornumerericalicerationalation,dataAnalysis和Machinelearning.2)Numpy'Simpy'Simpy'simplementIncressionSressirestrionsfasteroperoperoperationspasterationspasterationspasterationspasterationspasterationsthanpythonlists.3)inthanypythonlists.3)andAreseNableAblequick

您如何處理同一系統上的不同Python版本?您如何處理同一系統上的不同Python版本?Apr 25, 2025 am 12:24 AM

你可以通過使用pyenv、venv和Anaconda來管理不同的Python版本。 1)使用pyenv管理多個Python版本:安裝pyenv,設置全局和本地版本。 2)使用venv創建虛擬環境以隔離項目依賴。 3)使用Anaconda管理數據科學項目中的Python版本。 4)保留系統Python用於系統級任務。通過這些工具和策略,你可以有效地管理不同版本的Python,確保項目順利運行。

與標準Python陣列相比,使用Numpy數組的一些優點是什麼?與標準Python陣列相比,使用Numpy數組的一些優點是什麼?Apr 25, 2025 am 12:21 AM

numpyarrayshaveseveraladagesoverandastardandpythonarrays:1)基於基於duetoc的iMplation,2)2)他們的aremoremoremorymorymoremorymoremorymoremorymoremoremory,尤其是WithlargedAtasets和3)效率化,效率化,矢量化函數函數函數函數構成和穩定性構成和穩定性的操作,製造

陣列的同質性質如何影響性能?陣列的同質性質如何影響性能?Apr 25, 2025 am 12:13 AM

數組的同質性對性能的影響是雙重的:1)同質性允許編譯器優化內存訪問,提高性能;2)但限制了類型多樣性,可能導致效率低下。總之,選擇合適的數據結構至關重要。

編寫可執行python腳本的最佳實踐是什麼?編寫可執行python腳本的最佳實踐是什麼?Apr 25, 2025 am 12:11 AM

到CraftCraftExecutablePythcripts,lollow TheSebestPractices:1)Addashebangline(#!/usr/usr/bin/envpython3)tomakethescriptexecutable.2)setpermissionswithchmodwithchmod xyour_script.3)

Numpy數組與使用數組模塊創建的數組有何不同?Numpy數組與使用數組模塊創建的數組有何不同?Apr 24, 2025 pm 03:53 PM

numpyArraysareAreBetterFornumericalialoperations andmulti-demensionaldata,而learthearrayModuleSutableforbasic,內存效率段

Numpy數組的使用與使用Python中的數組模塊陣列相比如何?Numpy數組的使用與使用Python中的數組模塊陣列相比如何?Apr 24, 2025 pm 03:49 PM

numpyArraySareAreBetterForHeAvyNumericalComputing,而lelethearRayModulesiutable-usemoblemory-connerage-inderabledsswithSimpleDatateTypes.1)NumpyArsofferVerverVerverVerverVersAtility andPerformanceForlargedForlargedAtatasetSetsAtsAndAtasEndCompleXoper.2)

CTYPES模塊與Python中的數組有何關係?CTYPES模塊與Python中的數組有何關係?Apr 24, 2025 pm 03:45 PM

ctypesallowscreatingingangandmanipulatingc-stylarraysinpython.1)usectypestoInterfacewithClibrariesForperfermance.2)createc-stylec-stylec-stylarraysfornumericalcomputations.3)passarraystocfunctions foreforfunctionsforeffortions.however.however,However,HoweverofiousofmemoryManageManiverage,Pressiveo,Pressivero

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

SecLists

SecLists

SecLists是最終安全測試人員的伙伴。它是一個包含各種類型清單的集合,這些清單在安全評估過程中經常使用,而且都在一個地方。 SecLists透過方便地提供安全測試人員可能需要的所有列表,幫助提高安全測試的效率和生產力。清單類型包括使用者名稱、密碼、URL、模糊測試有效載荷、敏感資料模式、Web shell等等。測試人員只需將此儲存庫拉到新的測試機上,他就可以存取所需的每種類型的清單。

SublimeText3 Linux新版

SublimeText3 Linux新版

SublimeText3 Linux最新版

DVWA

DVWA

Damn Vulnerable Web App (DVWA) 是一個PHP/MySQL的Web應用程序,非常容易受到攻擊。它的主要目標是成為安全專業人員在合法環境中測試自己的技能和工具的輔助工具,幫助Web開發人員更好地理解保護網路應用程式的過程,並幫助教師/學生在課堂環境中教授/學習Web應用程式安全性。 DVWA的目標是透過簡單直接的介面練習一些最常見的Web漏洞,難度各不相同。請注意,該軟體中

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

強大的PHP整合開發環境

Safe Exam Browser

Safe Exam Browser

Safe Exam Browser是一個安全的瀏覽器環境,安全地進行線上考試。該軟體將任何電腦變成一個安全的工作站。它控制對任何實用工具的訪問,並防止學生使用未經授權的資源。