ホームページ >バックエンド開発 >PHPチュートリアル >译-PHP RabbitMQ チュートリアル-3
最後のステップでは、ワーク キューを確立しました。暗黙の前提として、タスクは 1 人のワーカーのみにディスパッチされます。今回はまったく異なることを行います。このパターンは「パブリッシュ/サブスクライブ」と呼ばれます。
パターンを説明するために、単純なログ システムを構築します。これは 2 つのプログラムで構成されます。1 つ目はログ メッセージを送信し、2 つ目はログ メッセージを受信して出力します。
パターンを説明するために、次のようにします。簡単なログシステムを構築してみます。 簡単なログシステムを構築してみましょう。これは 2 つのプログラムで構成されています。1 つ目はログ メッセージを発行し、2 つ目はこれらのメッセージを受信して出力します。
私たちのロギング システムでは、レシーバー プログラムの実行中のすべてのコピーがメッセージを取得します。これにより、1 つのレシーバーを実行してログをディスクに送信し、同時に別のレシーバーを実行できるようになります。
私たちのログ システムでは、受信側プログラムの各ランタイム コピーは、送信者によって送信されたメッセージを受信できます。そうすることで、1 つの受信プログラムを実行してログをハードディスクに書き込み、同時に別のプログラムを実行して画面にメッセージを表示できます。
基本的に、公開されたログ メッセージはすべての受信者にブロードキャストされます。
基本的に、公開されたログ メッセージはすべての受信者 (リスナー) にブロードキャストされます。
交換
チュートリアルの前の部分では、キューとの間でメッセージを送受信しました。キューを作成してメッセージを受信します。ここで、RabbitMQ の完全なメッセージング モデルを紹介します。
前のチュートリアルで説明した内容を簡単に確認してみましょう:
前のチュートリアルで説明した内容を簡単に確認してみましょう:
このメッセージング パターンの中心的な考え方は、プロデューサーがメッセージをキューに直接送信しないということです。実際、多くの場合、プロデューサーは、メッセージがキューに
配信されるかどうかさえ知りません。
代わりに、プロデューサーはメッセージをエクスチェンジに送信することしかできません。エクスチェンジは、一方ではプロデューサーからメッセージを受信し、もう一方ではメッセージをキューにプッシュします。受信したメッセージを特定のキューに追加する必要がありますか? それとも、そのルールは交換タイプによって定義されますか?
代わりに、プロデューサーはメッセージを送信するだけです。交換に送りました。スイッチというのはとてもシンプルなものです。一方ではプロデューサーからメッセージを受信し、他方では
メッセージをキューにプッシュします。交換側は、受信したメッセージをどう処理するかを正確に把握している必要があります。このメッセージは特定のキューに追加する必要がありますか?それとも複数のキューに追加しますか?あるいは、捨てるべきかもしれません。 これを行うには、交換タイプを通じてルールを定義できます。
利用可能な交換タイプはいくつかあります: ダイレクト、トピック、ヘッダー、ファンアウトです。ここでは最後のタイプの交換を作成し、ログと呼びます。
中央で利用可能。交換タイプ: ダイレクト、トピック、ヘッダー、およびファンアウト 最後の 1 つであるファンアウトを見てみましょう。「ログ」と呼ばれるこのタイプの交換を作成します。
ファンアウト交換は非常に簡単です。名前は、受信したすべてのメッセージを、それが知っているすべてのキューにブロードキャストするだけです。これが、まさに私たちのロガーに必要なことです。
ファンアウトの切り替えは 2 倍簡単です。その名前から推測できるように、受信したすべてのメッセージをすべての既知のキューにブロードキャストします。これはまさに私たちのログシステムが望んでいることです。
取引所のリスト (取引所リスト)
サーブ上の取引所をリストするには、非常に便利な Rabbitmqctl を実行できます:
你还是可以运行rabbitmqctl来列出服务中的交换器(还可以看到类型哦)。
$ sudo rabbitmqctl list_exchangesListing exchanges ... directamq.direct directamq.fanout fanoutamq.headers headersamq.match headersamq.rabbitmq.log topicamq.rabbitmq.trace topicamq.topic topiclogs fanout...done.
In this list there are some amq.* exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you'll need to use them at the moment.
列表中有一些amq.*开头的交换器和默认(没名字内个)交换器。他们是默认创建的。但现在我们不太可能用的到。
In previous parts of the tutorial we knew nothing about exchanges, but still were able to send messages to queues. That was possible because we were using a default exchange, which is identified by the empty string ("").
之前那几扒,我们对交换器一无所知,但仍然能给队列发送消息(为喵?)。那很有可能我们是在用默认的交换器,就是用空字符串定义的那个啦。
Recall how we published a message before:
回忆一下我们之前咋发布消息的:
$channel->basic_publish($msg, '', 'hello');
Here we use the default or nameless exchange: messages are routed to the queue with the name specified by routing_key, if it exists. The routing key is the second argument to basic_publish
我们在这用默认的或无名氏交换器:如果有routing_key,消息就会被路由到routing_key指定的队列,routing_key就是 basic_publish的第二个参数。
Now, we can publish to our named exchange instead:
现在嘞,我们发布个带名字的交换器。
$channel->exchange_declare('logs', 'fanout', false, false, false);$channel->basic_publish($msg, 'logs');
As you may remember previously we were using queues which had a specified name (remember hello and task_queue?). Being able to name a queue was crucial for us -- we needed to point the workers to the same queue. Giving a queue a name is important when you want to share the queue between producers and consumers.
你可能还记得,之前我们用过叫hello和task_queue的队列。会命名队列对我们很重要??我们得把worker指向相同的队列(同名字的)。当你想在生产者和消费者之间共享一个队列的时候,命名一个队列的名字就显得尤为重要。
But that's not the case for our logger. We want to hear about all log messages, not just a subset of them. We're also interested only in currently flowing messages not in the old ones. To solve that we need two things.
但那不是我们日志系统要用的情景。 我们是要收听到所有的日志消息,不是一部分。同样,我们只对当前最新的消息感兴趣,而不是那些陈旧的。两件事儿来解决这个问题。
Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do this we could create a queue with a random name, or, even better - let the server choose a random queue name for us.
首先,无论啥时候链接到Rabbit我们都需要一个全新的、空的队列。要这么做,我们可以创建随机名字的队列,或者,
更棒的是??让rabbitmq服务来为我们选择一个随机队列。
Secondly, once we disconnect the consumer the queue should be automatically deleted.
期次,一旦断开消费者的连接,队列应该被自动删除。
In the php-amqplib client, when we supply queue name as an empty string, we create a non-durable queue with a generated name:
在php-amqplib客户端中,每当创建一个名字为空字符的队列,就相当创建一个非持久的队列,随之系统分配一个名字。
list($queue_name, ,) = $channel->queue_declare("");
When the method returns, the $queue_name variable contains a random queue name generated by RabbitMQ. For example it may look like amq.gen-JzTY20BRgKO-HjmUJj0wLg.
$queue_name接收一个由RabbitMQ随机生成的队列名。例如:可能像amq.gen-JzTY20BRgKO-HjmUJj0wLg这个样子。
When the connection that declared it closes, the queue will be deleted because it is declared as exclusive.
当连接关闭时,由于队列声明为独有的,所以会被删除掉。
We've already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. That relationship between exchange and a queue is called a binding.
我们已经创建了一个fanout类型的交换器和一个队列。现在我们得告诉交换器向这个队列发送消息。这种交换器和队列之间的
关系称之为“捆绑”。
$channel->queue_bind($queue_name, 'logs');
From now on the logs exchange will append messages to our queue.
从现在起logs交换器就会把信息追加到我们的队列中了。
You can list existing bindings using, you guessed it, rabbitmqctl list_bindings.
已经猜到了吧,用rabbitmqctl list_bindings来列出现存的捆绑使用。
The producer program, which emits log messages, doesn't look much different from the previous tutorial. The most important change is that we now want to publish messages to our logs exchange instead of the nameless one. We need to supply a routing_key when sending, but its value is ignored for fanout exchanges. Here goes the code for emit_log.php script:
发送日志消息的生产者程序看起来和之前的没太大不同。最大的不同是现在我们要发送消息到logs交换器,而不是无名氏那个。
当发送时,我们需要提供一个routing_key,但对于fanout交换器来讲,它的值是忽略的。来看看emit_log.php脚本:
<?phprequire_once __DIR__ . '/vendor/autoload.php';use PhpAmqpLib\Connection\AMQPConnection;use PhpAmqpLib\Message\AMQPMessage;$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');$channel = $connection->channel();$channel->exchange_declare('logs', 'fanout', false, false, false);$data = implode(' ', array_slice($argv, 1));if(empty($data)) $data = "info: Hello World!";$msg = new AMQPMessage($data);$channel->basic_publish($msg, 'logs');echo " [x] Sent ", $data, "\n";$channel->close();$connection->close();?>
(emit_log.php source)
As you see, after establishing the connection we declared the exchange. This step is necessary as publishing to a non-existing exchange is forbidden.
如亲所见,建立连接后我们声明了一个交换器。因为发布消息到不存在的交换器是禁止的,所以这步是必须的。
The messages will be lost if no queue is bound to the exchange yet, but that's okay for us; if no consumer is listening yet we can safely discard the message.
然而如果没有队列捆绑到交换器,消息将会丢失,但对我们来讲没什么关系。如果还没有消费者收听,我们可以安全地丢弃这条消息。
The code for receive_logs.php:(receive_logs.php代码)
channel();$channel->exchange_declare('logs', 'fanout', false, false, false);list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);$channel->queue_bind($queue_name, 'logs');echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";$callback = function($msg){ echo ' [x] ', $msg->body, "\n";};$channel->basic_consume($queue_name, '', false, true, false, false, $callback);while(count($channel->callbacks)) { $channel->wait();}$channel->close();$connection->close();?>
(receive_logs.php source)
If you want to save logs to a file, just open a console and type:
如果你想把日志保存到文件,就打开控制台然后输入:
$ php receive_logs.php > logs_from_rabbit.log
If you wish to see the logs on your screen, spawn a new terminal and run:
如果你想在屏幕上查看日志,打开一个新的终端然后运行:
$ php receive_logs.php
And of course, to emit logs type:
当然,发布消息请输入:
$ php emit_log.php
Using rabbitmqctl list_bindings you can verify that the code actually creates bindings and queues as we want. With two receive_logs.rb programs running you should see something like:
使用rabbitmqctl list_bindings你可以验证代码有木有真的按照我们的期许创建捆绑和队列。运行俩receive_logs.php(这里官方文档copy有误,.rb的ruby文件扩展名还没改回来),有应该会看到像这样:
$ sudo rabbitmqctl list_bindingsListing bindings ...logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []...done.
The interpretation of the result is straightforward: data from exchange logs goes to two queues with server-assigned names. And that's exactly what we intended.
结果展示简单明了:数据从logs交换器进入到两个由rabbitmq服务命名的队列中。这个和我们打算的一模一样。
To find out how to listen for a subset of messages, let's move on to tutorial 4
要想知道怎么收听部分消息嘛,请听下回分解~~~~~~~~~~~ :)