検索
ホームページバックエンド開発Python チュートリアルRabbitMQ クイック スタート Python チュートリアル

HelloWorld

はじめに

RabbitMQ: メッセージを受信して​​配信するもので、「郵便局」とみなすことができます。送信者と受信者はキューを介して対話します。キューのサイズは無制限であると考えられます。複数の送信者がキューにメッセージを送信でき、複数の受信者がキューからメッセージを受信することもできます。

コード

rabbitmq で使用されるプロトコルは amqp で、Python の推奨クライアントは pika

pip install pika -i https://pypi.douban.com/simple/

send.py

# coding: utf8
import pika

# 建立一个连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
           'localhost'))  # 连接本地的RabbitMQ服务器
channel = connection.channel()  # 获得channel

他のマシン上のサーバーに接続したい場合は、ここにあるリンクがあります。 just fill in アドレスまたはホスト名を入力するだけです。

次にメッセージの送信を開始します。メッセージを受け入れるキューが存在することを確認してください。存在しない場合、rabbitMQ はメッセージを破棄します

channel.queue_declare(queue='hello')  # 在RabbitMQ中创建hello这个队列
channel.basic_publish(exchange='',  # 使用默认的exchange来发送消息到队列
                  routing_key='hello',  # 发送到该队列 hello 中
                  body='Hello World!')  # 消息内容

connection.close()  # 关闭 同时flush

RabbitMQ はデフォルトで 1GB の空きディスク容量を必要とし、それ以外の場合は送信が失敗します。

このとき、ローカルキューhelloにメッセージが格納されています。rabbitmqctl list_queuesを使用すると、helloキューにメッセージが格納されていることを示す

receive.py

hello 1

が表示されます。最初にサーバーに接続した方が良いです。前に送信したときと同じです

# coding: utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()

ワークキュー(タスクキュー)

ワークキューは、時間のかかるタスクを複数のワーカープロセスに分散するために使用されます。リソースを消費するタスクをすぐに実行するのではなく (これらのタスクが完了するまで待つ必要があります)、後で実行するようにこれらのタスクをスケジュールします。たとえば、タスクをメッセージとしてキューに送信し、ワーカー プロセスを開始してタスクを受け入れ、最終的に実行します。また、複数のワーカー プロセスを開始して動作させることもできます。これは、http リクエストの処理ウィンドウ内で複雑なタスクを完了すべきではない Web アプリケーションに当てはまります。

channel.queue_declare(queue='hello')  # 此处就是声明了 来确保该队列 hello 存在 可以多次声明 这里主要是为了防止接受程序先运行时出错

def callback(ch, method, properties, body):  # 用于接收到消息后的回调
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',  # 收指定队列hello的消息
                      no_ack=True)  #在处理完消息后不发送ack给服务器
channel.start_consuming()  # 启动消息接受 这会进入一个死循环

メッセージの配布方法はポーリングです。つまり、各ワーカー プロセスは同じ数のメッセージを取得します。

メッセージ確認

メッセージがワーカー プロセスに割り当てられているが、処理が完了する前にワーカー プロセスがクラッシュした場合、メッセージは失われる可能性があります。これは、rabbitmq がメッセージをワーカー プロセスに配布すると、メッセージが削除されるためです。

メッセージの損失を防ぐために、rabbitmq は ack を提供します。つまり、ワーカー プロセスがメッセージを受信して​​処理した後、rabbitmq に ack を送信して、現時点でメッセージをキューから削除できることを Rabbitmq に通知します。ワーカー プロセスが停止し、rabbitmq が ack を受信しない場合、メッセージは他のワーカー プロセスに再配布されます。タイムアウトを設定する必要がなく、長時間かかるタスクでも処理できます。

ack はデフォルトで有効になっています。以前、ワーカー プロセスは ack:

channel.basic_publish(exchange='',
                  routing_key='task_queue',
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # 使得消息持久化
                  ))

メッセージ永続性

を使用して no_ack=True

channel.basic_consume(callback, queue='hello')  # 会启用ack

コールバックを指定していましたが、場合によっては RabbitMQ が再起動され、メッセージが失われます。永続性はキューの作成時に設定できます:

(キューの性質は一度決定すると変更できません)

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack

同時に、メッセージの送信時にメッセージの永続性属性も設定する必要があります:

channel.basic_publish (exchange='',

channel.queue_declare(queue='task_queue', durable=True)

ただし、RabbitMQ がメッセージを受信したばかりで、それを保存する時間がなかった場合でも、メッセージは失われます。同時に、RabbitMQ は受信したすべてのメッセージを保存しません。より完全な保証が必要な場合は、発行者確認を使用する必要があります。

公平なメッセージ配信

ポーリング モードでのメッセージ配信は公平ではない可能性があります。たとえば、奇数のメッセージが重いタスクである場合、一部のプロセスは常に重いタスクを実行します。たとえば、特定のワーカー プロセスにバックログのメッセージがある場合でも、多くの ack が送信されませんが、RabbitMQ は引き続きメッセージを受信プロセスに追加できます:

                  routing_key="task_queue",
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # make message persistent
                  ))

を RabbitMQ に通知します。ワーカー プロセスが ack を返さない場合、それ以上のメッセージは割り当てられないことを示します。

グループ

通常、メッセージは複数のプロセスに送信されてから完了することがあります。同時に:

exchange

送信者はメッセージをキューに直接送信しますか? 実際、送信者はメッセージを交換に送信することしかできません。一方では、交換機はプロデューサーからメッセージを受信し、他方ではメッセージをキューにプッシュします。メッセージを受信したときに何をする必要があるか、メッセージを特別なメッセージに追加する必要があるかどうかを知る必要があります。 Exchange には、直接、トピック、ヘッダー、ファンアウトなどのタイプがあり、メッセージを発行するときに使用されるのはファンアウトです。これは、Exchange の値を意味します。デフォルトの Exchange を使用します

channel.basic_qos(prefetch_count=1)

一時キュー

channel.exchange_declare(exchange='logs', type='fanout')  # 该exchange会把消息发送给所有它知道的队列中
このように、result.method.queue はキュー名であり、

Exchange と queue

result = channel.queue_declare()  # 创建一个随机队列
result = channel.queue_declare(exclusive=True)  # 创建一个随机队列,同时在没有接收者连接该队列后则销毁它
queue_name = result.method.queue
のログをバインドしてコピーを送信します。メッセージを送信するときは、こんにちは

メッセージを送信するときは、新しく作成されたログ交換を使用します

channel.queue_bind(exchange='logs',
               queue='hello')
ルーティング

以前はバインドを使用して、交換とキューの関係を確立しました(キューは交換からのメッセージに関心があります) )、バインドするときに routing_key オプションを指定することもできます

直接交換

を使用して、ルーティング キーに対応するメッセージを同じルーティング キーにバインドされたキューに送信します

   channel.basic_publish(exchange='logs',
                  routing_key='',
                  body=message)

。異なる重大度のメッセージをパブリッシュする送信関数:

channel.exchange_declare(exchange='direct_logs',
                     type='direct')
受信関数で対応する重大度をバインドします:

channel.basic_publish(exchange='direct_logs',
                  routing_key=severity,
                  body=message)
トピック交換を使用します

前に使用した直接交換は 1 つのルーティング キーのみをバインドできます。これを使用して、ルーティング キーのトピック交換を開くことができます。例:

"stock.usd.nyse" "nyse.vmw"

和direct exchange一样,在接受者那边绑定的key与发送时指定的routing key相同即可,另外有些特殊的值:

* 代表1个单词
# 代表0个或多个单词

如果发送者发出的routing key都是3个部分的,如:celerity.colour.species。

Q1:
*.orange.*  对应的是中间的colour都为orange的

Q2:
*.*.rabbit  对应的是最后部分的species为rabbit的
lazy.#      对应的是第一部分是lazy的

qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,对于lazy.pink.rabbit虽然匹配到了Q2两次,但是只会发送一次。如果绑定时直接绑定#,则会收到所有的。

 RPC

在远程机器上运行一个函数然后获得结果。

1、客户端启动 同时设置一个临时队列用于接受回调,绑定该队列

    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response, no_ack=True,
                               queue=self.callback_queue)

2、客户端发送rpc请求,同时附带reply_to对应回调队列,correlation_id设置为每个请求的唯一id(虽然说可以为每一次RPC请求都创建一个回调队列,但是这样效率不高,如果一个客户端只使用一个队列,则需要使用correlation_id来匹配是哪个请求),之后阻塞在回调队列直到收到回复

注意:如果收到了非法的correlation_id直接丢弃即可,因为有这种情况--服务器已经发了响应但是还没发ack就挂了,等一会服务器重启了又会重新处理该任务,又发了一遍相应,但是这时那个请求已经被处理掉了

channel.basic_publish(exchange='',
                       routing_key='rpc_queue',
                       properties=pika.BasicProperties(
                             reply_to = self.callback_queue,
                             correlation_id = self.corr_id,
                             ),
                       body=str(n))  # 发出调用

while self.response is None:  # 这边就相当于阻塞了
    self.connection.process_data_events()  # 查看回调队列
return int(self.response)

3、请求会发送到rpc_queue队列
4、RPC服务器从rpc_queue中取出,执行,发送回复

channel.basic_consume(on_request, queue='rpc_queue')  # 绑定 等待请求

# 处理之后:
ch.basic_publish(exchange='',
                 routing_key=props.reply_to,
                 properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                 body=str(response))  # 发送回复到回调队列
ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack

5、客户端从回调队列中取出数据,检查correlation_id,执行相应操作

if self.corr_id == props.correlation_id:
        self.response = body

                                               

以上がRabbitMQ クイック スタート Python チュートリアルの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
Pythonと時間:勉強時間を最大限に活用するPythonと時間:勉強時間を最大限に活用するApr 14, 2025 am 12:02 AM

限られた時間でPythonの学習効率を最大化するには、PythonのDateTime、時間、およびスケジュールモジュールを使用できます。 1. DateTimeモジュールは、学習時間を記録および計画するために使用されます。 2。時間モジュールは、勉強と休息の時間を設定するのに役立ちます。 3.スケジュールモジュールは、毎週の学習タスクを自動的に配置します。

Python:ゲーム、GUIなどPython:ゲーム、GUIなどApr 13, 2025 am 12:14 AM

PythonはゲームとGUI開発に優れています。 1)ゲーム開発は、2Dゲームの作成に適した図面、オーディオ、その他の機能を提供し、Pygameを使用します。 2)GUI開発は、TKINTERまたはPYQTを選択できます。 TKINTERはシンプルで使いやすく、PYQTは豊富な機能を備えており、専門能力開発に適しています。

Python vs. C:比較されたアプリケーションとユースケースPython vs. C:比較されたアプリケーションとユースケースApr 12, 2025 am 12:01 AM

Pythonは、データサイエンス、Web開発、自動化タスクに適していますが、Cはシステムプログラミング、ゲーム開発、組み込みシステムに適しています。 Pythonは、そのシンプルさと強力なエコシステムで知られていますが、Cは高性能および基礎となる制御機能で知られています。

2時間のPython計画:現実的なアプローチ2時間のPython計画:現実的なアプローチApr 11, 2025 am 12:04 AM

2時間以内にPythonの基本的なプログラミングの概念とスキルを学ぶことができます。 1.変数とデータ型、2。マスターコントロールフロー(条件付きステートメントとループ)、3。機能の定義と使用を理解する4。

Python:主要なアプリケーションの調査Python:主要なアプリケーションの調査Apr 10, 2025 am 09:41 AM

Pythonは、Web開発、データサイエンス、機械学習、自動化、スクリプトの分野で広く使用されています。 1)Web開発では、DjangoおよびFlask Frameworksが開発プロセスを簡素化します。 2)データサイエンスと機械学習の分野では、Numpy、Pandas、Scikit-Learn、Tensorflowライブラリが強力なサポートを提供します。 3)自動化とスクリプトの観点から、Pythonは自動テストやシステム管理などのタスクに適しています。

2時間でどのくらいのPythonを学ぶことができますか?2時間でどのくらいのPythonを学ぶことができますか?Apr 09, 2025 pm 04:33 PM

2時間以内にPythonの基本を学ぶことができます。 1。変数とデータ型を学習します。2。ステートメントやループの場合などのマスター制御構造、3。関数の定義と使用を理解します。これらは、簡単なPythonプログラムの作成を開始するのに役立ちます。

プロジェクトの基本と問題駆動型の方法で10時間以内にコンピューター初心者プログラミングの基本を教える方法は?プロジェクトの基本と問題駆動型の方法で10時間以内にコンピューター初心者プログラミングの基本を教える方法は?Apr 02, 2025 am 07:18 AM

10時間以内にコンピューター初心者プログラミングの基本を教える方法は?コンピューター初心者にプログラミングの知識を教えるのに10時間しかない場合、何を教えることを選びますか...

中間の読書にどこでもfiddlerを使用するときにブラウザによって検出されないようにするにはどうすればよいですか?中間の読書にどこでもfiddlerを使用するときにブラウザによって検出されないようにするにはどうすればよいですか?Apr 02, 2025 am 07:15 AM

fiddlereveryversings for the-middleの測定値を使用するときに検出されないようにする方法

See all articles

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

SecLists

SecLists

SecLists は、セキュリティ テスターの究極の相棒です。これは、セキュリティ評価中に頻繁に使用されるさまざまな種類のリストを 1 か所にまとめたものです。 SecLists は、セキュリティ テスターが必要とする可能性のあるすべてのリストを便利に提供することで、セキュリティ テストをより効率的かつ生産的にするのに役立ちます。リストの種類には、ユーザー名、パスワード、URL、ファジング ペイロード、機密データ パターン、Web シェルなどが含まれます。テスターはこのリポジトリを新しいテスト マシンにプルするだけで、必要なあらゆる種類のリストにアクセスできるようになります。

Dreamweaver Mac版

Dreamweaver Mac版

ビジュアル Web 開発ツール

PhpStorm Mac バージョン

PhpStorm Mac バージョン

最新(2018.2.1)のプロフェッショナル向けPHP統合開発ツール