ホームページ  >  記事  >  バックエンド開発  >  PHP マイクロサービスで分散メッセージ キューとブロードキャストを実装する方法

PHP マイクロサービスで分散メッセージ キューとブロードキャストを実装する方法

王林
王林オリジナル
2023-09-25 18:05:021336ブラウズ

PHP マイクロサービスで分散メッセージ キューとブロードキャストを実装する方法

PHP マイクロサービスで分散メッセージ キューとブロードキャストを実装する方法

前書き:
現代の分散システム開発では、メッセージ キューとブロードキャストは、次の目的で使用される非常に一般的なコンポーネントです。さまざまなシステム間のデカップリングと通信を実現します。 PHP マイクロサービス アーキテクチャでは、分散メッセージ処理とブロードキャスト機能を実装するために、いくつかの成熟したオープン ソース ツールとフレームワークを使用して開発を簡素化できます。この記事では、RabbitMQ と Swoole を使用して分散メッセージ キューとブロードキャストを実装する方法を紹介します。

1. RabbitMQ の基本概念と使用法
RabbitMQ は、信頼性の高いオープンソースのクロスプラットフォームのメッセージ ミドルウェアです。 AMQP (Advanced Message Queuing Protocol) 標準に従い、完全なメッセージの生成および消費機能を提供します。以下に、RabbitMQ の基本概念をいくつか示します。

  1. プロデューサー: メッセージを送信するプログラム。
  2. Queue (キュー): メッセージを保存するコンテナー。
  3. コンシューマ: メッセージを受信して​​処理するプログラム。
  4. コンシューマ確認応答: コンシューマはメッセージを受信した後、確認メッセージをキューに送信して、メッセージが処理されたことをキューに通知します。
  5. Exchange: プロデューサによって送信されたメッセージを受信し、特定のルールに従ってメッセージをキューにルーティングします。
  6. バインディング: エクスチェンジャーとキューの間の関係をバインドします。

次は、RabbitMQ でメッセージを送受信する方法を示すサンプル PHP コードです:

// 创建连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建通道
$channel = $connection->channel();

// 声明队列
$channel->queue_declare('hello', false, false, false, false);

// 发送消息
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');

echo "Sent 'Hello World!'";

// 接收消息
$callback = function ($msg) {
  echo "Received: ", $msg->body, "
";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
  $channel->wait();
}

// 关闭通道和连接
$channel->close();
$connection->close();

2. Swoole の基本概念と使用法
Swoole は、A PHP ベースの高性能ネットワーク通信フレームワークで、強力な非同期 IO 機能とイベント駆動型プログラミング モードを提供します。 PHP マイクロサービス アーキテクチャでは、Swoole を使用して分散メッセージ ブロードキャスト機能を実装できます。

以下は、Swoole の基本概念の一部です。

  1. サーバー: ネットワーク要求を受信して​​処理するプログラム。
  2. クライアント: ネットワーク要求を送信するプログラム。
  3. イベント: サーバーとクライアント間の対話。
  4. 非同期: メインプロセスの実行をブロックしないメソッド。
  5. 同期: 操作が完了するまでメインプロセスの実行をブロックする方法。

以下は、Swoole で TCP サーバーを作成し、メッセージをブロードキャストする方法を示すサンプル PHP コードです:

// 创建服务器
$server = new swoole_server("127.0.0.1", 9501);

// 注册事件回调函数
$server->on('connect', function ($server, $fd) {
  echo "Client {$fd}: connect.
";
});

$server->on('receive', function ($server, $fd, $from_id, $data) {
  echo "Received: $data 
";

  // 广播消息给所有客户端
  $server->sendtoAll($data);
});

$server->on('close', function ($server, $fd) {
  echo "Client {$fd}: close.
";
});

// 启动服务器
$server->start();

3. PHP マイクロサービスで分散メッセージ キューを実装します
In PHP マイクロサービスで分散メッセージ キューを実装するには、RabbitMQ と Swoole を一緒に使用できます。まず、RabbitMQ コンシューマーと Swoole TCP サーバーを起動する必要があります。

RabbitMQ コンシューマのコード例:

// 创建连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建通道
$channel = $connection->channel();

// 声明队列
$channel->queue_declare('task_queue', false, false, false, false);

// 设置每次只接收一条消息
$channel->basic_qos(null, 1, null);

// 定义消息处理的回调函数
$callback = function ($msg) {
  echo "Received: ", $msg->body, "
";
  // 模拟任务处理
  sleep(3);
  echo "Task finished.
";
  // 显示确认消息
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

// 监听队列,接收消息
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
  $channel->wait();
}

// 关闭通道和连接
$channel->close();
$connection->close();

Swoole TCP サーバーのコード例:

// 创建服务器
$server = new swoole_server("127.0.0.1", 9501);

$server->set([
  'worker_num' => 4,         // 设置工作进程数
  'task_worker_num' => 4,    // 设置任务进程数
]);

// 注册事件回调函数
$server->on('connect', function ($server, $fd) {
  echo "Client {$fd}: connect.
";
});

$server->on('receive', function ($server, $fd, $from_id, $data) {
  echo "Received: $data 
";
  
  // 将接收到的消息发送给任务进程处理
  $server->task($data);
});

$server->on('task', function ($server, $task_id, $from_id, $data) {
  // 模拟任务处理
  sleep(3);
  
  // 处理结果发送给请求进程
  $server->finish($data);
});

$server->on('finish', function ($server, $task_id, $data) {
  // 将处理结果发送给客户端
  $server->send($data);
});

$server->on('close', function ($server, $fd) {
  echo "Client {$fd}: close.
";
});

// 启动服务器
$server->start();

RabbitMQ コンシューマがメッセージを受信すると、タスクが作成され、処理が開始されることを意味します。 。そして、Swoole TCPサーバーは受信したメッセージをタスクプロセスに送信して処理し、処理結果をコールバック関数を通じてクライアントに送信します。

4. PHP マイクロサービスで分散メッセージ ブロードキャストを実装する
PHP マイクロサービスで分散メッセージ ブロードキャストを実装するには、Swoole のブロードキャスト機能と分散キャッシュ (Redis など) を組み合わせることができます。まず、Swoole TCP サーバーと Redis サブスクライバーを作成する必要があります。

Swoole TCP サーバーのコード例:

// 创建服务器
$server = new swoole_server("127.0.0.1", 9501);

// 注册事件回调函数
$server->on('connect', function ($server, $fd) {
  echo "Client {$fd}: connect.
";
});

$server->on('receive', function ($server, $fd, $from_id, $data) {
  echo "Received: $data 
";

  // 将接收到的消息广播给所有客户端
  $server->sendtoAll($data);
});

$server->on('close', function ($server, $fd) {
  echo "Client {$fd}: close.
";
});

// 启动服务器
$server->start();

Redis サブスクライバーのコード例:

// 创建Redis连接
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

// 订阅消息
$redis->subscribe('channel', function ($redis, $channel, $message) {
  echo "Received from Redis: $message 
";
  
  // 发送消息给Swoole TCP服务器
  $client = new swoole_client(SWOOLE_SOCK_TCP);
  if (!$client->connect('127.0.0.1', 9501, -1)) {
    echo "Failed to connect to server.";
    exit;
  }
  $client->send($message);
  $client->close();
});

Redis がメッセージを受信すると、コールバック関数を通じてメッセージが Swoole TCP サーバーに送信されます。次に、サーバーは受信したメッセージをすべてのクライアントにブロードキャストします。

概要:
上記のサンプル コードを通じて、RabbitMQ と Swoole を使用して、PHP マイクロサービスに分散メッセージ キューとブロードキャスト関数を実装する方法を学習できます。これらのテクノロジーとツールは、高性能でスケーラブルな分散システムを構築し、システムの分離と信頼性を向上させるのに役立ちます。

以上がPHP マイクロサービスで分散メッセージ キューとブロードキャストを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。