ホームページ >バックエンド開発 >Python チュートリアル >Python は RabbitMQ サーバーを操作してメッセージ キュー ルーティングを実装します

Python は RabbitMQ サーバーを操作してメッセージ キュー ルーティングを実装します

高洛峰
高洛峰オリジナル
2017-03-01 14:03:111307ブラウズ

RabbitMQ はメッセージ キュー サーバーです。ここでは、Python+Pika+RabbitMQ のサーバー側環境を見て、Python を使用して RabbitMQ サーバーを操作してメッセージ キューのルーティング機能を実装する方法を見ていきます

Python は Pika ライブラリを使用します。 (インストール: sudo pip install pika) RabbitMQ メッセージ キュー サーバーを操作できます (インストール: sudo apt-get install Rabbitmq-server) ここでは、MQ 関連のルーティング機能を見ていきます。

ルーティングキーの実装

たとえば、メッセージをすべての受信者に送信する必要があるシナリオがありますが、自由にカスタマイズする必要がある場合、一部のメッセージは一部の受信者に送信され、一部のメッセージは次の宛先に送信されます。他の受信機はどうすればいいでしょうか?この場合、ルーティング キーが使用されます。

ルーティング キーの動作原理: 各受信側のメッセージ キューがスイッチにバインドされている場合、対応するルーティング キーを設定できます。送信者がスイッチを介して情報を送信する場合、ルーティング キーを指定でき、スイッチはルーティング キーに基づいてメッセージを対応するメッセージ キューに送信し、受信側がメッセージを受信できるようにします。

前の記事に引き続き、send.py と accept.py を使用してルーティング キーの機能をシミュレートします。 send.py は送信側を表し、receive.py は受信側を表します。インスタンスの機能は、情報、警告、エラーの 3 つのレベルの情報をさまざまな受信側に送信することです。

send.py コード分析

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
#定义交换机,设置类型为direct
channel.exchange_declare(exchange='messages', type='direct')
 
#定义三个路由键
routings = ['info', 'warning', 'error']
 
#将消息依次发送到交换机,并设置路由键
for routing in routings:
  message = '%s message.' % routing
  channel.basic_publish(exchange='messages',
             routing_key=routing,
             body=message)
  print message
 
connection.close()

receive.py コード分析

#!/usr/bin/env python
#coding=utf8
import pika, sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
#定义交换机,设置类型为direct
channel.exchange_declare(exchange='messages', type='direct')
 
#从命令行获取路由键参数,如果没有,则设置为info
routings = sys.argv[1:]
if not routings:
  routings = ['info']
 
#生成临时队列,并绑定到交换机上,设置路由键
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for routing in routings:
  channel.queue_bind(exchange='messages',
            queue=queue_name,
            routing_key=routing)
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue=queue_name, no_ack=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

2 つのターミナルを開き、1 つはコード python accept.py info warning を実行します。これは、情報と警告メッセージのみが表示されることを意味します。受け取った。別の端末で send.py を実行すると、受信端末が情報と警告メッセージのみを受信することがわかります。複数のターミナルを開いてreceive.pyを実行し、異なるルーティングキーパラメータを渡すと、より明らかな効果が見られます。

受信側が実行されている場合、rabbitmqctl list_bindings を使用してバインディング ステータスを表示できます。

ルーティング キーのファジー マッチング
ルーティング キーのファジー マッチングとは、一般的に使用される正規表現とは異なる正規表現を使用できることを意味します。ここで、「#」はすべてを意味し、「*」は 1 つの単語のみに一致します。例を読めば理解できます。

上記の例に従って、ルーティング キーのファジー マッチング機能を実装するために、引き続き send.py と accept.py を使用します。 send.py は送信側を表し、receive.py は受信側を表します。この例の機能は大まかに次のとおりです。たとえば、親しい友人がいる場合、幸せ、悲しみ、仕事、人生について話すことができます。また、楽しいことを共有できる友人もいます。幸せなことを共有できる友達がいます。不幸なことについて彼女に話すことができます。

send.py コード分析

ルーティング キーのあいまい一致が必要なため、スイッチ タイプをトピックに設定する必要があります。トピックに設定されている場合は、#、* の一致記号を使用できます。

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
#定义交换机,设置类型为topic
channel.exchange_declare(exchange='messages', type='topic')
 
#定义路由键
routings = ['happy.work', 'happy.life', 'sad.work', 'sad.life']
 
#将消息依次发送到交换机,并设定路由键
for routing in routings:
  message = '%s message.' % routing
  channel.basic_publish(exchange='messages',
             routing_key=routing,
             body=message)
  print message
 
connection.close()

上記の例では、4 種類のメッセージが定義されているため、説明は省略します。

receive.py コード分析

同様に、スイッチのタイプをトピックに設定する必要があります。コマンドラインからパラメータを受け取る機能が若干調整されました。つまり、パラメータがない場合はエラーで終了します。

#!/usr/bin/env python
#coding=utf8
import pika, sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
#定义交换机,设置类型为topic
channel.exchange_declare(exchange='messages', type='topic')
 
#从命令行获取路由参数,如果没有,则报错退出
routings = sys.argv[1:]
if not routings:
  print >> sys.stderr, "Usage: %s [routing_key]..." % (sys.argv[0],)
  exit()
 
#生成临时队列,并绑定到交换机上,设置路由键
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for routing in routings:
  channel.queue_bind(exchange='messages',
            queue=queue_name,
            routing_key=routing)
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue=queue_name, no_ack=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

4 つのターミナルを開き、1 つは次のように実行され、彼女と何でも話すことができることを示します。

python receive.py "#"

もう 1 つのターミナルは次のように実行され、彼女と幸せなことを共有できることを示します。

python receive.py "happy.*"

3 番目のコマンドは次のように実行され、仕事の事柄を彼女と共有できることを示しています。

python receive.py "*.work"

最後のコマンドは python send.py を実行します。結果は容易に想像できるので、ここでは書きません。

RabbitMQ サーバーを操作してメッセージ キュー ルーティングを実装する Python に関するその他の記事については、PHP 中国語 Web サイトに注目してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。