译-PHP RabbitMQ チュートリアル-3

パブリッシュ/サブスクライブ (php-amqplib を使用)

前のチュートリアルでは、ワーク キューを作成しました。このパートでは、各タスクが 1 つのワーカーに配信されることを前提としています。異なります -- 複数のコンシューマーにメッセージを配信します。このパターンは「パブリッシュ/サブスクライブ」として知られています。

パターンを説明するために、単純なログ システムを構築します。これは 2 つのプログラムで構成されます。1 つ目はログ メッセージを送信し、2 つ目はログ メッセージを受信して​​出力します。

私たちのロギング システムでは、レシーバー プログラムの実行中のすべてのコピーがメッセージを取得します。これにより、1 つのレシーバーを実行してログをディスクに送信し、同時に別のレシーバーを実行できるようになります。

基本的に、公開されたログ メッセージはすべての受信者にブロードキャストされます。

チュートリアルの前の部分では、キューとの間でメッセージを送受信しました。ここで、Rabbit の完全なメッセージング モデルを紹介します。

  • プロデューサーは、メッセージを送信するユーザー アプリケーションです。
  • キューは、メッセージを格納するバッファです。
  • コンシューマーは、メッセージを受信するユーザー アプリケーションです。
  • RabbitMQ のメッセージング モデルの中心的な考え方は、プロデューサーがメッセージをキューに直接送信しないということです。実際、プロデューサーは、メッセージが配信されるかどうかさえ知りません。
    代わりに、プロデューサーはメッセージをエクスチェンジに送信することしかできません。エクスチェンジは、一方ではプロデューサーからメッセージを受信し、もう一方ではメッセージをキューにプッシュします。受信したメッセージを特定のキューに追加する必要がありますか? それとも、そのルールは交換タイプによって定義されますか?


    利用可能な交換タイプはいくつかあります: ダイレクト、トピック、ヘッダー、ファンアウトです。ここでは最後のタイプの交換を作成し、ログと呼びます。

    ファンアウトの切り替えは 2 倍簡単です。その名前から推測できるように、受信したすべてのメッセージをすべての既知のキューにブロードキャストします。これはまさに私たちのログシステムが望んでいることです。

    取引所のリスト (取引所リスト)

    サーブ上の取引所をリストするには、非常に便利な Rabbitmqctl を実行できます:


    $ sudo rabbitmqctl list_exchangesListing exchanges ...        directamq.direct      directamq.fanout      fanoutamq.headers     headersamq.match       headersamq.rabbitmq.log        topicamq.rabbitmq.trace      topicamq.topic       topiclogs    fanout...done.

    In this list there are some amq.* exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you'll need to use them at the moment.


    Nameless exchange(无名氏交换器)

    In previous parts of the tutorial we knew nothing about exchanges, but still were able to send messages to queues. That was possible because we were using a default exchange, which is identified by the empty string ("").


    Recall how we published a message before:


    $channel->basic_publish($msg, '', 'hello');

    Here we use the default or nameless exchange: messages are routed to the queue with the name specified by routing_key, if it exists. The routing key is the second argument to basic_publish

    Now, we can publish to our named exchange instead:


    $channel->exchange_declare('logs', 'fanout', false, false, false);$channel->basic_publish($msg, 'logs');

    Temporary queues(临时队列)

    As you may remember previously we were using queues which had a specified name (remember hello and task_queue?). Being able to name a queue was crucial for us -- we needed to point the workers to the same queue. Giving a queue a name is important when you want to share the queue between producers and consumers.


    But that's not the case for our logger. We want to hear about all log messages, not just a subset of them. We're also interested only in currently flowing messages not in the old ones. To solve that we need two things.

    Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do this we could create a queue with a random name, or, even better - let the server choose a random queue name for us.



    Secondly, once we disconnect the consumer the queue should be automatically deleted.


    In the php-amqplib client, when we supply queue name as an empty string, we create a non-durable queue with a generated name:


    list($queue_name, ,) = $channel->queue_declare("");

    When the method returns, the $queue_name variable contains a random queue name generated by RabbitMQ. For example it may look like amq.gen-JzTY20BRgKO-HjmUJj0wLg.


    We've already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. That relationship between exchange and a queue is called a binding.



    $channel->queue_bind($queue_name, 'logs');

    From now on the logs exchange will append messages to our queue.


    Listing bindings(捆绑列表)

    You can list existing bindings using, you guessed it, rabbitmqctl list_bindings.

    Putting it all together(合体!!!又来!!哈哈哈)

    <?phprequire_once __DIR__ . '/vendor/autoload.php';use PhpAmqpLib\Connection\AMQPConnection;use PhpAmqpLib\Message\AMQPMessage;$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');$channel = $connection->channel();$channel->exchange_declare('logs', 'fanout', false, false, false);$data = implode(' ', array_slice($argv, 1));if(empty($data)) $data = "info: Hello World!";$msg = new AMQPMessage($data);$channel->basic_publish($msg, 'logs');echo " [x] Sent ", $data, "\n";$channel->close();$connection->close();?>

    (emit_log.php source)

    As you see, after establishing the connection we declared the exchange. This step is necessary as publishing to a non-existing exchange is forbidden.


    The code for receive_logs.php:(receive_logs.php代码)

    channel();$channel->exchange_declare('logs', 'fanout', false, false, false);list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);$channel->queue_bind($queue_name, 'logs');echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";$callback = function($msg){  echo ' [x] ', $msg->body, "\n";};$channel->basic_consume($queue_name, '', false, true, false, false, $callback);while(count($channel->callbacks)) {    $channel->wait();}$channel->close();$connection->close();?>

    (receive_logs.php source)

    If you want to save logs to a file, just open a console and type:


    $ php receive_logs.php > logs_from_rabbit.log

    If you wish to see the logs on your screen, spawn a new terminal and run:


    $ php receive_logs.php

    And of course, to emit logs type:


    $ php emit_log.php

    Using rabbitmqctl list_bindings you can verify that the code actually creates bindings and queues as we want. With two receive_logs.rb programs running you should see something like:

    $ sudo rabbitmqctl list_bindingsListing bindings ...logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []...done.

    The interpretation of the result is straightforward: data from exchange logs goes to two queues with server-assigned names. And that's exactly what we intended.


    要想知道怎么收听部分消息嘛,请听下回分解~~~~~~~~~~~ :)

