ホームページ >バックエンド開発 >PHPチュートリアル >メッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明
この記事では、PHP に関する関連知識を提供します。主に、メッセージ キュー RabbitMQ の概要といくつかの実践的な詳細を紹介します。メッセージ キューは、アプリケーション間の通信方法です。一緒に説明しましょう。見てください、それが皆さんのお役に立てば幸いです。
推奨学習: 「PHP ビデオ チュートリアル 」
説明
MQ (Message Queue) はアプリケーション間の通信方法であるメッセージキューです。送信後すぐに返送されるため、メッセージ システムはメッセージの信頼性の高い配信を保証します。 「メッセージキュー」は、メッセージを送信中に保存するコンテナです。典型的なのは、プロデューサー、コンシューマー モデルです。プロデューサはメッセージ キューにメッセージを生成し続け、コンシューマはキューからメッセージを取得し続けます。メッセージの生成と消費は非同期であり、メッセージの送信と受信のみを考慮するため、ビジネス ロジックの侵入はなく、プロデューサーとコンシューマーの分離が実現します。
メッセージ ミドルウェアを使用する理由
メッセージ キューは分散システムの重要なコンポーネントです。アプリケーションの分離、非同期メッセージング、トラフィックのピークカットなどの問題を解決し、高い同時実行性、高可用性、スケーラビリティ、結果整合性アーキテクチャを実現します
非同期処理
ユーザーは情報登録後にメールや登録用テキストメッセージを送信する必要があります
1. ユーザー登録情報がデータベースに書き込まれた後は、登録が成功した場合でも、成功した登録に関する情報が返されます
2. 電子メールと登録テキスト メッセージの送信メッセージ キューを通じて非同期に実行されるため、ユーザーはこれら 2 つの操作を待つ必要はありません
#アプリケーションの分離
ユーザーが注文した後、注文システムは在庫システムに通知する必要があります。従来のアプローチでは、注文システムが在庫システムのインターフェイスを呼び出して在庫を増減させます。
1. ユーザーが生産の注文を出し、成功プロンプトを返します。
2. キュー消費在庫システムが増減します。在庫
トラフィック ピーク シェービング
トラフィック ピーク シェービングはメッセージ キューの一般的なシナリオでもあり、フラッシュ セールやグループ グラブ アクティビティで広く使用されます
1. ユーザーのグループリクエストがキューに入り、キューの数が制御され、その数が一定数を超えるとフラッシュセールが終了します。
2. その後、キューは 1 つ消費されます。先入れ先出し法に従って 1 つずつ
#Rabbitmq の機能
信頼性 RabbitMQ はいくつかのメカニズムを使用します永続化、送信確認、リリース確認などの信頼性を確保します。
柔軟なルーティング メッセージは、キューに入る前に Exchange 経由でルーティングされます。一般的なルーティング機能については、RabbitMQ はすでにいくつかの組み込み Exchange 実装を提供しています。より複雑なルーティング機能の場合、複数の Exchange を結合することができ、プラグイン メカニズムを通じて独自の Exchange を実装することもできます。
メッセージ クラスタリング 複数の RabbitMQ サーバーがクラスターを形成して論理ブローカーを形成できます。
高可用性キュー キューをクラスター内のマシンにミラーリングできるため、一部のノードに問題が発生した場合でもキューを使用できます。
マルチプロトコル RabbitMQ は、STOMP、MQTT などの複数のメッセージ キュー プロトコルをサポートします。
多言語クライアント (多くのクライアント) RabbitMQ は、PHP Java、.NET、Ruby など、一般的に使用されるほぼすべての言語をサポートします。
管理 UI (管理 UI) RabbitMQ は、ユーザーがメッセージ ブローカーのさまざまな側面を監視および管理できるようにする使いやすいユーザー インターフェイスを提供します。
トレースメカニズム (トレース) メッセージが異常な場合、RabbitMQ はユーザーが何が起こったのかを知ることができるメッセージ追跡メカニズムを提供します。
プラグイン システム RabbitMQ は、さまざまな面で拡張するための多くのプラグインを提供します。また、独自のプラグインを作成することもできます。
RabbitMQ の仕組み
ブローカー: メッセージを受信および配布するアプリケーション、RabbitMQ サーバーはメッセージ ブローカーです。
仮想ホスト: mysql データベースと同様に、複数の異なるユーザーが同じ RabbitMQ サーバーが提供するサービスを使用する場合、複数の仮想ホストを分割し、各ユーザーがそのサーバーが提供するサービスを使用できます。同じ RabbitMQ サーバー。vhost は交換/キューなどを作成します。
接続: パブリッシャー/コンシューマーとブローカーの間の TCP 接続。Channel: RabbitMQ にアクセスするたびに Connection を確立すると、TCP Connection 確立のオーバーヘッドが大きくなり、メッセージ量が多い場合には効率が悪くなります。チャネルは、接続内で確立される論理接続であり、軽量の接続として、オペレーティング システムによる TCP 接続の確立コストを大幅に削減します。
Exchange: メッセージはブローカーの最初のストップに到達し、分散ルールに従って、クエリ テーブルのルーティング キーと一致し、メッセージをキューに分散します。一般的に使用されるタイプは、ダイレクト (ポイントツーポイント)、トピック (パブリッシュ/サブスクライブ)、およびファンアウト (マルチキャスト) です。
Queue: メッセージは最終的にここに送信され、コンシューマが取得します。メッセージは複数のキューに同時にコピーできます。
RabbitMQ 公式アドレス: http://www.rabbitmq.com
Rabbitmq をインストールするには、まず erlang をインストールする必要があります
最初のステップ: Erlang のインストール
Rabbitmq をインストールするには、まず erlang をインストールする必要があります。Centos7 は erlang 24 バージョンのインストールをサポートしていません
ダウンロード:
##
# 系统 centos 7# 下载erlang包,手动下载后上传至服务器,我在使用wget下载后无法安装,这里没明白 # 安装 yum install erlang-23.3.4.4-1.el7.x86_64.rpm # 验证安装是否成功 erl
第 2 ステップ: Rabbitmq をインストールする
#
# 系统 centos 7# 下载rabbitmq包,手动下载后上传至服务器,我在使用wget下载后无法安装,这里没明白 # 安装 yum install rabbitmq-server-3.8.19-1.el7.noarch.rpm # 启动 systemctl start rabbitmq-server # 关闭 systemctl stop rabbitmq-server # 查看默认端口服务是否启动 netstat -tunlp
php メッセージ キュー Rabbitmq のさまざまなモードの使用法
5672: クライアント通信ポート
#15672: HTTP API クライアント、管理 UI (管理プラグインが有効な場合のみ) は必ずしも起動しません 25672: ノード間通信に使用されます (Erlang ディストリビューション サーバー ポート)rabbitmq 管理コマンド
開始 15672: HTTP API クライアント、管理 UI (管理プラグインが有効な場合のみ) # 启动rabbitmq_management插件
rabbitmq-plugins enable rabbitmq_management
# 查看所有插件
rabbitmq-plugins list
UI インターフェイスへのアクセスをテストします。 (現時点ではローカルホストアドレスにログインできません)
http://192.168.10.105:15672/
rabbitmq 構成管理インターフェイス
# 新增一个用户 rabbitmqctl add_user 【用户名Username】 【密码Password】 rabbitmqctl add_user root root # 删除一个用户 rabbitmqctl delete_user Username # 修改用户的密码 rabbitmqctl change_password Username Newpassword # 查看当前用户列表 rabbitmqctl list_users # 设置用户角色的命令为: rabbitmqctl set_user_tags User Tag rabbitmqctl set_user_tags root administrator # User为用户名, Tag为角色名(对应于上面的administrator,monitoring,policymaker,management,或其他自定义名称)。vhost と PHP 拡張機能のインストールを作成するコマンドライン
mysql データベースと同様に、複数の異なるユーザーが同じ RabbitMQ サーバーが提供するサービスを使用する場合、複数の vhost を分割し、各ユーザーが Exchange/Queue を作成することができます。独自の仮想ホストなど。 1) さまざまなユーザーの仮想ホストを表示します
仮想ホストを作成して権限を割り当てます
# 新增vhost rabbitmqctl add_vhost vhostname rabbitmqctl add_vhost order # 查看vhost列表 rabbitmqctl list_vhosts #为vhost添加用户 rabbitmqctl set_permissions -p vhostname username ".*" ".*" ".*"rabbitmqctl set_permissions -p order root ".*" ".*" ".*" ".*" ".*" ".*"后边三个.*分别代表:配置权限、写权限、读权限
2) php 用の Rabbitmq 拡張機能のインストール https://github.com/php-amqplib/php-amqplib 拡張機能のインストール
Alibaba Cloud イメージの変更
composer config -g repo.packagist composer https://mirrors.aliyun.com/composer/ダウンロードの開始 –場合によっては、2.8 の下位バージョンがここでダウンロードされます。バージョン
を指定する必要があります。ダウンロードが失敗した場合は、コンポーザー、php.ini をアップグレードし、ソケットを開き、国内ミラーを展開して切り替えます
#
# 升级composer composer self-update #php.ini 打开 sockets 扩展 #下载指定版本 composer require php-amqplib/php-amqplib=^3.0
シンプル モードのプロデューサー メッセージはメッセージ キューにプッシュされます
ドキュメント: https://www.rabbitmq.com/tutorials/tutorial-one-php。 html 単純なプロデューサーとメッセージャー
http://localhost/rabbitmq/simple/pro.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //生产者 //Connection: publisher/consumer和broker之间的TCP连接 //Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。 //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明队列名为:goods $queue_name = 'goods'; $channel->queue_declare($queue_name, false, true, false, false); //生产数据 $data = 'this is messge'; //创建消息 $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //发布消息 $channel->basic_publish($msg, $exchange = '', $queue_name); //关闭连接 $channel->close(); $connection->close();
运行生产者脚本:
http://localhost/rabbitmq/simple/pro.php
点击goods队列可以进入到消息详情
http://localhost/rabbitmq/simple/con.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明队列名为:goods $queue_name = 'goods'; $channel->queue_declare($queue_name, false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; }; //开启消费 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
rabbitmq Work Queues
一个生产者对应多个消费者,消费特别慢时增加几个消费分发
生产者,和上文生产者不变
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //生产者 //Connection: publisher/consumer和broker之间的TCP连接 //Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。 //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明队列名为:task_queue $queue_name = 'task_queue'; $channel->queue_declare($queue_name, false, true, false, false); for ($i = 0; $i AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //发布消息 $channel->basic_publish($msg, $exchange = '', $queue_name); } //关闭连接 $channel->close(); $connection->close();
消费者worker1
D:\phpstudy_pro\WWW\rabbitmq\worker\worker1.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明队列名为:task_queue $queue_name = 'task_queue'; $channel->queue_declare($queue_name, false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; }; //开启消费 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
消费者worker2,代码和worker1一样,同时运行开启后会一起消费
D:\phpstudy_pro\WWW\rabbitmq\worker\worker2.php
消费者消费消息ack确认
用以确认不会丢失消息
消费消息
basic_consume($queue = ‘’, $consumer_tag = ‘’, $no_local = false, $no_ack = false, $exclusive = false, $nowait = false, $callback = null, $ticket = null, $arguments = array())
no_ack=false,设置为手动应答
开启后需要进行消息的消费确认后才会进行移除,否者该消息会一直存在消息队列中
消费端代码
D:\phpstudy_pro\WWW\rabbitmq\worker\worker1.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明队列名为:task_queue $queue_name = 'task_queue'; $channel->queue_declare($queue_name, false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; //确认消息已被消费,从生产队列中移除 $msg->ack(); }; //设置消费成功后才能继续进行下一个消费 $channel->basic_qos(null, 1, null); //开启消费no_ack=false,设置为手动应答 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
发布/订阅模式
是要是公用一个交换机的消费端都能收到同样的消息,类似广播的功能
文档:rabbitmq Publish/Subscribe
https://www.rabbitmq.com/tutorials/tutorial-three-php.html
rabbitmq Exchange类型
交换器、路由键、绑定 Exchange:交换器。发送消息的AMQP实体。交换器拿到一个消息之后将它路由给一个或几个队列。它使用哪种路由算法是由交换机类型和被称作绑定(Binding)的规则所决定的。RabbitMQ有四种类型。 RoutingKey:路由键。生产者将消息发送给交换器。一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终失效。 Binding:绑定。绑定(Binding)是交换机(Exchange)将消息(Message)路由给队列(Queue)所需遵循的规则。 # 四种模式 Direct 定向 消息与一个特定的路由键完全匹配 Topic 通配符 路由键和某模式进行匹配 Fanout 广播 发送到该类型交换机的消息都会被广播到与该交换机绑定的所有队列 Headers 不处理路由键,而是根据发送的消息内容中的headers属性进行匹配
exchange_declare($exchange, $type, $passive = false, $durable = false, $auto_delete = true, $internal = false, $nowait = false, $arguments = array(), $ticket = null) 。试探性申请一个交换器,若该交换器不存在,则创建;若存在,则跳过。
生产者代码
D:\phpstudy_pro\WWW\rabbitmq\ps\pro.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'exch'; $channel->exchange_declare($exc_name, 'fanout', false, false, false); //声明数据 $data = 'this is fanout message'; //创建消息 $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //发布消息 $channel->basic_publish($msg, $exc_name); //关闭连接 $channel->close(); $connection->close();
fanout模式消费者消费消息
是要是公用一个交换机的消费端都能收到同样的消息,类似广播的功能
当消费端运行时才会显示该队列
消费端:
D:\phpstudy_pro\WWW\rabbitmq\ps\worker1.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'exch'; $channel->exchange_declare($exc_name, 'fanout', false, false, false); //获取系统生成的消息队列名称 list($queue_name, ,) = $channel->queue_declare('', false, false, true, false); //将队列名与交换器名进行绑定 $channel->queue_bind($queue_name,$exc_name); $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; //确认消息已被消费,从生产队列中移除 $msg->ack(); }; //设置消费成功后才能继续进行下一个消费 $channel->basic_qos(null, 1, null); //开启消费no_ack=false,设置为手动应答 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
文档:
https://www.rabbitmq.com/tutorials/tutorial-four-php.html
用来指定不同的交换机和指定routing_key,在消费端进行消费
生产者代码:
D:\phpstudy_pro\WWW\rabbitmq\routing\pro.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'direct_log'; //指定routing_key $routing_key = 'info'; //指定交换机类型为direct $channel->exchange_declare($exc_name, 'direct', false, false, false); //声明数据 $data = 'this is ' . $routing_key . ' message'; //创建消息 $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //发布消息 //指定使用的routing_key $channel->basic_publish($msg, $exc_name, $routing_key); //关闭连接 $channel->close(); $connection->close();
消费者代码
D:\phpstudy_pro\WWW\rabbitmq\routing\info.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'direct_log'; //指定routing_key $routing_key = 'info'; $channel->exchange_declare($exc_name, 'direct', false, false, false); //获取系统生成的消息队列名称 list($queue_name, ,) = $channel->queue_declare('', false, false, true, false); //将队列名与交换器名进行绑定,并指定routing_key $channel->queue_bind($queue_name,$exc_name,$routing_key); $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; //确认消息已被消费,从生产队列中移除 $msg->ack(); }; //设置消费成功后才能继续进行下一个消费 $channel->basic_qos(null, 1, null); //开启消费no_ack=false,设置为手动应答 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
通配符的匹配模式
如消费端中routing_key = ‘user.*’;
生产者:
指定routing_key= ‘user.top’
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'topic_log'; //指定routing_key $routing_key = 'user.top'; //指定交换机类型为direct $channel->exchange_declare($exc_name, 'topic', false, false, false); //声明数据 $data = 'this is ' . $routing_key . ' message'; //创建消息 $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //发布消息 //指定使用的routing_key $channel->basic_publish($msg, $exc_name, $routing_key); //关闭连接 $channel->close(); $connection->close();
消费者
消费端中routing_key = ‘user.*’;
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'direct_log'; //指定routing_key $routing_key = 'user.*'; $channel->exchange_declare($exc_name, 'topic', false, false, false); //获取系统生成的消息队列名称 list($queue_name, ,) = $channel->queue_declare('', false, false, true, false); //将队列名与交换器名进行绑定,并指定routing_key $channel->queue_bind($queue_name,$exc_name,$routing_key); $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; //确认消息已被消费,从生产队列中移除 $msg->ack(); }; //设置消费成功后才能继续进行下一个消费 $channel->basic_qos(null, 1, null); //开启消费no_ack=false,设置为手动应答 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
推荐学习:《PHP视频教程》
以上がメッセージキュー RabbitMQ の概要と PHP サンプルの詳細な説明の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。