Maison  >  Article  >  développement back-end  >  PHP implémente simplement l'opération de retard

PHP implémente simplement l'opération de retard

coldplay.xixi
coldplay.xixiavant
2020-07-03 17:41:245619parcourir

PHP implémente simplement l'opération de retard


Scénario

Parfois en affaires Vous rencontrera des opérations retardées, comme l'annulation de la commande si le paiement n'est pas effectué une demi-heure après la passation de la commande, l'envoi d'un SMS de rappel si le paiement n'est pas effectué quinze minutes après la passation de la commande, etc. Alors comment répondre à une telle demande ?

Recommandations d'apprentissage associées : Programmation PHP de l'entrée à la maîtrise

Méthodes de mise en œuvre

  • La première manière simple consiste à utiliser un processus en arrière-plan pour vérifier la commande dans une boucle sans fin, et à effectuer différentes opérations en fonction de l'heure de la commande
  • La seconde consiste à utiliser le message programmé de la file d'attente des messages pour envoyer le message programmé après la passation de la commande, différentes files d'attente de synchronisation gèrent une logique différente
  • La troisième méthode peut être utilisée en utilisant certaines fonctions existantes fournies par le framework

Code d'implémentation

Nous avons utilisé le scénario d'envoi d'un email à l'utilisateur si la commande n'était pas payée 15 minutes après la création de la commande

<.>Préparation :

    Formulaire de commande simple : commande
  1. Divers packages de composition requis
  2. Service local RabbitMq
  3. Ouvrir le service Alibaba Cloud RocketMq

Le premier type

    La logique du code est très simple, il suffit de boucler dans une boucle infinie
  • Démarrez ce processus de script et configurez-le avec le superviseur
  • Partie du code
  • //创建订单的逻辑/**
     * 随机创建订单
     */$order = [
        &#39;order_number&#39; => mt_rand(100,10000).date("YmdHis"),
        &#39;user_id&#39; => mt_rand(1, 100),
        &#39;order_amount&#39; => mt_rand(100, 1000),];
        /**@var $manager Illuminate\Database\Capsule\Manager **/
        $conn = $manager;$insertResult = $conn::table("order")
        ->insert($order);print_r($insertResult);
Logique de traitement retardé

while(true) {
    // 未支付订单列表
    $orderList = $conn::table("order")
        ->where("created_time",  &#39;<=&#39;, date("Y-m-d H:i:s", strtotime("-15 minutes")))
        ->where(&#39;sended_need_pay_notify&#39;, &#39;=&#39;, 2)
        ->where(&#39;status&#39;, &#39;=&#39;, 1)
        ->select([&#39;user_id&#39;, &#39;id&#39;])
        ->orderBy("id", &#39;asc&#39;)
        ->get();
    $orderList = json_decode(json_encode($orderList), true);
    foreach ($orderList as $orderInfo) {
        sendEmail($orderInfo[&#39;user_id&#39;]);
        $conn::table(&#39;order&#39;)
            ->where(&#39;id&#39;, &#39;=&#39;, $orderInfo[&#39;id&#39;])
            ->update([&#39;sended_need_pay_notify&#39; => 1]);
        logs("update-success-orderId-". $orderInfo[&#39;id&#39;]."-userId-".$orderInfo[&#39;user_id&#39;]);
    }

    sleep(10);}

Exécution du script de traitement

gaoz@nobodyMBP delay_mq_demo % php first_while_handler.php
send email to 73 success ...
2020-06-24 11:37:36:update-success-orderId-3-userId-73

Cette méthode est simple à mettre en œuvre, mais pas élégante, et elle permet de commander de grandes quantités en même temps. Des problèmes seront également rencontrés.

Le deuxième type

    Par exemple, en utilisant le service MQ d'Alibaba Cloud, les versions actuelles de rocketMq et RabbitMq prennent en charge les messages retardés, mais les frais de message retardé de Rabbit sont trop élevés
  • Ici, nous utilisons d'abord le message retardé de rocketMq pour implémenter
  • Vous devez activer les services Alibaba Cloud
  • // 创建订单的逻辑try
            {
    
                /**
                 * 随机创建订单
                 */
                $order = [
                    &#39;order_number&#39; => mt_rand(100,10000).date("YmdHis"),
                    &#39;user_id&#39; => mt_rand(1, 100),
                    &#39;order_amount&#39; => mt_rand(100, 1000),
                ];
    
                /**@var $manager Illuminate\Database\Capsule\Manager **/
                $conn = $manager;
    
                $insertId = $conn::table("order")
                    ->insertGetId($order);
    
                $body = json_encode([&#39;order_id&#39; => $insertId, &#39;created_time&#39; => date("Y-m-d H:i:s")]);
                $publishMessage = new TopicMessage(
                    $body            );
                // 设置消息KEY
                $publishMessage->setMessageKey("MessageKey");
    
                // 定时消息, 定时时间为3分钟后
                $publishMessage->setStartDeliverTime(time() * 1000 + 3 * 60 * 1000);
    
                $result = $this->producer->publishMessage($publishMessage);
    
                print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result
                -
                >getMessageBodyMD5() . "\n";
            } catch (\Exception $e) {
                print_r($e->getMessage() . "\n");
            }
La logique de consommation est consommant également le processeur

foreach ($messages as $message) {
                $receiptHandles[] = $message->getReceiptHandle();

                $messageBody = $message->getMessageBody();

                $orderInfo = json_decode($messageBody, true);
                if (!empty($orderInfo[&#39;order_id&#39;])) {
                    $orderId = $orderInfo[&#39;order_id&#39;];

                    /**@var $manager Illuminate\Database\Capsule\Manager * */
                    $conn = $manager;
                    $orderInfo = $conn::table("order")
                        ->select([&#39;id&#39;, &#39;user_id&#39;])
                        ->where(&#39;id&#39;, &#39;=&#39;, $orderId)
                        ->where(&#39;status&#39;, &#39;=&#39;, 1)
                        ->first();
                    if (!empty($orderInfo)) {
                        $orderInfo = json_decode(json_encode($orderInfo), true);
                        sendEmail($orderInfo[&#39;user_id&#39;]);
                        $conn::table(&#39;order&#39;)
                            ->where(&#39;id&#39;, &#39;=&#39;, $orderInfo[&#39;id&#39;])
                            ->update([&#39;sended_need_pay_notify&#39; => 1]);
                        logs("update-success-orderId-" . $orderInfo[&#39;id&#39;] . 
                        "-userId-" . $orderInfo[&#39;user_id&#39;]);
                    }
                }
            }

Commencer à produire un message

gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_producer.php 
Send mq message success. msgId is:76CF2135696C3D4EAC698A9FA1E1879D, bodyMD5 
is:63448B50AA7B8AF47B07AA7CE807E3D3
gaoz@nobodyMBP delay_mq_demo %

Démarrer le consommateur et attendre lentement

gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_consumer.php 
No message, contine long polling!RequestId:5EF752583441411C74869BA9
No message, contine long polling!RequestId:5EF7525B3441411C74869FE2
No message, contine long polling!RequestId:5EF7525E3441411C7486A42C
No message, contine long polling!RequestId:5EF752613441411C7486A7D9
consume finish, messages:send email to 95 success ...2020-06-27 12:08:05:update-success-orderId-8-userId-95
 Array(
    [0] => 76CF2135696C3D4EAC698A9FA1E1879D-MCAxNTkzMjY2NzkxNDM5IDMwMDAwMCAzIDAgYmpzaGFyZTUtMDggNSAw)
    ack

Cette méthode peut utiliser les services existants et réduire le temps de développement

Le troisième type Utilisez RabbitMq pour implémenter

    La vérification de la documentation n'a pas trouvé la fonction native de RabbitMq de prise en charge de la file d'attente de retard, mais elle peut être obtenue via le ttl+ du message Implémentation de la file d'attente des lettres mortes
  • La file d'attente des messages privés est une file d'attente utilisée pour stocker les messages qui n'ont pas été consommés ou qui n'ont pas été consommés
  • Lorsque le message n'est pas consommé pendant la période de validité du message défini, il sera transmis à la file d'attente des lettres mortes
  • Réaliser la fonction de retard en définissant la période de validité du message
  • // 生产者$exchange = &#39;order15min_notify_exchange&#39;;
    $queue = &#39;order15minx_notify_queue&#39;;$dlxExchange = "dlx_order15min_exchange";
    $dlxQueue = "dlx_order15min_queue";
    $connection = new AMQPStreamConnection(getenv(&#39;RABBIT_HOST&#39;), getenv(&#39;RABBIT_PORT&#39;), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST"));
    $channel = $connection->channel();$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
    $channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);// 设置队列的过期时间// 正常队列$table = new \PhpAmqpLib\Wire\AMQPTable();// 消息有效期$table->set(&#39;x-message-ttl&#39;, 3*60*1000);$table->set("x-dead-letter-exchange", $dlxExchange);$channel->queue_declare($queue, false, true, false, false, false, $table);$channel->queue_bind($queue, $exchange);// 死信队列$channel->queue_declare($dlxQueue, false, true, false, false, false);$channel->queue_bind($dlxQueue, $dlxExchange);/**
     * 随机创建订单
     */$order = [
        &#39;order_number&#39; => mt_rand(100,10000).date("YmdHis"),
        &#39;user_id&#39; => mt_rand(1, 100),
        &#39;order_amount&#39; => mt_rand(100, 1000),];/**@var $manager Illuminate\Database\Capsule\Manager **/$conn = $manager;$insertId = $conn::table("order")
        ->insertGetId($order);$messageBody = json_encode([&#39;order_id&#39; => $insertId, &#39;created_time&#39; => date("Y-m-d H:i:s")]);
        $message = new AMQPMessage($messageBody, array(&#39;content_type&#39; => &#39;text/plain&#39;, &#39;delivery_mode&#39; => AMQPMessage::DELIVERY_MODE_PERSISTENT));
        $channel->basic_publish($message, $exchange);
Consommateur

$dlxExchange = "dlx_order15min_exchange";$dlxQueue = "dlx_order15min_queue";
$connection = new AMQPStreamConnection(getenv(&#39;RABBIT_HOST&#39;), getenv(&#39;RABBIT_PORT&#39;), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST"));
$channel = $connection->channel();
$channel->queue_declare($dlxQueue, false, true, false, false);$channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_bind($dlxQueue, $dlxExchange);/**
 * @param \PhpAmqpLib\Message\AMQPMessage $message
 */function process_message($message){
    echo "\n--------\n";
    echo $message->body;
    echo "\n--------\n";

    $orderInfo = json_decode($message->body, true);
    if (!empty($orderInfo[&#39;order_id&#39;])) {
        $orderId = $orderInfo[&#39;order_id&#39;];

        /**@var $conn Illuminate\Database\Capsule\Manager * */
        $conn = getdb();
        $orderInfo = $conn::table("order")
            ->select([&#39;id&#39;, &#39;user_id&#39;])
            ->where(&#39;id&#39;, &#39;=&#39;, $orderId)
            ->where(&#39;status&#39;, &#39;=&#39;, 1)
            ->first();
        if (!empty($orderInfo)) {
            $orderInfo = json_decode(json_encode($orderInfo), true);
            sendEmail($orderInfo[&#39;user_id&#39;]);
            $conn::table(&#39;order&#39;)
                ->where(&#39;id&#39;, &#39;=&#39;, $orderInfo[&#39;id&#39;])
                ->update([&#39;sended_need_pay_notify&#39; => 1]);
            logs("update-success-orderId-" . $orderInfo[&#39;id&#39;] . "-userId-" . $orderInfo[&#39;user_id&#39;]);
        }

    }
    $message->delivery_info[&#39;channel&#39;]->basic_ack(
        $message->delivery_info[&#39;delivery_tag&#39;]);}$channel->basic_consume($dlxQueue, $consumerTag, false, false, false, false, &#39;process_message&#39;);

Démarrer le consommateur

gaoz@nobodyMBP delay_mq_demo % php rabbit_mq_handler_consumer.php
--------
{"order_id":7,"created_time":"2020-06-27 11:50:08"}
--------
send email to 2 success ...
2020-06-27 11:56:55:update-success-orderId-7-userId-2
Démarrer respectivement le consommateur et la production Vous pouvez voir le flux des messages ici

Le message entre d'abord dans la file d'attente normale, puis entre dans la file d'attente des lettres mortes après l'expiration et est consommé

La quatrième méthode

    Utilisez la propre file d'attente de Laravel pour implémenter
  • Le code détaillé n'est pas organisé ici et sera mis à jour ultérieurement. Sortez
  • pour consulter la file d'attente des documents officiels "Laravel 5.7 Chinese Documentation"
Exemple de code : github.com/nobody05/delay_mq_demo

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer