Maison  >  Article  >  développement back-end  >  Mécanisme de confirmation des messages de file d'attente et méthode de traitement des nouvelles tentatives de message en PHP et MySQL

Mécanisme de confirmation des messages de file d'attente et méthode de traitement des nouvelles tentatives de message en PHP et MySQL

WBOY
WBOYoriginal
2023-10-15 12:31:431162parcourir

Mécanisme de confirmation des messages de file dattente et méthode de traitement des nouvelles tentatives de message en PHP et MySQL

Mécanisme de confirmation des messages de la file d'attente et méthode de traitement des nouvelles tentatives de message en PHP et MySQL

Introduction :
Avec le développement d'applications Internet, de nombreux services en ligne doivent gérer un grand nombre de requêtes, et ces requêtes nécessitent souvent une méthode de traitement asynchrone . Les files d'attente constituent une solution courante qui permet de dissocier efficacement les demandes du traitement, améliorant ainsi les performances et la fiabilité du système. Cet article présentera le mécanisme de confirmation des messages et la méthode de traitement des nouvelles tentatives de messages des files d'attente dans PHP et MySQL, et donnera des exemples de code spécifiques.

1. Le concept et la fonction de la file d'attente des messages
La file d'attente des messages est un mode d'application courant, qui stocke les messages dans une file d'attente puis les traite de manière asynchrone. Les avantages des files d'attente de messages se reflètent principalement dans les aspects suivants :

  1. Découplage : découplage des requêtes et du traitement pour améliorer l'évolutivité et la maintenabilité du système.
  2. Traitement asynchrone : mettez les opérations fastidieuses en file d'attente pour le traitement asynchrone afin d'améliorer la vitesse de réponse du système.
  3. Mécanisme de tolérance aux pannes en cas d'échec du traitement : la fiabilité du traitement des messages est assurée grâce à des mécanismes de confirmation des messages et de nouvelle tentative de message.

2. Mécanisme de confirmation des messages
Dans le système de file d'attente, la confirmation des messages est un mécanisme permettant de garantir l'achèvement du traitement des messages. Le mécanisme d'accusé de réception des messages permet d'éviter le problème de perte de message ou de traitement en double.

L'accusé de réception des messages en PHP peut être obtenu en utilisant le mécanisme ACK. Les étapes spécifiques de mise en œuvre sont les suivantes :

  1. Le producteur envoie des messages à la file d'attente.
  2. Le consommateur retire le message de la file d'attente pour traitement.
  3. Si le traitement du message réussit, le consommateur envoie un ACK pour confirmer que le traitement du message est terminé, sinon, le consommateur envoie NACK pour rejeter le traitement du message ;
  4. La file d'attente accuse réception d'un ACK ou NACK pour le message et supprime le message ACK de la file d'attente.

Ce qui suit est un exemple de code utilisant RabbitMQ comme file d'attente de messages :
Producteur :

#!/usr/bin/env php
<?php
require_once __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('queue_name', false, false, false, false);

$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'queue_name');

$channel->close();
$connection->close();

Consommateur :

#!/usr/bin/env php
<?php
require_once __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('queue_name', false, false, false, false);

$callback = function (AMQPMessage $msg) {
    echo 'Received message: ' . $msg->body . PHP_EOL;
    if (processMessage($msg)) {
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); // 消息处理成功,发送ACK确认
    } else {
        $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], false, true); // 消息处理失败,发送NACK拒绝
    }
};

$channel->basic_consume('queue_name', '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
    $channel->wait();
}

function processMessage(AMQPMessage $msg) {
    // 消息处理逻辑
    if ($msg->body == 'Hello World!') {
        return true;
    } else {
        return false;
    }
}

$channel->close();
$connection->close();

3. Méthode de traitement des nouvelles tentatives de message
Dans les applications réelles, le traitement des messages peut échouer, par exemple une panne de réseau ou une erreur de serveur. , etc. Afin de garantir la fiabilité des messages, les messages qui ne parviennent pas à être traités peuvent être réessayés.

MySQL fournit des mécanismes de transaction et de restauration, qui peuvent être appliqués dans le traitement des nouvelles tentatives de message. Les étapes spécifiques de mise en œuvre sont les suivantes :

  1. Le producteur envoie des messages à la table des messages de la base de données.
  2. Le consommateur retire le message de la table des messages de la base de données pour traitement.
  3. Si le message est traité avec succès, supprimez le message du tableau des messages ; sinon, ajoutez un au nombre de temps de traitement dans le tableau des messages et mettez à jour le temps de traitement.
  4. Configurez une tâche planifiée pour vérifier régulièrement les messages de la table des messages qui ont été traités moins ou également au nombre maximum de tentatives, et les renvoyer au consommateur.

Ce qui suit est un exemple de code utilisant MySQL comme stockage de messages :
Producteur :

<?php
$dsn = 'mysql:dbname=testdb;host=127.0.0.1';
$user = 'root';
$password = '';

try {
    $db = new PDO($dsn, $user, $password);
    $db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

    $sql = 'INSERT INTO message_queue (message) VALUES (?)';
    $stmt = $db->prepare($sql);
    $message = 'Hello World!';
    $stmt->bindParam(1, $message);
    $stmt->execute();

} catch (PDOException $e) {
    echo 'Connection failed: ' . $e->getMessage();
}

Consommateur :

<?php
$dsn = 'mysql:dbname=testdb;host=127.0.0.1';
$user = 'root';
$password = '';

try {
    $db = new PDO($dsn, $user, $password);
    $db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

    $sql = 'SELECT * FROM message_queue';
    $stmt = $db->prepare($sql);
    $stmt->execute();
    $messages = $stmt->fetchAll();

    foreach ($messages as $message) {
        if (processMessage($message)) {
            $deleteSql = 'DELETE FROM message_queue WHERE id = ?';
            $deleteStmt = $db->prepare($deleteSql);
            $deleteStmt->bindParam(1, $message['id']);
            $deleteStmt->execute();
        } else {
            $retrySql = 'UPDATE message_queue SET retries = retries + 1, last_retry_time = ? WHERE id = ?';
            $retryStmt = $db->prepare($retrySql);
            $now = date('Y-m-d H:i:s');
            $retryStmt->bindParam(1, $now);
            $retryStmt->bindParam(2, $message['id']);
            $retryStmt->execute();
        }
    }

} catch (PDOException $e) {
    echo 'Connection failed: ' . $e->getMessage();
}

function processMessage($message) {
    // 消息处理逻辑
    if ($message['message'] == 'Hello World!') {
        return true;
    } else {
        return false;
    }
}

Tâche planifiée :

<?php
$dsn = 'mysql:dbname=testdb;host=127.0.0.1';
$user = 'root';
$password = '';

try {
    $db = new PDO($dsn, $user, $password);
    $db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

    $sql = 'SELECT * FROM message_queue WHERE retries <= ?';
    $stmt = $db->prepare($sql);
    $maxRetries = 3;
    $stmt->bindParam(1, $maxRetries);
    $stmt->execute();
    $messages = $stmt->fetchAll();

    foreach ($messages as $message) {
        // 重新投递消息给消费者
    }

} catch (PDOException $e) {
    echo 'Connection failed: ' . $e->getMessage();
}

Conclusion :
Grâce au mécanisme de confirmation des messages et à la méthode de traitement des nouvelles tentatives de message, nous pouvons améliorer la fiabilité et la stabilité du système. En tant qu'outil commun de découplage et de traitement asynchrone, les files d'attente peuvent implémenter efficacement la confirmation des messages et les nouvelles tentatives dans PHP et MySQL, offrant ainsi de meilleures performances et une meilleure expérience utilisateur pour nos applications.

Références :

  1. Documentation officielle PHP RabbitMQ : https://github.com/php-amqplib/php-amqplib
  2. Documentation officielle PHP MySQL : https://www.php.net/manual/en/book . pdo.php

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn