A simple way to implement delayed operations in PHP

coldplay.xixi
2020-07-03 17:41:24

## Scenario

Business applications sometimes need delayed operations, such as cancelling an order if it has not been paid half an hour after it was placed, or sending a text message reminder if it has not been paid after fifteen minutes. How can such a requirement be implemented?

## Implementation methods

  • The first and simplest way is to run a background process that checks orders in an endless loop and performs different operations according to each order's creation time.
  • The second is to use the scheduled messages of a message queue: send a scheduled message after the order is placed, and let different delay queues handle different logic.
  • The third is to use existing features provided by the framework.

## Implementation code

The example scenario is sending an email to the user when an order is still unpaid 15 minutes after it was created.

Preparation:

  1. A simple order table: order
  2. The required composer packages
  3. A local RabbitMQ service
  4. An activated Alibaba Cloud RocketMQ service

### The first method

  • The logic is very simple: run an infinite loop.
  • The script process can be started and managed with supervisor.
  • Part of the code is shown here.

Order creation logic

    // Create a random order
    $order = [
        'order_number' => mt_rand(100, 10000) . date("YmdHis"),
        'user_id' => mt_rand(1, 100),
        'order_amount' => mt_rand(100, 1000),
    ];

    /** @var Illuminate\Database\Capsule\Manager $manager */
    $conn = $manager;
    $insertResult = $conn::table("order")
        ->insert($order);
    print_r($insertResult);

Delay processing logic

    while (true) {
        // Unpaid orders created at least 15 minutes ago that have not been notified yet
        $orderList = $conn::table("order")
            ->where("created_time", '<=', date("Y-m-d H:i:s", strtotime("-15 minutes")))
            ->where('sended_need_pay_notify', '=', 2)
            ->where('status', '=', 1)
            ->select(['user_id', 'id'])
            ->orderBy("id", 'asc')
            ->get();
        // Convert the result objects to plain arrays
        $orderList = json_decode(json_encode($orderList), true);
        foreach ($orderList as $orderInfo) {
            sendEmail($orderInfo['user_id']);
            // Mark the order as notified so it is not picked up again
            $conn::table('order')
                ->where('id', '=', $orderInfo['id'])
                ->update(['sended_need_pay_notify' => 1]);
            logs("update-success-orderId-" . $orderInfo['id'] . "-userId-" . $orderInfo['user_id']);
        }

        // Wait 10 seconds before the next check
        sleep(10);
    }
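
The selection rule inside the loop can be isolated as a pure function, which makes it easy to check without a database. The following is a minimal sketch, assuming orders are plain arrays with the same column names as the `order` table. The function name `selectOrdersToNotify` and the two constants are hypothetical, and the meaning of the flag values (`status` 1 for unpaid, `sended_need_pay_notify` 2 for not yet notified) is inferred from the query above.

```php
<?php
// Hypothetical constants; the values mirror those used in the query above.
const STATUS_UNPAID = 1;
const NOTIFY_PENDING = 2;

/**
 * Return the orders that are still unpaid, have not been notified yet,
 * and were created at least $delaySeconds before $now.
 */
function selectOrdersToNotify(array $orders, int $now, int $delaySeconds = 900): array
{
    $cutoff = $now - $delaySeconds;
    $due = array_filter($orders, function (array $order) use ($cutoff): bool {
        return strtotime($order['created_time']) <= $cutoff
            && $order['status'] === STATUS_UNPAID
            && $order['sended_need_pay_notify'] === NOTIFY_PENDING;
    });
    // Oldest first, matching orderBy("id", 'asc') in the query.
    usort($due, fn (array $a, array $b): int => $a['id'] <=> $b['id']);
    return $due;
}

$now = strtotime('2020-06-24 11:37:00');
$orders = [
    ['id' => 3, 'user_id' => 73, 'status' => 1, 'sended_need_pay_notify' => 2, 'created_time' => '2020-06-24 11:20:00'],
    ['id' => 4, 'user_id' => 12, 'status' => 1, 'sended_need_pay_notify' => 2, 'created_time' => '2020-06-24 11:30:00'],
    ['id' => 5, 'user_id' => 40, 'status' => 1, 'sended_need_pay_notify' => 1, 'created_time' => '2020-06-24 11:00:00'],
];
foreach (selectOrdersToNotify($orders, $now) as $order) {
    // Only order 3 is both old enough and still pending.
    echo "notify user {$order['user_id']} for order {$order['id']}\n";
}
```

Passing `$now` in as a parameter, rather than calling `time()` inside the function, keeps the rule deterministic and testable.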

Run the processing script

gaoz@nobodyMBP delay_mq_demo % php first_while_handler.php
send email to 73 success ...
2020-06-24 11:37:36:update-success-orderId-3-userId-73

This method is simple to implement, but it is not elegant, and it runs into problems when a large number of orders are placed at the same time.
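
One possible way to soften the large-volume problem is to bound the work done per query by fetching due orders in fixed-size pages ordered by `id`, instead of loading them all at once. The sketch below illustrates the idea against an in-memory array. `fetchDuePage` is a hypothetical stand-in for a query such as `where('id', '>', $lastId)->orderBy('id')->limit($batchSize)`, `processInBatches` is likewise a made-up name, and the batch size of 3 is arbitrary.

```php
<?php
/**
 * Hypothetical stand-in for a database query: return up to $limit due
 * orders whose id is greater than $lastId, ordered by id ascending.
 */
function fetchDuePage(array $dueOrders, int $lastId, int $limit): array
{
    $page = array_filter($dueOrders, fn (array $o): bool => $o['id'] > $lastId);
    usort($page, fn (array $a, array $b): int => $a['id'] <=> $b['id']);
    return array_slice($page, 0, $limit);
}

/**
 * Walk all due orders in pages of $batchSize so that each query and the
 * memory it needs stay bounded. Returns the number of non-empty pages.
 */
function processInBatches(array $dueOrders, int $batchSize, callable $handle): int
{
    $lastId = 0;
    $batches = 0;
    while (true) {
        $page = fetchDuePage($dueOrders, $lastId, $batchSize);
        if ($page === []) {
            break; // nothing left to process
        }
        foreach ($page as $order) {
            $handle($order);
            $lastId = $order['id']; // remember where the next page starts
        }
        $batches++;
    }
    return $batches;
}

$dueOrders = array_map(fn (int $id): array => ['id' => $id, 'user_id' => 100 + $id], range(1, 7));
$handled = [];
$batches = processInBatches($dueOrders, 3, function (array $order) use (&$handled): void {
    $handled[] = $order['id'];
});
echo "handled " . count($handled) . " orders in {$batches} batches\n"; // handled 7 orders in 3 batches
```

This only limits the size of each step. It does not remove the polling delay or the repeated queries that make the loop approach inelegant.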

### The second method

  • Use a message queue that supports delayed messages, for example Alibaba Cloud's MQ service. The current versions of RocketMQ and RabbitMQ both support delayed messages, but the charges for RabbitMQ delayed messages are too high.
  • Here we first use RocketMQ's delayed messages.
  • The Alibaba Cloud service needs to be activated.

Order creation logic

    try {
        // Create a random order
        $order = [
            'order_number' => mt_rand(100, 10000) . date("YmdHis"),
            'user_id' => mt_rand(1, 100),
            'order_amount' => mt_rand(100, 1000),
        ];

        /** @var Illuminate\Database\Capsule\Manager $manager */
        $conn = $manager;

        $insertId = $conn::table("order")
            ->insertGetId($order);

        $body = json_encode(['order_id' => $insertId, 'created_time' => date("Y-m-d H:i:s")]);
        $publishMessage = new TopicMessage($body);
        // Set the message key
        $publishMessage->setMessageKey("MessageKey");

        // Scheduled message: deliver 3 minutes from now (timestamp in milliseconds)
        $publishMessage->setStartDeliverTime(time() * 1000 + 3 * 60 * 1000);

        $result = $this->producer->publishMessage($publishMessage);

        print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
    } catch (\Exception $e) {
        print_r($e->getMessage() . "\n");
    }
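
The producer above passes `setStartDeliverTime` an absolute time in milliseconds, which is why `time()` is multiplied by 1000 before the delay is added. The demo uses a 3-minute delay, while the scenario calls for 15 minutes. A small helper can make that calculation explicit. This is only a sketch: `startDeliverTimeMs` is a hypothetical name and is not part of the SDK.

```php
<?php
/**
 * Absolute delivery time in milliseconds since the Unix epoch.
 * $nowSeconds can be injected to make the result predictable in tests;
 * when omitted, the current time is used.
 */
function startDeliverTimeMs(int $delaySeconds, ?int $nowSeconds = null): int
{
    $nowSeconds = $nowSeconds ?? time();
    return ($nowSeconds + $delaySeconds) * 1000;
}

$createdAt = 1593230400; // fixed example timestamp, in seconds
echo startDeliverTimeMs(15 * 60, $createdAt), "\n"; // 1593231300000
```

With such a helper, the call in the producer would read `$publishMessage->setStartDeliverTime(startDeliverTimeMs(15 * 60));`.
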

The consumer handles each message in a similar way

    foreach ($messages as $message) {
        // Collect the receipt handle so the message can be acknowledged later
        $receiptHandles[] = $message->getReceiptHandle();

        $messageBody = $message->getMessageBody();

        $orderInfo = json_decode($messageBody, true);
        if (!empty($orderInfo['order_id'])) {
            $orderId = $orderInfo['order_id'];

            /** @var Illuminate\Database\Capsule\Manager $manager */
            $conn = $manager;
            // Only act if the order is still unpaid
            $orderInfo = $conn::table("order")
                ->select(['id', 'user_id'])
                ->where('id', '=', $orderId)
                ->where('status', '=', 1)
                ->first();
            if (!empty($orderInfo)) {
                $orderInfo = json_decode(json_encode($orderInfo), true);
                sendEmail($orderInfo['user_id']);
                $conn::table('order')
                    ->where('id', '=', $orderInfo['id'])
                    ->update(['sended_need_pay_notify' => 1]);
                logs("update-success-orderId-" . $orderInfo['id'] . "-userId-" . $orderInfo['user_id']);
            }
        }
    }
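
The consumer re-reads the order before acting, because the order may have been paid while the message was waiting. The sketch below isolates that decision so it can be checked without a queue or a database. `decideReminder` and `$findOrder` are hypothetical names. As an added assumption, it also skips orders whose `sended_need_pay_notify` flag is already 1, which the consumer above does not check, to guard against a message being delivered twice.

```php
<?php
/**
 * Decide what to do with one delayed message.
 * $findOrder is a hypothetical lookup: it takes an order id and returns
 * the order row as an array, or null when no such order exists.
 */
function decideReminder(string $messageBody, callable $findOrder): array
{
    $payload = json_decode($messageBody, true);
    if (!is_array($payload) || empty($payload['order_id'])) {
        return ['action' => 'discard', 'order_id' => null]; // malformed message
    }
    $orderId = (int) $payload['order_id'];
    $order = $findOrder($orderId);
    if ($order === null || $order['status'] !== 1) {
        return ['action' => 'skip', 'order_id' => $orderId]; // missing or no longer unpaid
    }
    if ($order['sended_need_pay_notify'] === 1) {
        return ['action' => 'skip', 'order_id' => $orderId]; // already reminded
    }
    return ['action' => 'send', 'order_id' => $orderId];
}

// In-memory stand-in for the order table; status 2 represents any value other than "unpaid".
$orders = [
    8 => ['id' => 8, 'user_id' => 95, 'status' => 1, 'sended_need_pay_notify' => 2],
    9 => ['id' => 9, 'user_id' => 17, 'status' => 2, 'sended_need_pay_notify' => 2],
];
$findOrder = fn (int $id): ?array => $orders[$id] ?? null;

foreach (['{"order_id":8}', '{"order_id":9}', 'not json'] as $body) {
    $decision = decideReminder($body, $findOrder);
    echo $decision['action'], "\n"; // send, skip, discard
}
```

Whatever the decision, the message would normally still be acknowledged, since a skipped or discarded message should not be redelivered.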

Produce a message

gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_producer.php
Send mq message success. msgId is:76CF2135696C3D4EAC698A9FA1E1879D, bodyMD5 is:63448B50AA7B8AF47B07AA7CE807E3D3
gaoz@nobodyMBP delay_mq_demo %

Start the consumer and wait for the message

gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_consumer.php 
No message, contine long polling!RequestId:5EF752583441411C74869BA9
No message, contine long polling!RequestId:5EF7525B3441411C74869FE2
No message, contine long polling!RequestId:5EF7525E3441411C7486A42C
No message, contine long polling!RequestId:5EF752613441411C7486A7D9
consume finish, messages:send email to 95 success ...2020-06-27 12:08:05:update-success-orderId-8-userId-95
 Array(
    [0] => 76CF2135696C3D4EAC698A9FA1E1879D-MCAxNTkzMjY2NzkxNDM5IDMwMDAwMCAzIDAgYmpzaGFyZTUtMDggNSAw)
    ack

This method builds on an existing service and reduces development time.

### The third method: using RabbitMQ

  • After consulting the documentation, I did not find native support for delay queues in RabbitMQ, but the same effect can be achieved with message TTL plus a dead letter queue.
  • A dead letter queue stores messages that were not consumed or failed to be consumed.
  • When a message is not consumed within its configured validity period (TTL), it is forwarded to the dead letter queue.
  • The delay is therefore implemented by setting the validity period of the message.
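
The flow described in the list above can be pictured with a toy in-memory model. This is only an illustration of the idea and not how RabbitMQ works internally; the class `TtlQueueModel` and its methods are hypothetical, and time is passed in explicitly as seconds so the behaviour is easy to follow.

```php
<?php
/**
 * Toy model of a normal queue with a message TTL whose expired messages
 * are moved to a dead letter queue. Nothing consumes the normal queue.
 */
final class TtlQueueModel
{
    private int $ttlSeconds;
    /** @var array<int, array{body: string, expiresAt: int}> */
    private array $normalQueue = [];
    /** @var string[] */
    private array $deadLetterQueue = [];

    public function __construct(int $ttlSeconds)
    {
        $this->ttlSeconds = $ttlSeconds;
    }

    public function publish(string $body, int $now): void
    {
        $this->normalQueue[] = ['body' => $body, 'expiresAt' => $now + $this->ttlSeconds];
    }

    /** Move every message whose TTL has elapsed to the dead letter queue. */
    public function tick(int $now): void
    {
        $remaining = [];
        foreach ($this->normalQueue as $message) {
            if ($message['expiresAt'] <= $now) {
                $this->deadLetterQueue[] = $message['body'];
            } else {
                $remaining[] = $message;
            }
        }
        $this->normalQueue = $remaining;
    }

    /** The delayed consumer reads only from the dead letter queue. */
    public function consume(): ?string
    {
        return array_shift($this->deadLetterQueue);
    }
}

$queue = new TtlQueueModel(15 * 60);
$queue->publish('{"order_id":7}', 0);
$queue->tick(14 * 60);
var_dump($queue->consume()); // NULL, the TTL has not elapsed yet
$queue->tick(15 * 60);
var_dump($queue->consume()); // the message body, now available to the consumer
```

The key point is that the consumer never subscribes to the normal queue, so every message there is guaranteed to expire and arrive in the dead letter queue after the TTL.
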

Producer

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Exchange\AMQPExchangeType;
    use PhpAmqpLib\Message\AMQPMessage;

    $exchange = 'order15min_notify_exchange';
    $queue = 'order15minx_notify_queue';
    $dlxExchange = "dlx_order15min_exchange";
    $dlxQueue = "dlx_order15min_queue";
    $connection = new AMQPStreamConnection(getenv('RABBIT_HOST'), getenv('RABBIT_PORT'), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST"));
    $channel = $connection->channel();
    $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
    $channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);

    // Normal queue: set the expiry time of its messages
    $table = new \PhpAmqpLib\Wire\AMQPTable();
    // Message validity period (3 minutes, in milliseconds)
    $table->set('x-message-ttl', 3 * 60 * 1000);
    // Expired messages are forwarded to the dead letter exchange
    $table->set("x-dead-letter-exchange", $dlxExchange);
    $channel->queue_declare($queue, false, true, false, false, false, $table);
    $channel->queue_bind($queue, $exchange);

    // Dead letter queue
    $channel->queue_declare($dlxQueue, false, true, false, false, false);
    $channel->queue_bind($dlxQueue, $dlxExchange);

    // Create a random order
    $order = [
        'order_number' => mt_rand(100, 10000) . date("YmdHis"),
        'user_id' => mt_rand(1, 100),
        'order_amount' => mt_rand(100, 1000),
    ];

    /** @var Illuminate\Database\Capsule\Manager $manager */
    $conn = $manager;
    $insertId = $conn::table("order")
        ->insertGetId($order);

    $messageBody = json_encode(['order_id' => $insertId, 'created_time' => date("Y-m-d H:i:s")]);
    $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
    $channel->basic_publish($message, $exchange);

Consumer

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Exchange\AMQPExchangeType;

    $dlxExchange = "dlx_order15min_exchange";
    $dlxQueue = "dlx_order15min_queue";
    // The excerpt does not show where $consumerTag is defined; this value is an assumption
    $consumerTag = 'consumer';
    $connection = new AMQPStreamConnection(getenv('RABBIT_HOST'), getenv('RABBIT_PORT'), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST"));
    $channel = $connection->channel();
    $channel->queue_declare($dlxQueue, false, true, false, false);
    $channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);
    $channel->queue_bind($dlxQueue, $dlxExchange);

    /**
     * @param \PhpAmqpLib\Message\AMQPMessage $message
     */
    function process_message($message)
    {
        echo "\n--------\n";
        echo $message->body;
        echo "\n--------\n";

        $orderInfo = json_decode($message->body, true);
        if (!empty($orderInfo['order_id'])) {
            $orderId = $orderInfo['order_id'];

            /** @var Illuminate\Database\Capsule\Manager $conn */
            $conn = getdb();
            // Only act if the order is still unpaid
            $orderInfo = $conn::table("order")
                ->select(['id', 'user_id'])
                ->where('id', '=', $orderId)
                ->where('status', '=', 1)
                ->first();
            if (!empty($orderInfo)) {
                $orderInfo = json_decode(json_encode($orderInfo), true);
                sendEmail($orderInfo['user_id']);
                $conn::table('order')
                    ->where('id', '=', $orderInfo['id'])
                    ->update(['sended_need_pay_notify' => 1]);
                logs("update-success-orderId-" . $orderInfo['id'] . "-userId-" . $orderInfo['user_id']);
            }
        }
        // Acknowledge the message so it is removed from the dead letter queue
        $message->delivery_info['channel']->basic_ack(
            $message->delivery_info['delivery_tag']);
    }

    $channel->basic_consume($dlxQueue, $consumerTag, false, false, false, false, 'process_message');

    // Wait for messages; this loop is not shown in the original excerpt
    while ($channel->is_consuming()) {
        $channel->wait();
    }

Start the consumer

gaoz@nobodyMBP delay_mq_demo % php rabbit_mq_handler_consumer.php
--------
{"order_id":7,"created_time":"2020-06-27 11:50:08"}
--------
send email to 2 success ...
2020-06-27 11:56:55:update-success-orderId-7-userId-2

Start the consumer and the producer separately, and that's it. You can see the flow of messages here:

The message first enters the normal queue; after it expires, it enters the dead letter queue, where it is consumed.

### The fourth method

  • Use Laravel's built-in Queue.
  • The detailed code has not been organized yet and will be added later.
  • See the queue chapter of the official documentation, "Laravel 5.7 Chinese Documentation".

Code example: github.com/nobody05/delay_mq_demo

Statement:
This article is reproduced from learnku.com. If there is any infringement, please contact admin@php.cn to have it removed.