首頁  >  文章  >  後端開發  >  關於Python如何操作訊息佇列(RabbitMQ)的方法教學課程

關於Python如何操作訊息佇列(RabbitMQ)的方法教學課程

黄舟
黄舟原創
2017-07-20 15:34:402094瀏覽

RabbitMQ是一個在AMQP基礎上完整的,可重複使用的企業訊息系統。他遵循Mozilla Public License開源協議。以下這篇文章主要為大家介紹了利用Python操作訊息佇列RabbitMQ的方法教程,需要的朋友可以參考下。

前言

RabbitMQ是一個在AMQP基礎上完整的,可重複使用的企業訊息系統。他遵循Mozilla Public License開源協議。
MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式透過讀取寫出入隊列的消息(針對應用程式的資料)來通信,而無需專用連接來連結它們。消 息傳遞指的是程式之間透過在訊息中發送資料進行通信,而不是透過直接呼叫彼此來通信,直接呼叫通常是用於諸如遠端過程呼叫的技術。排隊指的是應用程式透過 佇列來通訊。佇列的使用除去了接收和發送應用程式同時執行的要求。

應用程式場景:

RabbitMQ無疑是目前最受歡迎的訊息佇列之一,對各種語言環境的支援也很豐富,作為一個.NET developer有必要學習和了解這項工具。訊息佇列的使用場景大概有3種:

     1、系統集成,分散式系統的設計。各種子系統透過訊息來對接,這種解決方案也逐步發展成一種架構風格,即「透過訊息傳遞的架構」。

     2、系統中的同步處理方式嚴重影響了吞吐量,例如日誌記錄。假如需要記錄系統中所有的使用者行為日誌,如果透過同步的方式記錄日誌勢必會影響系統的回應速度,當我們將日誌訊息傳送到訊息佇列,記錄日誌的子系統就會透過非同步的方式去消費日誌消息。

     3.系統的高可用性,例如電商的秒殺場景。當某一時刻應用程式伺服器或資料庫伺服器收到大量要求,將會出現系統宕機。如果能夠將請求轉送到訊息佇列,再由伺服器去消費這些訊息將會使得請求變得平穩,提高系統的可用性。

一、安裝環境

#首先是在Linux 上安裝rabbitmq


# 环境为CentOS 7
yum install rabbitmq-server # 安装RabbitMQ
systemctl start rabbitmq-server # 启动
systemctl enable rabbitmq-server # 开机自启
systemctl stop firewall-cmd  # 临时关闭防火墙

然後用pip 安裝Python3 的開發包


pip3 install pika

安裝好軟體之後可以訪問http://115.xx.xx. xx:15672/來存取自備的web 頁面來查看和管理RabbitMQ。預設管理員的使用者密碼都是guest

二、簡單的向佇列加入訊息


# #

#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 19:25
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Producer
import pika
# 创建连接对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))
# 创建频道对象
channel = connection.channel()
# 指定一个队列,如果该队列不存在则创建
channel.queue_declare(queue='test_queue')
# 提交消息
for i in range(10):
 channel.basic_publish(exchange='', routing_key='test_queue', body='hello,world' + str(i))
 print("sent...")
# 关闭连接
connection.close()

三、簡單的從佇列中取得訊息


#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 19:40
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Consumer
import pika
credentials = pika.PlainCredentials('guest', 'guest')
# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials))
channel = connection.channel()
# 指定一个队列,如果该队列不存在则创建
channel.queue_declare(queue='test_queue')
# 定义一个回调函数
def callback(ch, method, properties, body):
 print(body.decode('utf-8'))
# 告诉RabbitMQ使用callback来接收信息
channel.basic_consume(callback, queue='test_queue', no_ack=False)
print('waiting...')
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。
channel.start_consuming()

四、萬一消費者斷線了


想像這樣一種情況:


消費者從訊息佇列中取得了n 條數據,正要處理呢結果宕機了,那該怎麼辦?在 RabbieMQ 中有一個 ACK 可以用來確認消費者處理結束。就有點類似網路中的 ACK,消費者每次從佇列中取得了資料之後佇列不會立刻將資料移除,而是等待對應的 ACK。消費者取得到資料並處理完成之後會向佇列發送一個 ACK 包,通知 RabbitMQ 這堆訊息已經處理妥當了,可以刪除了,這時候 RabbitMQ 才會將資料從佇列中移除。所以這種情況即使消費者斷線也沒有什麼問題,資料依舊會在佇列中存在,留給其他消費者處理。

在Python 中這樣實作:

消費者有這樣一行程式碼channel.basic_consume(callback, queue='test_queue' , no_ack=False)
,其中

no_ack=False

表示不傳送確認包。將其修改為no_ack=True就會在每次處理完後向 RabbitMQ 發送確認包,以確認訊息已處理完畢。

五、萬一RabbitMQ 宕機了呢


雖然有了ACK 包,但萬一RabbitMQ 掛了那數據還是會損失。所以我們可以給 RabbitMQ 設定一個資料持久化儲存。 RabbitMQ 會將資料持久化儲存到磁碟上,確保下次再啟動的時候佇列還在。

在 Python 中這樣實作:#########

我们声明一个队列是这样的channel.queue_declare(queue='test_queue') ,如果需要持久化一个队列可以这样声明channel.queue_declare(queue='test_queue', durable=True) 。不过这行直接放在代码中是不能执行的,因为以前已经有了一个名为test_queue的队列,RabbitMQ 不允许用不同的方式声明同一个队列,所以可以换一个队列名新建来指定数据持久化存储。不过如果只是这样声明的话,在 RabbitMQ 宕机重启后确实队列还在,不过队列里的数据就没有了。除非我们这样来声明队列channel.basic_publish(exchange='', routing_key="test_queue", body=message, properties=pika.BasicProperties(delivery_mode = 2,))

六、最简单的发布订阅

最简单的发布订阅在 RabbitMQ 中称之为Fanout模式。也就是说订阅者订阅某个频道,然后发布者向这个频道中发布消息,所有订阅者就都能接收到这条消息。不过因为发布者需要使用订阅者创建的随机队列所以需要先启动订阅者才能启动发布者。

发布者代码:


#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 20:21
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Publisher
import pika
# 创建连接对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))
# 创建频道对象
channel = connection.channel()
# 定义交换机,exchange表示交换机名称,type表示类型
channel.exchange_declare(exchange='my_fanout',
       type='fanout')
message = 'Hello Python'
# 将消息发送到交换机
channel.basic_publish(exchange='my_fanout', # 指定exchange
      routing_key='', # fanout下不需要配置,配置了也不会生效
      body=message)
connection.close()

订阅者代码:


#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 20:20
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Subscriber
import pika
credentials = pika.PlainCredentials('guest', 'guest')
# 连接到RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials))
channel = connection.channel()
# 定义交换机,进行exchange声明,exchange表示交换机名称,type表示类型
channel.exchange_declare(exchange='my_fanout',
       type='fanout')
# 随机创建队列
result = channel.queue_declare(exclusive=True) # exclusive=True表示建立临时队列,当consumer关闭后,该队列就会被删除
queue_name = result.method.queue
# 将队列与exchange进行绑定
channel.queue_bind(exchange='my_fanout',
     queue=queue_name)
# 定义回调方法
def callback(ch, method, properties, body):
 print(body.decode('utf-8'))
# 从队列获取信息
channel.basic_consume(callback,
      queue=queue_name,
      no_ack=True)
channel.start_consuming()

总结

以上是關於Python如何操作訊息佇列(RabbitMQ)的方法教學課程的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn