Heim  >  Artikel  >  Backend-Entwicklung  >  Warteschlangennachrichtenbestätigungsmechanismus und Nachrichtenwiederholungsverarbeitungsmethode in PHP und MySQL

Warteschlangennachrichtenbestätigungsmechanismus und Nachrichtenwiederholungsverarbeitungsmethode in PHP und MySQL

WBOY
WBOYOriginal
2023-10-15 12:31:431162Durchsuche

Warteschlangennachrichtenbestätigungsmechanismus und Nachrichtenwiederholungsverarbeitungsmethode in PHP und MySQL

Queue-Nachrichtenbestätigungsmechanismus und Nachrichtenwiederholungsverarbeitungsmethode in PHP und MySQL

Einführung:
Mit der Entwicklung von Internetanwendungen müssen viele Online-Dienste eine große Anzahl von Anforderungen verarbeiten, und diese Anforderungen erfordern häufig eine asynchrone Verarbeitungsmethode . Warteschlangen sind eine gängige Lösung, die Anfragen effektiv von der Verarbeitung entkoppeln und so die Systemleistung und -zuverlässigkeit verbessern kann. In diesem Artikel werden der Nachrichtenbestätigungsmechanismus und die Nachrichtenwiederholungsverarbeitungsmethode von Warteschlangen in PHP und MySQL vorgestellt und spezifische Codebeispiele gegeben.

1. Das Konzept und die Funktion der Nachrichtenwarteschlange
Nachrichtenwarteschlange ist ein allgemeiner Anwendungsmodus, der Nachrichten in einer Warteschlange speichert und sie dann asynchron verarbeitet. Die Vorteile von Nachrichtenwarteschlangen spiegeln sich hauptsächlich in den folgenden Aspekten wider:

  1. Entkopplung: Entkopplung von Anforderungen und Verarbeitung, um die Skalierbarkeit und Wartbarkeit des Systems zu verbessern.
  2. Asynchrone Verarbeitung: Stellen Sie zeitaufwändige Vorgänge in die Warteschlange für die asynchrone Verarbeitung, um die Reaktionsgeschwindigkeit des Systems zu verbessern.
  3. Fehlertoleranter Mechanismus für fehlgeschlagene Verarbeitung: Die Zuverlässigkeit der Nachrichtenverarbeitung wird durch Mechanismen zur Nachrichtenbestätigung und Nachrichtenwiederholung sichergestellt.

2. Nachrichtenbestätigungsmechanismus
Im Warteschlangensystem ist die Nachrichtenbestätigung ein Mechanismus, um den Abschluss der Nachrichtenverarbeitung sicherzustellen. Der Nachrichtenbestätigungsmechanismus trägt dazu bei, das Problem des Nachrichtenverlusts oder der doppelten Verarbeitung zu vermeiden.

Die Nachrichtenbestätigung in PHP kann mithilfe des ACK-Mechanismus erreicht werden. Die spezifischen Implementierungsschritte lauten wie folgt:

  1. Der Produzent sendet Nachrichten an die Warteschlange.
  2. Der Verbraucher nimmt die Nachricht zur Verarbeitung aus der Warteschlange.
  3. Wenn die Nachrichtenverarbeitung erfolgreich ist, sendet der Verbraucher ACK, um zu bestätigen, dass die Nachrichtenverarbeitung abgeschlossen ist. Andernfalls sendet der Verbraucher NACK, um die Nachrichtenverarbeitung abzulehnen.
  4. Die Warteschlange bestätigt den Empfang einer ACK oder NACK für die Nachricht und entfernt die ACK-Nachricht aus der Warteschlange.

Das Folgende ist ein Beispielcode, der RabbitMQ als Nachrichtenwarteschlange verwendet:
Produzent:

#!/usr/bin/env php
<?php
require_once __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('queue_name', false, false, false, false);

$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'queue_name');

$channel->close();
$connection->close();

Verbraucher:

#!/usr/bin/env php
<?php
require_once __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('queue_name', false, false, false, false);

$callback = function (AMQPMessage $msg) {
    echo 'Received message: ' . $msg->body . PHP_EOL;
    if (processMessage($msg)) {
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); // 消息处理成功,发送ACK确认
    } else {
        $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], false, true); // 消息处理失败,发送NACK拒绝
    }
};

$channel->basic_consume('queue_name', '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
    $channel->wait();
}

function processMessage(AMQPMessage $msg) {
    // 消息处理逻辑
    if ($msg->body == 'Hello World!') {
        return true;
    } else {
        return false;
    }
}

$channel->close();
$connection->close();

3. Methode zur Nachrichtenwiederholungsverarbeitung
In tatsächlichen Anwendungen kann die Nachrichtenverarbeitung fehlschlagen, z. B. Netzwerkfehler, Serverfehler , usw. Um die Zuverlässigkeit von Nachrichten sicherzustellen, können Nachrichten, deren Verarbeitung fehlschlägt, erneut versucht werden.

MySQL bietet Transaktions- und Rollback-Mechanismen, die bei der Nachrichtenwiederholungsverarbeitung angewendet werden können. Die spezifischen Implementierungsschritte lauten wie folgt:

  1. Der Produzent sendet Nachrichten an die Nachrichtentabelle in der Datenbank.
  2. Der Verbraucher entnimmt die Nachricht zur Verarbeitung aus der Nachrichtentabelle in der Datenbank.
  3. Wenn die Nachricht erfolgreich verarbeitet wurde, löschen Sie die Nachricht aus der Nachrichtentabelle. Andernfalls erhöhen Sie die Anzahl der Verarbeitungszeiten in der Nachrichtentabelle um eins und aktualisieren Sie die Verarbeitungszeit.
  4. Richten Sie eine geplante Aufgabe ein, um regelmäßig die Nachrichten in der Nachrichtentabelle zu überprüfen, die weniger als oder gleich der maximalen Anzahl von Wiederholungsversuchen verarbeitet wurden, und sie erneut an den Verbraucher zuzustellen.

Das Folgende ist ein Beispielcode, der MySQL als Nachrichtenspeicher verwendet:
Produzent:

<?php
$dsn = 'mysql:dbname=testdb;host=127.0.0.1';
$user = 'root';
$password = '';

try {
    $db = new PDO($dsn, $user, $password);
    $db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

    $sql = 'INSERT INTO message_queue (message) VALUES (?)';
    $stmt = $db->prepare($sql);
    $message = 'Hello World!';
    $stmt->bindParam(1, $message);
    $stmt->execute();

} catch (PDOException $e) {
    echo 'Connection failed: ' . $e->getMessage();
}

Verbraucher:

<?php
$dsn = 'mysql:dbname=testdb;host=127.0.0.1';
$user = 'root';
$password = '';

try {
    $db = new PDO($dsn, $user, $password);
    $db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

    $sql = 'SELECT * FROM message_queue';
    $stmt = $db->prepare($sql);
    $stmt->execute();
    $messages = $stmt->fetchAll();

    foreach ($messages as $message) {
        if (processMessage($message)) {
            $deleteSql = 'DELETE FROM message_queue WHERE id = ?';
            $deleteStmt = $db->prepare($deleteSql);
            $deleteStmt->bindParam(1, $message['id']);
            $deleteStmt->execute();
        } else {
            $retrySql = 'UPDATE message_queue SET retries = retries + 1, last_retry_time = ? WHERE id = ?';
            $retryStmt = $db->prepare($retrySql);
            $now = date('Y-m-d H:i:s');
            $retryStmt->bindParam(1, $now);
            $retryStmt->bindParam(2, $message['id']);
            $retryStmt->execute();
        }
    }

} catch (PDOException $e) {
    echo 'Connection failed: ' . $e->getMessage();
}

function processMessage($message) {
    // 消息处理逻辑
    if ($message['message'] == 'Hello World!') {
        return true;
    } else {
        return false;
    }
}

Aufgabe planen:

<?php
$dsn = 'mysql:dbname=testdb;host=127.0.0.1';
$user = 'root';
$password = '';

try {
    $db = new PDO($dsn, $user, $password);
    $db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

    $sql = 'SELECT * FROM message_queue WHERE retries <= ?';
    $stmt = $db->prepare($sql);
    $maxRetries = 3;
    $stmt->bindParam(1, $maxRetries);
    $stmt->execute();
    $messages = $stmt->fetchAll();

    foreach ($messages as $message) {
        // 重新投递消息给消费者
    }

} catch (PDOException $e) {
    echo 'Connection failed: ' . $e->getMessage();
}

Schlussfolgerung:
Durch den Nachrichtenbestätigungsmechanismus und die Nachrichtenwiederholungsverarbeitungsmethode können wir es verbessern die Zuverlässigkeit und Stabilität des Systems. Als gängiges Entkopplungs- und asynchrones Verarbeitungstool können Warteschlangen die Nachrichtenbestätigung und Wiederholung in PHP und MySQL effektiv implementieren und so eine bessere Leistung und Benutzererfahrung für unsere Anwendungen bieten.

Referenzen:

  1. Offizielle Dokumentation zu PHP RabbitMQ: https://github.com/php-amqplib/php-amqplib
  2. Offizielle Dokumentation zu PHP MySQL: https://www.php.net/manual/en/book . pdo.php

Das obige ist der detaillierte Inhalt vonWarteschlangennachrichtenbestätigungsmechanismus und Nachrichtenwiederholungsverarbeitungsmethode in PHP und MySQL. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn