検索
ホームページバックエンド開発Python チュートリアルPython+Pika+RabbitMQ環境の展開とワークキューの実装

Rabbitmq の中国語訳は、主にメッセージキューを意味する mq: Message Queue という文字を指します。ウサギを意味する「rabbit」という単語も付いています。これはニシキヘビという言語と同じです。外国人はとてもユーモラスです。 Rabbitmq サービスは mysql サービスや Apache サービスに似ていますが、提供される機能が異なります。 rabbimq は、異なるアプリケーション間の通信に使用できるメッセージ送信サービスを提供するために使用されます。

rabbitmq をインストールします
まず、ubuntu 12.04 では、apt-get を通じて直接インストールできます:

sudo apt-get install rabbitmq-server

インストール後、rabbitmq サービスが開始されています。次に、Python で Hello World! を記述する例を見てみましょう。サンプルの内容は、send.pyからrabbitmqに「Hello World!」を送信し、send.pyがrabbitmqから送信した情報をreceive.pyが受け取るというものです。

Python+Pika+RabbitMQ環境の展開とワークキューの実装

ここで、P は生産者を意味するプロデュースを表し、送信者とも呼ばれ、例では send.py として示されています。C は消費者を意味し、消費者とも呼ばれます。これは受信者であり、例ではreceive.pyとして示されています。中央の赤いものはキューを意味し、この例ではhello queueです。

Python は、rabbitmq サービスを使用します。ここでは、pika、txAMQP、または py-amqplib を使用します。

pikaをインストールする

pikaをインストールするには、pipを使用できます。pipがインストールされていない場合は、apt-get

sudo apt-get install python-pip

を介してインストールできます。 pip:

sudo pip install pika

send.py code

はローカルでテストされているため、localhost を使用します。

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()

メッセージが配信されるメッセージキューを宣言します。存在しないキューにメッセージが送信された場合、rabbitmq はこれらのメッセージを自動的にクリアします。

channel.queue_declare(queue='hello')

上で宣言された hello キューにメッセージを送信します。ここで、exchange は、メッセージがどのキューに送信されるかを正確に指定できるエクスチェンジャーを表します。routing_key はキューの名前に設定され、body はキューの名前に設定されます。具体的な送信内容は一時的には関係ありません。

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

接続を閉じます

connection.close()

完全なコード

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

まずこのプログラムを実行します。実行が成功すると、rabbitmqctlはhelloキューを正常に追加し、helloキューが1つ追加されるはずです。キュー内の情報を表示するには、rabbitmqctl コマンドを使用します。

rabbitmqctl list_queues

次の情報が作成者のコンピュータに出力されます:

Python+Pika+RabbitMQ環境の展開とワークキューの実装


確かに hello キューがあり、メッセージがあります待ち行列。次に、receive.py を使用してキュー内の情報を取得します。

receive.py コード

は、最初にサーバーに接続し、次にメッセージキューを宣言する必要がある send.py の前の 2 つのステップと同じです。同じコードはここには掲載されません。

メッセージの受信は、それを処理するためのコールバック関数を定義する必要があります。ここでのコールバック関数は、情報を出力することです。

def callback(ch, method, properties, body):
  print "Received %r" % (body,)

は、rabbitmqにコールバックを使用して情報を受信するように指示します

channel.basic_consume(callback, queue='hello', no_ack=True)

は、キューに情報がある場合にのみ、コールバックが呼び出されて処理されます。 Ctrl+C を押して終了します。

channel.start_consuming()

完全なコード

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue='hello', no_ack=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

プログラムを実行すると、キュー hello に Hello World! というメッセージが表示され、画面に表示されます。端末を変更して再度send.pyを実行すると、receive.pyが再度情報を受信することがわかります。

ワークキューの例

1.準備

サンプルプログラムでは、new_task.pyを使用してタスクアロケーターをシミュレートし、worker.pyを使用してワーカーをシミュレートします。

コマンドラインパラメータから情報を受信して​​送信するようにsend.pyを変更します

import sys
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='hello',
           body=message)
print " [x] Sent %r" % (message,)

receive.pyのコールバック関数を変更します。

import time
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"

まずここで 2 つのターミナルを開き、どちらも worker.py を実行し、listen 状態にします。これは 2 つのワーカーに相当します。 3 番目のターミナルを開き、new_task.py

$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....

worker.py が 3 つのタスクを受信することを確認します:

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'

もう 1 つのワーカーは 2 つのミッションを受信します:

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。

2.消息确认(Message acknowledgment)

消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:

def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep(5)
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)

这边停顿5秒,可以方便ctrl+c退出。

去除no_ack=True参数或者设置为False也可以。

channel.basic_consume(callback, queue='hello', no_ack=False)

用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。

3.消息持久化存储(Message durability)

虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:

channel.queue_declare(queue='hello', durable=True)

但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:

channel.queue_declare(queue='task_queue', durable=True)

在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:

channel.basic_publish(exchange='',
           routing_key="task_queue",
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))

4.公平调度(Fair dispatch)

上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。

channel.basic_qos(prefetch_count=1)

new_task.py完整代码

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='task_queue',
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
print " [x] Sent %r" % (message,)
connection.close()
worker.py完整代码

#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
           queue='task_queue')
 
channel.start_consuming()


更多Python+Pika+RabbitMQ環境の展開とワークキューの実装相关文章请关注PHP中文网!

声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
Python vs. C:比較されたアプリケーションとユースケースPython vs. C:比較されたアプリケーションとユースケースApr 12, 2025 am 12:01 AM

Pythonは、データサイエンス、Web開発、自動化タスクに適していますが、Cはシステムプログラミング、ゲーム開発、組み込みシステムに適しています。 Pythonは、そのシンプルさと強力なエコシステムで知られていますが、Cは高性能および基礎となる制御機能で知られています。

2時間のPython計画:現実的なアプローチ2時間のPython計画:現実的なアプローチApr 11, 2025 am 12:04 AM

2時間以内にPythonの基本的なプログラミングの概念とスキルを学ぶことができます。 1.変数とデータ型、2。マスターコントロールフロー(条件付きステートメントとループ)、3。機能の定義と使用を理解する4。

Python:主要なアプリケーションの調査Python:主要なアプリケーションの調査Apr 10, 2025 am 09:41 AM

Pythonは、Web開発、データサイエンス、機械学習、自動化、スクリプトの分野で広く使用されています。 1)Web開発では、DjangoおよびFlask Frameworksが開発プロセスを簡素化します。 2)データサイエンスと機械学習の分野では、Numpy、Pandas、Scikit-Learn、Tensorflowライブラリが強力なサポートを提供します。 3)自動化とスクリプトの観点から、Pythonは自動テストやシステム管理などのタスクに適しています。

2時間でどのくらいのPythonを学ぶことができますか?2時間でどのくらいのPythonを学ぶことができますか?Apr 09, 2025 pm 04:33 PM

2時間以内にPythonの基本を学ぶことができます。 1。変数とデータ型を学習します。2。ステートメントやループの場合などのマスター制御構造、3。関数の定義と使用を理解します。これらは、簡単なPythonプログラムの作成を開始するのに役立ちます。

プロジェクトの基本と問題駆動型の方法で10時間以内にコンピューター初心者プログラミングの基本を教える方法は?プロジェクトの基本と問題駆動型の方法で10時間以内にコンピューター初心者プログラミングの基本を教える方法は?Apr 02, 2025 am 07:18 AM

10時間以内にコンピューター初心者プログラミングの基本を教える方法は?コンピューター初心者にプログラミングの知識を教えるのに10時間しかない場合、何を教えることを選びますか...

中間の読書にどこでもfiddlerを使用するときにブラウザによって検出されないようにするにはどうすればよいですか?中間の読書にどこでもfiddlerを使用するときにブラウザによって検出されないようにするにはどうすればよいですか?Apr 02, 2025 am 07:15 AM

fiddlereveryversings for the-middleの測定値を使用するときに検出されないようにする方法

Python 3.6にピクルスファイルをロードするときに「__Builtin__」モジュールが見つからない場合はどうすればよいですか?Python 3.6にピクルスファイルをロードするときに「__Builtin__」モジュールが見つからない場合はどうすればよいですか?Apr 02, 2025 am 07:12 AM

Python 3.6のピクルスファイルのロードレポートエラー:modulenotFounderror:nomodulenamed ...

風光明媚なスポットコメント分析におけるJieba Wordセグメンテーションの精度を改善する方法は?風光明媚なスポットコメント分析におけるJieba Wordセグメンテーションの精度を改善する方法は?Apr 02, 2025 am 07:09 AM

風光明媚なスポットコメント分析におけるJieba Wordセグメンテーションの問題を解決する方法は?風光明媚なスポットコメントと分析を行っているとき、私たちはしばしばJieba Wordセグメンテーションツールを使用してテキストを処理します...

See all articles

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SecLists

SecLists

SecLists は、セキュリティ テスターの究極の相棒です。これは、セキュリティ評価中に頻繁に使用されるさまざまな種類のリストを 1 か所にまとめたものです。 SecLists は、セキュリティ テスターが必要とする可能性のあるすべてのリストを便利に提供することで、セキュリティ テストをより効率的かつ生産的にするのに役立ちます。リストの種類には、ユーザー名、パスワード、URL、ファジング ペイロード、機密データ パターン、Web シェルなどが含まれます。テスターはこのリポジトリを新しいテスト マシンにプルするだけで、必要なあらゆる種類のリストにアクセスできるようになります。

PhpStorm Mac バージョン

PhpStorm Mac バージョン

最新(2018.2.1)のプロフェッショナル向けPHP統合開発ツール

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

強力な PHP 統合開発環境

SublimeText3 Linux 新バージョン

SublimeText3 Linux 新バージョン

SublimeText3 Linux 最新バージョン