ホームページ  >  記事  >  バックエンド開発  >  メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

WBOY
WBOY転載
2022-05-12 18:33:527658ブラウズ

この記事では、PHP に関する関連知識を提供します。主に、メッセージ キュー RabbitMQ の概要といくつかの実践的な詳細を紹介します。メッセージ キューは、アプリケーション間の通信方法です。一緒に説明しましょう。見てください、それが皆さんのお役に立てば幸いです。

メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

推奨学習: 「PHP ビデオ チュートリアル

メッセージ キューとメッセージ キュー アプリケーション シナリオの概要

RabbitMQ

説明
MQ (Message Queue) はアプリケーション間の通信方法であるメッセージキューです。送信後すぐに返送されるため、メッセージ システムはメッセージの信頼性の高い配信を保証します。 「メッセージキュー」は、メッセージを送信中に保存するコンテナです。典型的なのは、プロデューサー、コンシューマー モデルです。プロデューサはメッセージ キューにメッセージを生成し続け、コンシューマはキューからメッセージを取得し続けます。メッセージの生成と消費は非同期であり、メッセージの送信と受信のみを考慮するため、ビジネス ロジックの侵入はなく、プロデューサーとコンシューマーの分離が実現します。

メッセージ ミドルウェアを使用する理由
メッセージ キューは分散システムの重要なコンポーネントです。アプリケーションの分離、非同期メッセージング、トラフィックのピークカットなどの問題を解決し、高い同時実行性、高可用性、スケーラビリティ、結果整合性アーキテクチャを実現します

非同期処理
ユーザーは情報登録後にメールや登録用テキストメッセージを送信する必要があります
1. ユーザー登録情報がデータベースに書き込まれた後は、登録が成功した場合でも、成功した登録に関する情報が返されます
2. 電子メールと登録テキスト メッセージの送信メッセージ キューを通じて非同期に実行されるため、ユーザーはこれら 2 つの操作を待つ必要はありません
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

#アプリケーションの分離
ユーザーが注文した後、注文システムは在庫システムに通知する必要があります。従来のアプローチでは、注文システムが在庫システムのインターフェイスを呼び出して在庫を増減させます。
1. ユーザーが生産の注文を出し、成功プロンプトを返します。
2. キュー消費在庫システムが増減します。在庫
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

トラフィック ピーク シェービング
トラフィック ピーク シェービングはメッセージ キューの一般的なシナリオでもあり、フラッシュ セールやグループ グラブ アクティビティで広く使用されます
1. ユーザーのグループリクエストがキューに入り、キューの数が制御され、その数が一定数を超えるとフラッシュセールが終了します。
2. その後、キューは 1 つ消費されます。先入れ先出し法に従って 1 つずつ
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

#Rabbitmq の機能

信頼性 RabbitMQ はいくつかのメカニズムを使用します永続化、送信確認、リリース確認などの信頼性を確保します。
柔軟なルーティング メッセージは、キューに入る前に Exchange 経由でルーティングされます。一般的なルーティング機能については、RabbitMQ はすでにいくつかの組み込み Exchange 実装を提供しています。より複雑なルーティング機能の場合、複数の Exchange を結合することができ、プラグイン メカニズムを通じて独自の Exchange を実装することもできます。
メッセージ クラスタリング 複数の RabbitMQ サーバーがクラスターを形成して論理ブローカーを形成できます。
高可用性キュー キューをクラスター内のマシンにミラーリングできるため、一部のノードに問題が発生した場合でもキューを使用できます。
マルチプロトコル RabbitMQ は、STOMP、MQTT などの複数のメッセージ キュー プロトコルをサポートします。
多言語クライアント (多くのクライアント) RabbitMQ は、PHP Java、.NET、Ruby など、一般的に使用されるほぼすべての言語をサポートします。
管理 UI (管理 UI) RabbitMQ は、ユーザーがメッセージ ブローカーのさまざまな側面を監視および管理できるようにする使いやすいユーザー インターフェイスを提供します。
トレースメカニズム (トレース) メッセージが異常な場合、RabbitMQ はユーザーが何が起こったのかを知ることができるメッセージ追跡メカニズムを提供します。
プラグイン システム RabbitMQ は、さまざまな面で拡張するための多くのプラグインを提供します。また、独自のプラグインを作成することもできます。

RabbitMQ の仕組み
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

ブローカー: メッセージを受信および配布するアプリケーション、RabbitMQ サーバーはメッセージ ブローカーです。

仮想ホスト: mysql データベースと同様に、複数の異なるユーザーが同じ RabbitMQ サーバーが提供するサービスを使用する場合、複数の仮想ホストを分割し、各ユーザーがそのサーバーが提供するサービスを使用できます。同じ RabbitMQ サーバー。vhost は交換/キューなどを作成します。

接続: パブリッシャー/コンシューマーとブローカーの間の TCP 接続。

Channel: RabbitMQ にアクセスするたびに Connection を確立すると、TCP Connection 確立のオーバーヘッドが大きくなり、メッセージ量が多い場合には効率が悪くなります。チャネルは、接続内で確立される論理接続であり、軽量の接続として、オペレーティング システムによる TCP 接続の確立コストを大幅に削減します。

Exchange: メッセージはブローカーの最初のストップに到達し、分散ルールに従って、クエリ テーブルのルーティング キーと一致し、メッセージをキューに分散します。一般的に使用されるタイプは、ダイレクト (ポイントツーポイント)、トピック (パブリッシュ/サブスクライブ)、およびファンアウト (マルチキャスト) です。

Queue: メッセージは最終的にここに送信され、コンシューマが取得します。メッセージは複数のキューに同時にコピーできます。

rabbitmq インストール起動

RabbitMQ 公式アドレス: http://www.rabbitmq.com
Rabbitmq をインストールするには、まず erlang をインストールする必要があります

最初のステップ: Erlang のインストール
Rabbitmq をインストールするには、まず erlang をインストールする必要があります。Centos7 は erlang 24 バージョンのインストールをサポートしていません
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明
ダウンロード:
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明##

# 系统  centos 7# 下载erlang包,手动下载后上传至服务器,我在使用wget下载后无法安装,这里没明白


# 安装
yum install erlang-23.3.4.4-1.el7.x86_64.rpm

# 验证安装是否成功
erl

メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

第 2 ステップ: Rabbitmq をインストールする
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明
#

# 系统  centos 7# 下载rabbitmq包,手动下载后上传至服务器,我在使用wget下载后无法安装,这里没明白


# 安装
yum install rabbitmq-server-3.8.19-1.el7.noarch.rpm 

# 启动
systemctl start rabbitmq-server

# 关闭
systemctl stop rabbitmq-server

# 查看默认端口服务是否启动
netstat -tunlp
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明php メッセージ キュー Rabbitmq のさまざまなモードの使用法

rabbitmq 管理インターフェイスとコマンド ラインの使用法

4369: epmd (Erlang Port Mapper Daemon)、erlang サービス ポート

5672: クライアント通信ポート

#15672: HTTP API クライアント、管理 UI (管理プラグインが有効な場合のみ) は必ずしも起動しません

25672: ノード間通信に使用されます (Erlang ディストリビューション サーバー ポート)

rabbitmq 管理コマンド

開始 15672: HTTP API クライアント、管理 UI (管理プラグインが有効な場合のみ)

# 启动rabbitmq_management插件
rabbitmq-plugins  enable  rabbitmq_management

# 查看所有插件
rabbitmq-plugins  list
UI インターフェイスへのアクセスをテストします。 (現時点ではローカルホストアドレスにログインできません)
http://192.168.10.105:15672/


rabbitmq 構成管理インターフェイス

# 新增一个用户  
rabbitmqctl add_user 【用户名Username】 【密码Password】
rabbitmqctl add_user root root
# 删除一个用户  
rabbitmqctl delete_user Username

# 修改用户的密码 
rabbitmqctl change_password Username Newpassword 

# 查看当前用户列表    
rabbitmqctl list_users

# 设置用户角色的命令为: 
rabbitmqctl set_user_tags User Tag  
rabbitmqctl set_user_tags root administrator
# User为用户名, Tag为角色名(对应于上面的administrator,monitoring,policymaker,management,或其他自定义名称)。
vhost と PHP 拡張機能のインストールを作成するコマンドライン

mysql データベースと同様に、複数の異なるユーザーが同じ RabbitMQ サーバーが提供するサービスを使用する場合、複数の vhost を分割し、各ユーザーが Exchange/Queue を作成することができます。独自の仮想ホストなど。 1) さまざまなユーザーの仮想ホストを表示します


メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明
仮想ホストを作成して権限を割り当てますメッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

# 新增vhost
rabbitmqctl add_vhost   vhostname
rabbitmqctl add_vhost order

# 查看vhost列表
rabbitmqctl  list_vhosts

#为vhost添加用户
rabbitmqctl set_permissions -p vhostname username ".*" ".*" ".*"rabbitmqctl set_permissions -p order root ".*" ".*" ".*"
 ".*" ".*" ".*"后边三个.*分别代表:配置权限、写权限、读权限

2) php 用の Rabbitmq 拡張機能のインストールメッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明 https://github.com/php-amqplib/php-amqplib 拡張機能のインストール

Alibaba Cloud イメージの変更

composer config -g repo.packagist composer https://mirrors.aliyun.com/composer/
ダウンロードの開始 –場合によっては、2.8 の下位バージョンがここでダウンロードされます。バージョン

を指定する必要があります。ダウンロードが失敗した場合は、コンポーザー、php.ini をアップグレードし、ソケットを開き、国内ミラーを展開して切り替えます


#

# 升级composer
composer self-update

#php.ini 打开 sockets 扩展

#下载指定版本
composer require php-amqplib/php-amqplib=^3.0

シンプル モードのプロデューサー メッセージはメッセージ キューにプッシュされますメッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

ドキュメント:

https://www.rabbitmq.com/tutorials/tutorial-one-php。 html 単純なプロデューサーとメッセージャー

プロデューサー コード

http://localhost/rabbitmq/simple/pro.php

<?php require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//生产者
//Connection: publisher/consumer和broker之间的TCP连接
//Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。

//建立connction
$connection = new AMQPStreamConnection(&#39;192.168.10.105&#39;, 5672, &#39;root&#39;, &#39;root&#39;, &#39;order&#39;);
//Channel
$channel = $connection->channel();
//声明队列名为:goods
$queue_name = 'goods';
$channel->queue_declare($queue_name, false, true, false, false);

//生产数据
$data = 'this is messge';
//创建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
//发布消息
$channel->basic_publish($msg, $exchange = '', $queue_name);
//关闭连接
$channel->close();
$connection->close();

运行生产者脚本:
http://localhost/rabbitmq/simple/pro.php
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明
点击goods队列可以进入到消息详情
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

simple模式消费者接受消息

http://localhost/rabbitmq/simple/con.php

<?php require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;

//建立connction
$connection = new AMQPStreamConnection(&#39;192.168.10.105&#39;, 5672, &#39;root&#39;, &#39;root&#39;, &#39;order&#39;);
//Channel
$channel = $connection->channel();

//声明队列名为:goods
$queue_name = 'goods';
$channel->queue_declare($queue_name, false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo 'received = ', $msg->body . "\n";
};
//开启消费
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

//不断的循环进行消费
while ($channel->is_open()) {
    $channel->wait();
}

//关闭连接
$channel->close();
$connection->close();

worker模式生产消费消息

rabbitmq Work Queues
一个生产者对应多个消费者,消费特别慢时增加几个消费分发
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明
生产者,和上文生产者不变

<?php require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//生产者
//Connection: publisher/consumer和broker之间的TCP连接
//Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。

//建立connction
$connection = new AMQPStreamConnection(&#39;192.168.10.105&#39;, 5672, &#39;root&#39;, &#39;root&#39;, &#39;order&#39;);
//Channel
$channel = $connection->channel();
//声明队列名为:task_queue
$queue_name = 'task_queue';
$channel->queue_declare($queue_name, false, true, false, false);

for ($i = 0; $i  AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
//发布消息
    $channel->basic_publish($msg, $exchange = '', $queue_name);
}

//关闭连接
$channel->close();
$connection->close();

消费者worker1
D:\phpstudy_pro\WWW\rabbitmq\worker\worker1.php

<?php require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;

//建立connction
$connection = new AMQPStreamConnection(&#39;192.168.10.105&#39;, 5672, &#39;root&#39;, &#39;root&#39;, &#39;order&#39;);
//Channel
$channel = $connection->channel();

//声明队列名为:task_queue
$queue_name = 'task_queue';
$channel->queue_declare($queue_name, false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo 'received = ', $msg->body . "\n";
};
//开启消费
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

//不断的循环进行消费
while ($channel->is_open()) {
    $channel->wait();
}

//关闭连接
$channel->close();
$connection->close();

消费者worker2,代码和worker1一样,同时运行开启后会一起消费
D:\phpstudy_pro\WWW\rabbitmq\worker\worker2.php

消费者消费消息ack确认

用以确认不会丢失消息

消费消息
basic_consume($queue = ‘’, $consumer_tag = ‘’, $no_local = false, $no_ack = false, $exclusive = false, $nowait = false, $callback = null, $ticket = null, $arguments = array())
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明
no_ack=false,设置为手动应答
开启后需要进行消息的消费确认后才会进行移除,否者该消息会一直存在消息队列中
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

消费端代码
D:\phpstudy_pro\WWW\rabbitmq\worker\worker1.php

<?php require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;


//建立connction
$connection = new AMQPStreamConnection(&#39;192.168.10.105&#39;, 5672, &#39;root&#39;, &#39;root&#39;, &#39;order&#39;);
//Channel
$channel = $connection->channel();

//声明队列名为:task_queue
$queue_name = 'task_queue';
$channel->queue_declare($queue_name, false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo 'received = ', $msg->body . "\n";
    //确认消息已被消费,从生产队列中移除
    $msg->ack();
};

//设置消费成功后才能继续进行下一个消费
$channel->basic_qos(null, 1, null);

//开启消费no_ack=false,设置为手动应答
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);


//不断的循环进行消费
while ($channel->is_open()) {
    $channel->wait();
}

//关闭连接
$channel->close();
$connection->close();

fanout模式生产者推送到交换器

发布/订阅模式
是要是公用一个交换机的消费端都能收到同样的消息,类似广播的功能

文档:rabbitmq Publish/Subscribe
https://www.rabbitmq.com/tutorials/tutorial-three-php.html
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

rabbitmq Exchange类型

交换器、路由键、绑定

    Exchange:交换器。发送消息的AMQP实体。交换器拿到一个消息之后将它路由给一个或几个队列。它使用哪种路由算法是由交换机类型和被称作绑定(Binding)的规则所决定的。RabbitMQ有四种类型。

    RoutingKey:路由键。生产者将消息发送给交换器。一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终失效。

    Binding:绑定。绑定(Binding)是交换机(Exchange)将消息(Message)路由给队列(Queue)所需遵循的规则。

# 四种模式
Direct  定向 消息与一个特定的路由键完全匹配

Topic  通配符 路由键和某模式进行匹配

Fanout  广播 发送到该类型交换机的消息都会被广播到与该交换机绑定的所有队列
Headers 不处理路由键,而是根据发送的消息内容中的headers属性进行匹配

exchange_declare($exchange, $type, $passive = false, $durable = false, $auto_delete = true, $internal = false, $nowait = false, $arguments = array(), $ticket = null) 。试探性申请一个交换器,若该交换器不存在,则创建;若存在,则跳过。

メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明
生产者代码
D:\phpstudy_pro\WWW\rabbitmq\ps\pro.php

<?php require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//建立connction
$connection = new AMQPStreamConnection(&#39;192.168.10.105&#39;, 5672, &#39;root&#39;, &#39;root&#39;, &#39;order&#39;);
//Channel
$channel = $connection->channel();
//声明交换器
$exc_name = 'exch';
$channel->exchange_declare($exc_name, 'fanout', false, false, false);

//声明数据
$data = 'this is fanout message';
//创建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);

//发布消息
$channel->basic_publish($msg, $exc_name);
//关闭连接
$channel->close();
$connection->close();

メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

fanout模式消费者消费消息
是要是公用一个交换机的消费端都能收到同样的消息,类似广播的功能

当消费端运行时才会显示该队列
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明
消费端:
D:\phpstudy_pro\WWW\rabbitmq\ps\worker1.php

<?php require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;

//建立connction
$connection = new AMQPStreamConnection(&#39;192.168.10.105&#39;, 5672, &#39;root&#39;, &#39;root&#39;, &#39;order&#39;);
//Channel
$channel = $connection->channel();

//声明交换器
$exc_name = 'exch';
$channel->exchange_declare($exc_name, 'fanout', false, false, false);

//获取系统生成的消息队列名称
list($queue_name, ,) = $channel->queue_declare('', false, false, true, false);

//将队列名与交换器名进行绑定
$channel->queue_bind($queue_name,$exc_name);

$callback = function ($msg) {
    echo 'received = ', $msg->body . "\n";
    //确认消息已被消费,从生产队列中移除
    $msg->ack();
};

//设置消费成功后才能继续进行下一个消费
$channel->basic_qos(null, 1, null);

//开启消费no_ack=false,设置为手动应答
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);

//不断的循环进行消费
while ($channel->is_open()) {
    $channel->wait();
}

//关闭连接
$channel->close();
$connection->close();

direct模式消息队列使用

文档:
https://www.rabbitmq.com/tutorials/tutorial-four-php.html

用来指定不同的交换机和指定routing_key,在消费端进行消费
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明
生产者代码:
D:\phpstudy_pro\WWW\rabbitmq\routing\pro.php

<?php require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//建立connction
$connection = new AMQPStreamConnection(&#39;192.168.10.105&#39;, 5672, &#39;root&#39;, &#39;root&#39;, &#39;order&#39;);
//Channel
$channel = $connection->channel();
//声明交换器
$exc_name = 'direct_log';
//指定routing_key
$routing_key = 'info';

//指定交换机类型为direct
$channel->exchange_declare($exc_name, 'direct', false, false, false);

//声明数据
$data = 'this is ' . $routing_key . ' message';
//创建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);

//发布消息
//指定使用的routing_key
$channel->basic_publish($msg, $exc_name, $routing_key);
//关闭连接
$channel->close();
$connection->close();

消费者代码
D:\phpstudy_pro\WWW\rabbitmq\routing\info.php

<?php require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;

//建立connction
$connection = new AMQPStreamConnection(&#39;192.168.10.105&#39;, 5672, &#39;root&#39;, &#39;root&#39;, &#39;order&#39;);
//Channel
$channel = $connection->channel();

//声明交换器
$exc_name = 'direct_log';
//指定routing_key
$routing_key = 'info';

$channel->exchange_declare($exc_name, 'direct', false, false, false);

//获取系统生成的消息队列名称
list($queue_name, ,) = $channel->queue_declare('', false, false, true, false);

//将队列名与交换器名进行绑定,并指定routing_key
$channel->queue_bind($queue_name,$exc_name,$routing_key);

$callback = function ($msg) {
    echo 'received = ', $msg->body . "\n";
    //确认消息已被消费,从生产队列中移除
    $msg->ack();
};

//设置消费成功后才能继续进行下一个消费
$channel->basic_qos(null, 1, null);

//开启消费no_ack=false,设置为手动应答
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);

//不断的循环进行消费
while ($channel->is_open()) {
    $channel->wait();
}

//关闭连接
$channel->close();
$connection->close();

topic模式消息队列使用

通配符的匹配模式

如消费端中routing_key = ‘user.*’;
メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明

生产者:
指定routing_key= ‘user.top’

<?php require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//建立connction
$connection = new AMQPStreamConnection(&#39;192.168.10.105&#39;, 5672, &#39;root&#39;, &#39;root&#39;, &#39;order&#39;);
//Channel
$channel = $connection->channel();
//声明交换器
$exc_name = 'topic_log';
//指定routing_key
$routing_key = 'user.top';

//指定交换机类型为direct
$channel->exchange_declare($exc_name, 'topic', false, false, false);

//声明数据
$data = 'this is ' . $routing_key . ' message';
//创建消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);

//发布消息
//指定使用的routing_key
$channel->basic_publish($msg, $exc_name, $routing_key);
//关闭连接
$channel->close();
$connection->close();

消费者
消费端中routing_key = ‘user.*’;

<?php require_once "../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;

//建立connction
$connection = new AMQPStreamConnection(&#39;192.168.10.105&#39;, 5672, &#39;root&#39;, &#39;root&#39;, &#39;order&#39;);
//Channel
$channel = $connection->channel();

//声明交换器
$exc_name = 'direct_log';
//指定routing_key
$routing_key = 'user.*';

$channel->exchange_declare($exc_name, 'topic', false, false, false);

//获取系统生成的消息队列名称
list($queue_name, ,) = $channel->queue_declare('', false, false, true, false);

//将队列名与交换器名进行绑定,并指定routing_key
$channel->queue_bind($queue_name,$exc_name,$routing_key);

$callback = function ($msg) {
    echo 'received = ', $msg->body . "\n";
    //确认消息已被消费,从生产队列中移除
    $msg->ack();
};

//设置消费成功后才能继续进行下一个消费
$channel->basic_qos(null, 1, null);

//开启消费no_ack=false,设置为手动应答
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);

//不断的循环进行消费
while ($channel->is_open()) {
    $channel->wait();
}

//关闭连接
$channel->close();
$connection->close();

推荐学习:《PHP视频教程

以上がメッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はcsdn.netで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。