Maison  >  Article  >  développement back-end  >  Comment implémenter des files d'attente et des diffusions de messages distribués dans les microservices PHP

Comment implémenter des files d'attente et des diffusions de messages distribués dans les microservices PHP

王林
王林original
2023-09-25 18:05:021382parcourir

Comment implémenter des files dattente et des diffusions de messages distribués dans les microservices PHP

Comment implémenter des files d'attente et des diffusions de messages distribuées dans les microservices PHP

Avant-propos :
Dans le développement de systèmes distribués modernes, les files d'attente et les diffusions de messages sont des composants très courants utilisés pour implémenter la communication entre divers systèmes. Découplage et communication. Dans l'architecture de microservice PHP, afin d'implémenter des fonctions de traitement et de diffusion de messages distribués, nous pouvons utiliser des outils et frameworks open source matures pour simplifier le développement. Cet article explique comment utiliser RabbitMQ et Swoole pour implémenter des files d'attente et des diffusions de messages distribués.

1. Concepts de base et utilisation de RabbitMQ
RabbitMQ est un middleware de messagerie fiable, open source et multiplateforme. Il suit la norme AMQP (Advanced Message Queuing Protocol) et offre des capacités complètes de production et de consommation de messages. Voici quelques concepts de base de RabbitMQ :

  1. Producer : le programme qui envoie des messages.
  2. Queue : un conteneur qui enregistre les messages.
  3. Consumer : Un programme qui reçoit et traite les messages.
  4. Reconnaissances du consommateur : une fois que le consommateur a reçu le message, il envoie un message de confirmation à la file d'attente pour informer la file d'attente que le message a été traité.
  5. Exchange : Reçoit les messages envoyés par les producteurs et achemine les messages vers la file d'attente selon certaines règles.
  6. Binding : Liaison de la relation entre l'échangeur et la file d'attente.

Ce qui suit est un exemple de code PHP qui montre comment envoyer et recevoir des messages dans RabbitMQ :

// 创建连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建通道
$channel = $connection->channel();

// 声明队列
$channel->queue_declare('hello', false, false, false, false);

// 发送消息
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');

echo "Sent 'Hello World!'";

// 接收消息
$callback = function ($msg) {
  echo "Received: ", $msg->body, "
";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
  $channel->wait();
}

// 关闭通道和连接
$channel->close();
$connection->close();

2. Les concepts de base et l'utilisation de Swoole
Swoole est un framework de communication réseau hautes performances basé sur PHP qui fournit de puissants systèmes asynchrones. Capacités d’E/S et modèle de programmation basé sur les événements. Dans l'architecture de microservice PHP, nous pouvons utiliser Swoole pour implémenter des fonctions de diffusion de messages distribués.

Voici quelques concepts de base de Swoole :

  1. Serveur : un programme qui reçoit et traite les requêtes réseau.
  2. Client : Le programme qui envoie des requêtes réseau.
  3. Événement : L'interaction entre le serveur et le client.
  4. Asynchrone : Une méthode qui ne bloque pas l'exécution du processus principal.
  5. Synchronous : méthode permettant de bloquer l'exécution du processus principal jusqu'à ce que l'opération soit terminée.

Ce qui suit est un exemple de code PHP qui montre comment créer un serveur TCP et diffuser des messages dans Swoole :

// 创建服务器
$server = new swoole_server("127.0.0.1", 9501);

// 注册事件回调函数
$server->on('connect', function ($server, $fd) {
  echo "Client {$fd}: connect.
";
});

$server->on('receive', function ($server, $fd, $from_id, $data) {
  echo "Received: $data 
";

  // 广播消息给所有客户端
  $server->sendtoAll($data);
});

$server->on('close', function ($server, $fd) {
  echo "Client {$fd}: close.
";
});

// 启动服务器
$server->start();

3. Implémenter des files d'attente de messages distribuées dans les microservices PHP
Afin d'implémenter des messages distribués dans la file d'attente des microservices PHP, nous pouvons utilisez RabbitMQ et Swoole ensemble. Tout d’abord, nous devons démarrer un consommateur RabbitMQ et un serveur Swoole TCP.

Exemple de code consommateur RabbitMQ :

// 创建连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建通道
$channel = $connection->channel();

// 声明队列
$channel->queue_declare('task_queue', false, false, false, false);

// 设置每次只接收一条消息
$channel->basic_qos(null, 1, null);

// 定义消息处理的回调函数
$callback = function ($msg) {
  echo "Received: ", $msg->body, "
";
  // 模拟任务处理
  sleep(3);
  echo "Task finished.
";
  // 显示确认消息
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

// 监听队列,接收消息
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
  $channel->wait();
}

// 关闭通道和连接
$channel->close();
$connection->close();

Exemple de code serveur Swoole TCP :

// 创建服务器
$server = new swoole_server("127.0.0.1", 9501);

$server->set([
  'worker_num' => 4,         // 设置工作进程数
  'task_worker_num' => 4,    // 设置任务进程数
]);

// 注册事件回调函数
$server->on('connect', function ($server, $fd) {
  echo "Client {$fd}: connect.
";
});

$server->on('receive', function ($server, $fd, $from_id, $data) {
  echo "Received: $data 
";
  
  // 将接收到的消息发送给任务进程处理
  $server->task($data);
});

$server->on('task', function ($server, $task_id, $from_id, $data) {
  // 模拟任务处理
  sleep(3);
  
  // 处理结果发送给请求进程
  $server->finish($data);
});

$server->on('finish', function ($server, $task_id, $data) {
  // 将处理结果发送给客户端
  $server->send($data);
});

$server->on('close', function ($server, $fd) {
  echo "Client {$fd}: close.
";
});

// 启动服务器
$server->start();

Lorsque le consommateur RabbitMQ reçoit le message, cela signifie qu'une tâche est créée et commence le traitement. Ensuite, le serveur Swoole TCP envoie le message reçu au processus de tâche pour traitement et envoie le résultat du traitement au client via la fonction de rappel.

4. Implémenter la diffusion de messages distribués dans les microservices PHP
Afin d'implémenter la diffusion de messages distribués dans les microservices PHP, nous pouvons combiner la fonction de diffusion de Swoole avec un cache distribué (comme Redis). Tout d'abord, nous devons créer un serveur Swoole TCP et un abonné Redis.

Exemple de code pour le serveur Swoole TCP :

// 创建服务器
$server = new swoole_server("127.0.0.1", 9501);

// 注册事件回调函数
$server->on('connect', function ($server, $fd) {
  echo "Client {$fd}: connect.
";
});

$server->on('receive', function ($server, $fd, $from_id, $data) {
  echo "Received: $data 
";

  // 将接收到的消息广播给所有客户端
  $server->sendtoAll($data);
});

$server->on('close', function ($server, $fd) {
  echo "Client {$fd}: close.
";
});

// 启动服务器
$server->start();

Exemple de code pour l'abonné Redis :

// 创建Redis连接
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

// 订阅消息
$redis->subscribe('channel', function ($redis, $channel, $message) {
  echo "Received from Redis: $message 
";
  
  // 发送消息给Swoole TCP服务器
  $client = new swoole_client(SWOOLE_SOCK_TCP);
  if (!$client->connect('127.0.0.1', 9501, -1)) {
    echo "Failed to connect to server.";
    exit;
  }
  $client->send($message);
  $client->close();
});

Lorsque Redis reçoit le message, il l'envoie au serveur Swoole TCP via la fonction de rappel, puis le serveur diffuse le message reçu à tous clients.

Résumé :
Grâce à l'exemple de code ci-dessus, nous pouvons apprendre à utiliser RabbitMQ et Swoole pour implémenter des fonctions de file d'attente de messages distribuées et de diffusion dans les microservices PHP. Ces technologies et outils peuvent nous aider à créer des systèmes distribués hautes performances et évolutifs et à améliorer le découplage et la fiabilité des systèmes.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn