前の記事では、rabbitmq のインストールと古典的な hello world! を紹介しました。例。ここではワークキューについて理解します。前回の記事の続きとなるため、前回の記事を読んでいない方にはわかりにくいかもしれません。前回の記事のアドレスは、ubuntuにrabbitmqとpythonをインストールする方法です
メッセージはタスクとしても理解できます、メッセージ送信者はタスクアロケーターとして理解でき、メッセージ受信者はワーカーとして理解できますワーカーが受信したとき タスクが完了していない場合、タスク アロケーターは別のタスクを送信し、ビジー状態になるため、これらのタスクを一緒に処理するには複数のワーカーが必要になります。これらのワーカーはワーク キューと呼ばれます。構造図は以下の通りです:
rabbitmqのPythonインスタンスワークキュー
Preparation(準備)
サンプルプログラムでは、new_task.pyを使用してタスクアロケータとworker.pyをシミュレートして、労働者をシミュレートします。
send.pyを修正し、コマンドラインパラメータから情報を受信して送信
import sys message= ' '.join(sys.argv[1:])or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print " [x] Sent %r" % (message,)
receive.pyのコールバック関数を修正。
import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done"
まずここで 2 つのターミナルを開き、どちらも worker.py を実行し、listen 状態になります。これは 2 つのワーカーに相当します。 3 番目のターミナルを開いて new_task.py
$ python new_task.py First message. $ python new_task.py Second message.. $ python new_task.py Third message... $ python new_task.py Fourth message.... $ python new_task.py Fifth message.....
を実行し、worker.py がタスクを受け取ることを確認します。1 つのワーカーが 3 つのタスクを受け取る:
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
もう 1 つのワーカーが 2 つのタスクを受け取る:
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
上から見て、各ワーカーにタスクが割り当てられます。振り向く。したがって、タスクの処理中にワーカーが死亡した場合、タスクは完了しないため、他のワーカーに引き継ぐ必要があります。したがって、ワーカーがタスクを完了したときにフィードバックを提供するメカニズムが必要です。
メッセージ確認
メッセージ確認とは、ワーカーがタスクを完了すると、それが Rabbitmq にフィードバックされることを意味します。 worker.py のコールバック関数を変更します:
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep(5) print " [x] Done" ch.basic_ack(delivery_tag= method.delivery_tag)
Ctrl+C で終了しやすくするために、ここで 5 秒間一時停止します。
no_ack=True パラメーターを削除するか、False に設定することもできます。
channel.basic_consume(callback, queue='hello', no_ack=False)
このコードで実行すると、ワーカーの1つがctrl+cで終了しても、実行中のタスクは失われず、rabbitmqがタスクを他のワーカーに再分配します。
メッセージ耐久性 (メッセージ耐久性)
メッセージフィードバック機構はありますが、rabbitmq自体がハングアップするとタスクは失われます。したがって、タスクは永続的に保存する必要があります。永続ストレージを宣言します:
channel.queue_declare(queue='hello', durable=True)
しかし、Hello キューはすでに存在しており、RabbitMQ では既存のキューを再定義するための別のパラメータの使用が許可されていないため、このプログラムはエラーを実行します。キューを再定義します:
channel.queue_declare(queue='task_queue', durable=True)
タスクを送信するとき、delivery_mode=2 を使用してタスクを永続ストレージとしてマークします:
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode= 2,# make message persistent ))
Fair Dispatch (Fair Dispatch)
上記の例、各作業者には順番にタスクが割り当てられますが、各タスクは必ずしも同じであるとは限りません。一部のタスクはより重く、実行に時間がかかる場合がありますが、一部のタスクはより軽く、実行にかかる時間が短い場合があります。公平にスケジュールできれば最善ですが、rabbitmq が同時に複数のタスクをワーカーに割り当てないように、basic_qos を使用して prefetch_count=1 を設定します。つまり、ワーカーがタスクを完了した後でのみタスクを再度受け取るようになります。 。
channel.basic_qos(prefetch_count=1)
new_task.py 完全なコード
#!/usr/bin/env python import pika import sys connection= pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel= connection.channel() channel.queue_declare(queue='task_queue', durable=True) message= ' '.join(sys.argv[1:])or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode= 2,# make message persistent )) print " [x] Sent %r" % (message,) connection.close()
worker.py 完全なコード
#!/usr/bin/env python import pika import time connection= pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel= connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag= method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
上記は Python Rabbitmq の使用方法 (2) の内容です。さらに関連する内容については、PHP 中国語 Web サイト (www.php.ん)!