Maison  >  Article  >  cadre php  >  Pratique d'intégration de Swoole et RabbitMQ : création d'un système de file d'attente de messages à haute disponibilité

Pratique d'intégration de Swoole et RabbitMQ : création d'un système de file d'attente de messages à haute disponibilité

WBOY
WBOYoriginal
2023-06-14 12:56:091317parcourir

Avec l'avènement de l'ère Internet, les systèmes de file d'attente de messages sont devenus de plus en plus importants. Il permet des opérations asynchrones entre différentes applications, réduit le couplage et améliore l'évolutivité, améliorant ainsi les performances et l'expérience utilisateur de l'ensemble du système. Dans le système de file d'attente de messages, RabbitMQ est un puissant logiciel de file d'attente de messages open source. Il prend en charge une variété de protocoles de messages et est largement utilisé dans les transactions financières, le commerce électronique, les jeux en ligne et d'autres domaines.

Dans les applications pratiques, il est souvent nécessaire d'intégrer RabbitMQ à d'autres systèmes. Cet article explique comment utiliser l'extension swoole pour implémenter un cluster RabbitMQ à haute disponibilité et fournit un exemple de code complet.

1. Intégration de RabbitMQ

  1. Introduction à RabbitMQ

RabbitMQ est un logiciel de file d'attente de messages open source et multiplateforme. Il suit entièrement le protocole AMQP (Advanced Message Queuing Protocol) et prend en charge plusieurs protocoles de messages. L'idée principale de RabbitMQ est de mettre les messages dans la file d'attente et de les retirer en cas de besoin, permettant ainsi un échange de données et une communication asynchrones efficaces.

  1. Intégration RabbitMQ

Afin d'intégrer RabbitMQ aux applications PHP, nous pouvons utiliser l'API fournie par la bibliothèque PHP AMQP. La bibliothèque prend en charge le principal protocole et extensions AMQP 0-9-1 de RabbitMQ, notamment les fonctions de publication, d'abonnement, de file d'attente, d'échange et d'autres. Voici un exemple de code simple :

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明队列
$channel->queue_declare('hello', false, false, false, false);

// 创建消息
$msg = new AMQPMessage('Hello World!');

// 发送消息
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'
";

// 关闭连接
$channel->close();
$connection->close();
?>

Cet exemple de code se connecte au serveur RabbitMQ local (« localhost »), déclare une file d'attente nommée « bonjour » et envoie des messages à cette file d'attente.

2. Intégration de Swoole

  1. Introduction à Swoole

Swoole est un framework de communication réseau asynchrone PHP haute performance qui implémente TCP, UDP, HTTP, WebSocket et d'autres protocoles de communication asynchrones basés sur EventLoop. Il se caractérise par une concurrence élevée, des performances élevées, une faible consommation et un développement facile. Il a été largement utilisé dans des scénarios tels que les services Web et les serveurs de jeux.

  1. Swoole intègre RabbitMQ

Les fonctionnalités asynchrones de Swoole sont très adaptées à la communication asynchrone RabbitMQ et peuvent obtenir un système de file d'attente de messages efficace, stable et à faible latence. Voici un exemple de code pour Swoole intégrant RabbitMQ :

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明队列
$channel->queue_declare('task_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C
";

// 接收消息
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "
";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done
";
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

// 监听消息
while (count($channel->callbacks)) {
    $channel->wait();
}

// 关闭连接
$channel->close();
$connection->close();
?>

Cet exemple de code se connecte au serveur RabbitMQ local (« localhost »), déclare une file d'attente persistante « task_queue » et commence à écouter les messages de la file d'attente. Lorsqu'un message arrive, Swoole appelle la fonction de rappel de manière asynchrone et peut envoyer une réponse après avoir traité la logique métier dans la fonction de rappel pour obtenir une communication asynchrone efficace et à faible latence.

3. Architecture haute disponibilité

Afin d'obtenir un système de file d'attente de messages haute disponibilité, nous devons intégrer plusieurs nœuds RabbitMQ dans un cluster pour améliorer l'évolutivité et la tolérance aux pannes du système.

Les configurations de cluster RabbitMQ couramment utilisées incluent le mode actif-veille et le mode miroir. En mode actif-veille, un nœud sert de nœud actif et les autres nœuds servent de nœuds de sauvegarde. Lorsque le nœud principal tombe en panne, le nœud de sauvegarde prend automatiquement ses responsabilités. En mode miroir, une file d'attente est répliquée sur le disque sur plusieurs nœuds et maintenue synchronisée. Chacun de ces nœuds peut gérer les messages envoyés par les producteurs et les demandes des consommateurs.

Compte tenu de la stabilité, de l'évolutivité, de la maintenabilité et d'autres facteurs, nous avons choisi le mode miroir comme architecture haute disponibilité. Voici un exemple de code pour ajouter une file d'attente miroir dans le fichier de configuration :

$channel->queue_declare('task_queue', false, true, false, false, false, array(
    'x-ha-policy' => array('S', 'all'),
    'x-dead-letter-exchange' => array('S', 'dead_exchange'),
));

Cet exemple de code crée une file d'attente persistante nommée "task_queue" et définit le paramètre "x-ha-policy" sur "all", indiquant que cette file d'attente Toutes les files d'attente miroir sont « hautement disponibles ». Dans le même temps, le paramètre 'x-dead-letter-exchange' est également défini sur 'dead_exchange', ce qui signifie que le message sera envoyé à ce commutateur après avoir été rejeté. Ce commutateur peut avoir une ou plusieurs files d'attente destinées à la reconsommation des messages ou aux statistiques.

4. Exemple de code complet

Ce qui suit est un exemple de code complet du système de file d'attente de messages, qui utilise le cadre de communication asynchrone Swoole pour intégrer le mode de file d'attente miroir RabbitMQ afin d'implémenter un système de file d'attente de messages à haute disponibilité. Vous pouvez modifier la configuration ou le code pour implémenter votre propre système de file d'attente de messages en fonction des besoins réels.

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

$exchangeName = 'test.exchange';
$queueName = 'test.queue';
$deadExchangeName = 'dead.exchange';

// 建立连接
$connection = new AMQPStreamConnection(
    'localhost', 5672, 'guest', 'guest', '/', false, 'AMQPLAIN', null, 'en_US', 3.0, 3.0, null, true
);
$channel = $connection->channel();

// 声明交换机
$channel->exchange_declare($exchangeName, 'direct', false, true, false);

// 声明死信交换机
$channel->exchange_declare($deadExchangeName, 'fanout', false, true, false);

// 声明队列
$channel->queue_declare($queueName, false, true, false, false, false, array(
    'x-ha-policy' => array('S', 'all'),
    'x-dead-letter-exchange' => array('S', $deadExchangeName),
));

// 绑定队列到交换机中
$channel->queue_bind($queueName, $exchangeName);

echo " [*] Waiting for messages. To exit press CTRL+C
";

// 接收消息
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "
";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done
";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

// 监听消息
while (count($channel->callbacks)) {
    $channel->wait();
}

// 关闭连接
$channel->close();
$connection->close();
?>

Dans le code ci-dessus, la connexion à RabbitMQ est d'abord établie via la classe AMQPStreamConnection. Ensuite, créez un commutateur nommé « test.exchange », une file d'attente nommée « test.queue » et définissez « x-ha-policy » sur « all », indiquant que cette file d'attente est une file d'attente miroir et que tous les nœuds peuvent y accéder. Dans le même temps, « x-dead-letter-exchange » est également défini sur « dead.exchange », ce qui signifie que le message sera envoyé au commutateur « dead.exchange » après avoir été rejeté.

Enfin, dans la fonction de rappel, utilisez la méthode basic_ack() pour déterminer le succès de la consommation et libérer les ressources occupées par le message.

Ce qui précède est le contenu pertinent sur la pratique d'intégration de Swoole et RabbitMQ. En utilisant l'extension Swoole, nous pouvons facilement implémenter une communication asynchrone et intégrer plusieurs nœuds RabbitMQ dans un système de file d'attente de messages à haute disponibilité pour améliorer les performances et la stabilité du système.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn