ホームページ  >  記事  >  バックエンド開発  >  Pythonでメッセージキュー(RabbitMQ)を操作するチュートリアル

Pythonでメッセージキュー(RabbitMQ)を操作するチュートリアル

黄舟
黄舟オリジナル
2017-07-20 15:34:402094ブラウズ

RabbitMQ は、AMQP に基づいた完全で再利用可能なエンタープライズ メッセージング システムです。これは、Mozilla Public License オープンソース契約に従っています。以下の記事ではPythonを使ってメッセージキューRabbitMQを操作する方法を中心に紹介していますので、困っている方は参考にしてみてください。

はじめに

RabbitMQ は、AMQP に基づいた完全で再利用可能なエンタープライズ メッセージング システムです。これは、Mozilla Public License オープンソース契約に従っています。
MQ はメッセージ キュー (Message Queue) の略で、アプリケーション間の通信方法です。アプリケーションは、キューとの間でメッセージ (アプリケーション固有のデータ) を読み書きすることによって通信します。キューをリンクするための専用接続は必要ありません。メッセージングとは、プログラムが相互に直接呼び出しを行うのではなく、メッセージでデータを送信することによって相互に通信することを指します。これは通常、リモート プロシージャ コールなどの技術に使用されます。キューイングとは、キューを介して通信するアプリケーションを指します。キューを使用すると、受信アプリケーションと送信アプリケーションを同時に実行する必要がなくなります。

アプリケーションシナリオ:

RabbitMQ は間違いなく現在最も人気のあるメッセージキューの 1 つであり、さまざまな言語環境を豊富にサポートしています。.NET 開発者として、このツールを学習して理解する必要があります。メッセージキューの使用シナリオは大きく分けて 3 つあります:

1. システム統合と分散システム設計。さまざまなサブシステムがメッセージを介して接続され、このソリューションは徐々に「メッセージを通過するアーキテクチャ」というアーキテクチャ スタイルに発展しました。

2. ロギングなど、システム内の同期処理方法がスループットに重大な影響を与える場合。システム内のすべてのユーザー行動ログを記録する必要がある場合、ログを同期的に記録すると、ログ メッセージをメッセージ キューに送信するときに、ログ サブシステムがログ情報を非同期的に消費することになります。

3. 電子商取引のフラッシュセールシナリオなど、システムの高可用性。アプリケーションサーバーやデータベースサーバーが一定時間に大量のリクエストを受信すると、システムダウンが発生します。リクエストをメッセージ キューに転送でき、サーバーがこれらのメッセージを消費できる場合、リクエストはよりスムーズになり、システムの可用性が向上します。

1. インストール環境

まずLinuxにrabbitmqをインストールします


# 环境为CentOS 7
yum install rabbitmq-server # 安装RabbitMQ
systemctl start rabbitmq-server # 启动
systemctl enable rabbitmq-server # 开机自启
systemctl stop firewall-cmd  # 临时关闭防火墙

次に、pipを使用してPython3開発パッケージをインストールします


pip3 install pika

ソフトウェアをインストールした後、http: / /115.xx.xx.xx:15672/ 組み込み Web ページにアクセスして、RabbitMQ を表示および管理します。デフォルトの管理者ユーザーのパスワードは、guest

2. メッセージをキューに追加するだけです


#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 19:25
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Producer
import pika
# 创建连接对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))
# 创建频道对象
channel = connection.channel()
# 指定一个队列,如果该队列不存在则创建
channel.queue_declare(queue='test_queue')
# 提交消息
for i in range(10):
 channel.basic_publish(exchange='', routing_key='test_queue', body='hello,world' + str(i))
 print("sent...")
# 关闭连接
connection.close()

3. キューからメッセージを取得するだけです


#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 19:40
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Consumer
import pika
credentials = pika.PlainCredentials('guest', 'guest')
# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials))
channel = connection.channel()
# 指定一个队列,如果该队列不存在则创建
channel.queue_declare(queue='test_queue')
# 定义一个回调函数
def callback(ch, method, properties, body):
 print(body.decode('utf-8'))
# 告诉RabbitMQ使用callback来接收信息
channel.basic_consume(callback, queue='test_queue', no_ack=False)
print('waiting...')
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。
channel.start_consuming()

4.コンシューマがオフラインになったらどうなるでしょうか

次のような状況を想像してください:

コンシューマはメッセージ キューから n 個のデータを取得し、それを処理しようとしていますが、マシンがダウンしました。どうすればよいでしょうか。管理? RabbieMQ には、コンシューマ処理の終了を確認するために使用できる ACK があります。これは、ネットワークの ACK に似ています。コンシューマーがキューからデータを取得するたびに、キューはデータをすぐに削除せず、対応する ACK を待ちます。コンシューマーがデータを取得して処理を完了すると、ACK パケットをキューに送信して、メッセージが処理され削除できることを RabbitMQ に通知します。この時点で、RabbitMQ はキューからデータを削除します。したがって、この場合、コンシューマーがオフラインになっても問題はなく、データはキュー内に残り、他のコンシューマーが処理できるように残されます。

は、次のように Python で実装されます:

コンシューマには、確認パッケージを送信しないことを意味するコード行 channel.basic_consume(callback, queue='test_queue', no_ack=False) ,其中no_ack=False があります。これを no_ack=True に変更すると、各処理後に確認パケットが RabbitMQ に送信され、メッセージが処理されたことを確認します。

5. RabbitMQ がダウンしたらどうするか

ACK パケットはありますが、RabbitMQ がダウンするとデータは失われます。したがって、RabbitMQ 用のデータ永続ストレージをセットアップできます。 RabbitMQ はデータをディスク上に保持し、次回起動時にもキューが存在するようにします。

は Python で次のように実装されます:

我们声明一个队列是这样的channel.queue_declare(queue='test_queue') ,如果需要持久化一个队列可以这样声明channel.queue_declare(queue='test_queue', durable=True) 。不过这行直接放在代码中是不能执行的,因为以前已经有了一个名为test_queue的队列,RabbitMQ 不允许用不同的方式声明同一个队列,所以可以换一个队列名新建来指定数据持久化存储。不过如果只是这样声明的话,在 RabbitMQ 宕机重启后确实队列还在,不过队列里的数据就没有了。除非我们这样来声明队列channel.basic_publish(exchange='', routing_key="test_queue", body=message, properties=pika.BasicProperties(delivery_mode = 2,))

六、最简单的发布订阅

最简单的发布订阅在 RabbitMQ 中称之为Fanout模式。也就是说订阅者订阅某个频道,然后发布者向这个频道中发布消息,所有订阅者就都能接收到这条消息。不过因为发布者需要使用订阅者创建的随机队列所以需要先启动订阅者才能启动发布者。

发布者代码:


#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 20:21
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Publisher
import pika
# 创建连接对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))
# 创建频道对象
channel = connection.channel()
# 定义交换机,exchange表示交换机名称,type表示类型
channel.exchange_declare(exchange='my_fanout',
       type='fanout')
message = 'Hello Python'
# 将消息发送到交换机
channel.basic_publish(exchange='my_fanout', # 指定exchange
      routing_key='', # fanout下不需要配置,配置了也不会生效
      body=message)
connection.close()

订阅者代码:


#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 20:20
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Subscriber
import pika
credentials = pika.PlainCredentials('guest', 'guest')
# 连接到RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials))
channel = connection.channel()
# 定义交换机,进行exchange声明,exchange表示交换机名称,type表示类型
channel.exchange_declare(exchange='my_fanout',
       type='fanout')
# 随机创建队列
result = channel.queue_declare(exclusive=True) # exclusive=True表示建立临时队列,当consumer关闭后,该队列就会被删除
queue_name = result.method.queue
# 将队列与exchange进行绑定
channel.queue_bind(exchange='my_fanout',
     queue=queue_name)
# 定义回调方法
def callback(ch, method, properties, body):
 print(body.decode('utf-8'))
# 从队列获取信息
channel.basic_consume(callback,
      queue=queue_name,
      no_ack=True)
channel.start_consuming()

总结

以上がPythonでメッセージキュー(RabbitMQ)を操作するチュートリアルの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。