Rabbitmq の中国語訳は、主にメッセージキューを意味する mq: Message Queue という文字を指します。ウサギを意味する「rabbit」という単語も付いています。これはニシキヘビという言語と同じです。外国人はとてもユーモラスです。 Rabbitmq サービスは mysql サービスや Apache サービスに似ていますが、提供される機能が異なります。 rabbimq は、異なるアプリケーション間の通信に使用できるメッセージ送信サービスを提供するために使用されます。
rabbitmq をインストールします
まず、ubuntu 12.04 では、apt-get を通じて直接インストールできます:
sudo apt-get install rabbitmq-server
インストール後、rabbitmq サービスが開始されています。次に、Python で Hello World! を記述する例を見てみましょう。サンプルの内容は、send.pyからrabbitmqに「Hello World!」を送信し、send.pyがrabbitmqから送信した情報をreceive.pyが受け取るというものです。
ここで、P は生産者を意味するプロデュースを表し、送信者とも呼ばれ、例では send.py として示されています。C は消費者を意味し、消費者とも呼ばれます。これは受信者であり、例ではreceive.pyとして示されています。中央の赤いものはキューを意味し、この例ではhello queueです。
Python は、rabbitmq サービスを使用します。ここでは、pika、txAMQP、または py-amqplib を使用します。
pikaをインストールする
pikaをインストールするには、pipを使用できます。pipがインストールされていない場合は、apt-get
sudo apt-get install python-pip
を介してインストールできます。 pip:
sudo pip install pika
send.py code
はローカルでテストされているため、localhost を使用します。
connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel()
メッセージが配信されるメッセージキューを宣言します。存在しないキューにメッセージが送信された場合、rabbitmq はこれらのメッセージを自動的にクリアします。
channel.queue_declare(queue='hello')
上で宣言された hello キューにメッセージを送信します。ここで、exchange は、メッセージがどのキューに送信されるかを正確に指定できるエクスチェンジャーを表します。routing_key はキューの名前に設定され、body はキューの名前に設定されます。具体的な送信内容は一時的には関係ありません。
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
接続を閉じます
connection.close()
完全なコード
#!/usr/bin/env python #coding=utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print " [x] Sent 'Hello World!'" connection.close()
まずこのプログラムを実行します。実行が成功すると、rabbitmqctlはhelloキューを正常に追加し、helloキューが1つ追加されるはずです。キュー内の情報を表示するには、rabbitmqctl コマンドを使用します。
rabbitmqctl list_queues
次の情報が作成者のコンピュータに出力されます:
確かに hello キューがあり、メッセージがあります待ち行列。次に、receive.py を使用してキュー内の情報を取得します。
receive.py コード
は、最初にサーバーに接続し、次にメッセージキューを宣言する必要がある send.py の前の 2 つのステップと同じです。同じコードはここには掲載されません。
メッセージの受信は、それを処理するためのコールバック関数を定義する必要があります。ここでのコールバック関数は、情報を出力することです。
def callback(ch, method, properties, body): print "Received %r" % (body,)
は、rabbitmqにコールバックを使用して情報を受信するように指示します
channel.basic_consume(callback, queue='hello', no_ack=True)
は、キューに情報がある場合にのみ、コールバックが呼び出されて処理されます。 Ctrl+C を押して終了します。
channel.start_consuming()
完全なコード
#!/usr/bin/env python #coding=utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print " [x] Received %r" % (body,) channel.basic_consume(callback, queue='hello', no_ack=True) print ' [*] Waiting for messages. To exit press CTRL+C' channel.start_consuming()
プログラムを実行すると、キュー hello に Hello World! というメッセージが表示され、画面に表示されます。端末を変更して再度send.pyを実行すると、receive.pyが再度情報を受信することがわかります。
ワークキューの例
1.準備
サンプルプログラムでは、new_task.pyを使用してタスクアロケーターをシミュレートし、worker.pyを使用してワーカーをシミュレートします。
コマンドラインパラメータから情報を受信して送信するようにsend.pyを変更します
import sys message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print " [x] Sent %r" % (message,)
receive.pyのコールバック関数を変更します。
import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done"
まずここで 2 つのターミナルを開き、どちらも worker.py を実行し、listen 状態にします。これは 2 つのワーカーに相当します。 3 番目のターミナルを開き、new_task.py
$ python new_task.py First message. $ python new_task.py Second message.. $ python new_task.py Third message... $ python new_task.py Fourth message.... $ python new_task.py Fifth message.....
worker.py が 3 つのタスクを受信することを確認します:
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
もう 1 つのワーカーは 2 つのミッションを受信します:
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。
2.消息确认(Message acknowledgment)
消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep(5) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag)
这边停顿5秒,可以方便ctrl+c退出。
去除no_ack=True参数或者设置为False也可以。
channel.basic_consume(callback, queue='hello', no_ack=False)
用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。
3.消息持久化存储(Message durability)
虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:
channel.queue_declare(queue='hello', durable=True)
但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:
channel.queue_declare(queue='task_queue', durable=True)
在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
4.公平调度(Fair dispatch)
上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。
channel.basic_qos(prefetch_count=1)
new_task.py完整代码
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print " [x] Sent %r" % (message,) connection.close() worker.py完整代码 #!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
更多Python+Pika+RabbitMQ環境の展開とワークキューの実装相关文章请关注PHP中文网!

Pythonは、データサイエンス、Web開発、自動化タスクに適していますが、Cはシステムプログラミング、ゲーム開発、組み込みシステムに適しています。 Pythonは、そのシンプルさと強力なエコシステムで知られていますが、Cは高性能および基礎となる制御機能で知られています。

2時間以内にPythonの基本的なプログラミングの概念とスキルを学ぶことができます。 1.変数とデータ型、2。マスターコントロールフロー(条件付きステートメントとループ)、3。機能の定義と使用を理解する4。

Pythonは、Web開発、データサイエンス、機械学習、自動化、スクリプトの分野で広く使用されています。 1)Web開発では、DjangoおよびFlask Frameworksが開発プロセスを簡素化します。 2)データサイエンスと機械学習の分野では、Numpy、Pandas、Scikit-Learn、Tensorflowライブラリが強力なサポートを提供します。 3)自動化とスクリプトの観点から、Pythonは自動テストやシステム管理などのタスクに適しています。

2時間以内にPythonの基本を学ぶことができます。 1。変数とデータ型を学習します。2。ステートメントやループの場合などのマスター制御構造、3。関数の定義と使用を理解します。これらは、簡単なPythonプログラムの作成を開始するのに役立ちます。

10時間以内にコンピューター初心者プログラミングの基本を教える方法は?コンピューター初心者にプログラミングの知識を教えるのに10時間しかない場合、何を教えることを選びますか...

fiddlereveryversings for the-middleの測定値を使用するときに検出されないようにする方法

Python 3.6のピクルスファイルのロードレポートエラー:modulenotFounderror:nomodulenamed ...

風光明媚なスポットコメント分析におけるJieba Wordセグメンテーションの問題を解決する方法は?風光明媚なスポットコメントと分析を行っているとき、私たちはしばしばJieba Wordセグメンテーションツールを使用してテキストを処理します...


ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SecLists
SecLists は、セキュリティ テスターの究極の相棒です。これは、セキュリティ評価中に頻繁に使用されるさまざまな種類のリストを 1 か所にまとめたものです。 SecLists は、セキュリティ テスターが必要とする可能性のあるすべてのリストを便利に提供することで、セキュリティ テストをより効率的かつ生産的にするのに役立ちます。リストの種類には、ユーザー名、パスワード、URL、ファジング ペイロード、機密データ パターン、Web シェルなどが含まれます。テスターはこのリポジトリを新しいテスト マシンにプルするだけで、必要なあらゆる種類のリストにアクセスできるようになります。

PhpStorm Mac バージョン
最新(2018.2.1)のプロフェッショナル向けPHP統合開発ツール

ZendStudio 13.5.1 Mac
強力な PHP 統合開発環境

SublimeText3 Linux 新バージョン
SublimeText3 Linux 最新バージョン
