>백엔드 개발 >PHP 튜토리얼 >PHP는 단순히 지연 작업을 구현합니다.

PHP는 단순히 지연 작업을 구현합니다.

coldplay.xixi
coldplay.xixi앞으로
2020-07-03 17:41:245691검색

PHP는 단순히 지연 작업을 구현합니다.


시나리오

때로는 주문 후 30분 이내에 결제가 이루어지지 않으면 주문을 취소하고, 결제가 완료되지 않으면 문자 메시지를 보내는 등 비즈니스 운영이 지연되는 경우가 있습니다. 주문 후 15분이 지나도 결제가 ​​되지 않습니다. 기다리세요. 그렇다면 그러한 요구를 어떻게 실현할 수 있을까요?

관련 학습 권장 사항: 초보부터 숙련까지 PHP 프로그래밍

구현 방법

  • 첫 번째 간단한 방법은 백그라운드 프로세스를 사용하여 무한 루프에서 순서를 확인하고 그에 따라 다른 작업을 수행하는 것입니다. 두 번째 작업
  • 은 메시지 대기열의 예약된 메시지를 사용하고 주문한 후 예약된 메시지를 보내는 것입니다. 서로 다른 예약된 대기열은 서로 다른 로직을 처리합니다.
  • 세 번째 방법은 에서 제공하는 일부 기존 기능을 사용하여 수행할 수 있습니다. Framework

구현 코드

주문이 생성된 후 15분 후에 사용자에게 결제 없이 이메일을 보내는 시나리오를 사용하여 학습했습니다

준비 작업:

  1. 간단한 주문 양식: 주문
  2. 다양한 요구에 맞는 Composer 패키지
  3. rabbitMq 로컬 서비스
  4. Alibaba Cloud RocketMq 서비스 열기

첫 번째 방법

  • 코드 로직은 매우 간단합니다. 무한 루프로 이동하세요.
  • 이 스크립트 프로세스 시작 , 관리자를 사용하여 구성할 수 있습니다
  • 코드의 일부
//创建订单的逻辑/**
 * 随机创建订单
 */$order = [
    'order_number' => mt_rand(100,10000).date("YmdHis"),
    'user_id' => mt_rand(1, 100),
    'order_amount' => mt_rand(100, 1000),];
    /**@var $manager Illuminate\Database\Capsule\Manager **/
    $conn = $manager;$insertResult = $conn::table("order")
    ->insert($order);print_r($insertResult);

지연된 처리 논리

while(true) {
    // 未支付订单列表
    $orderList = $conn::table("order")
        ->where("created_time",  &#39;<=&#39;, date("Y-m-d H:i:s", strtotime("-15 minutes")))
        ->where(&#39;sended_need_pay_notify&#39;, &#39;=&#39;, 2)
        ->where(&#39;status&#39;, &#39;=&#39;, 1)
        ->select([&#39;user_id&#39;, &#39;id&#39;])
        ->orderBy("id", &#39;asc&#39;)
        ->get();
    $orderList = json_decode(json_encode($orderList), true);
    foreach ($orderList as $orderInfo) {
        sendEmail($orderInfo[&#39;user_id&#39;]);
        $conn::table(&#39;order&#39;)
            ->where(&#39;id&#39;, &#39;=&#39;, $orderInfo[&#39;id&#39;])
            ->update([&#39;sended_need_pay_notify&#39; => 1]);
        logs("update-success-orderId-". $orderInfo[&#39;id&#39;]."-userId-".$orderInfo[&#39;user_id&#39;]);
    }

    sleep(10);}

실행 처리 스크립트

gaoz@nobodyMBP delay_mq_demo % php first_while_handler.php
send email to 73 success ...
2020-06-24 11:37:36:update-success-orderId-3-userId-73

이 방법은 구현이 간단하지만 동시에 대규모 일괄 주문이 우아하지 않습니다. 문제도 겪게 됩니다.

두 번째 방법

  • 예를 들어 Alibaba Cloud의 MQ 서비스를 사용하면 현재 버전의 RocketMq 및 RabbitMq는 지연된 메시지를 지원하지만 Rabbit의 지연된 메시지는 너무 비쌉니다
  • 여기에서는 먼저 RocketMq의 지연된 메시지를 사용하여 구현합니다
  • Alibaba Cloud 서비스를 활성화해야 합니다
// 创建订单的逻辑try
        {

            /**
             * 随机创建订单
             */
            $order = [
                &#39;order_number&#39; => mt_rand(100,10000).date("YmdHis"),
                &#39;user_id&#39; => mt_rand(1, 100),
                &#39;order_amount&#39; => mt_rand(100, 1000),
            ];

            /**@var $manager Illuminate\Database\Capsule\Manager **/
            $conn = $manager;

            $insertId = $conn::table("order")
                ->insertGetId($order);

            $body = json_encode([&#39;order_id&#39; => $insertId, &#39;created_time&#39; => date("Y-m-d H:i:s")]);
            $publishMessage = new TopicMessage(
                $body            );
            // 设置消息KEY
            $publishMessage->setMessageKey("MessageKey");

            // 定时消息, 定时时间为3分钟后
            $publishMessage->setStartDeliverTime(time() * 1000 + 3 * 60 * 1000);

            $result = $this->producer->publishMessage($publishMessage);

            print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result
            -
            >getMessageBodyMD5() . "\n";
        } catch (\Exception $e) {
            print_r($e->getMessage() . "\n");
        }

소비 로직도 소비자에서 처리됩니다

foreach ($messages as $message) {
                $receiptHandles[] = $message->getReceiptHandle();

                $messageBody = $message->getMessageBody();

                $orderInfo = json_decode($messageBody, true);
                if (!empty($orderInfo[&#39;order_id&#39;])) {
                    $orderId = $orderInfo[&#39;order_id&#39;];

                    /**@var $manager Illuminate\Database\Capsule\Manager * */
                    $conn = $manager;
                    $orderInfo = $conn::table("order")
                        ->select([&#39;id&#39;, &#39;user_id&#39;])
                        ->where(&#39;id&#39;, &#39;=&#39;, $orderId)
                        ->where(&#39;status&#39;, &#39;=&#39;, 1)
                        ->first();
                    if (!empty($orderInfo)) {
                        $orderInfo = json_decode(json_encode($orderInfo), true);
                        sendEmail($orderInfo[&#39;user_id&#39;]);
                        $conn::table(&#39;order&#39;)
                            ->where(&#39;id&#39;, &#39;=&#39;, $orderInfo[&#39;id&#39;])
                            ->update([&#39;sended_need_pay_notify&#39; => 1]);
                        logs("update-success-orderId-" . $orderInfo[&#39;id&#39;] . 
                        "-userId-" . $orderInfo[&#39;user_id&#39;]);
                    }
                }
            }

메시지 생성 시작

gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_producer.php 
Send mq message success. msgId is:76CF2135696C3D4EAC698A9FA1E1879D, bodyMD5 
is:63448B50AA7B8AF47B07AA7CE807E3D3
gaoz@nobodyMBP delay_mq_demo %

소비자를 시작하고 천천히 기다리세요

gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_consumer.php 
No message, contine long polling!RequestId:5EF752583441411C74869BA9
No message, contine long polling!RequestId:5EF7525B3441411C74869FE2
No message, contine long polling!RequestId:5EF7525E3441411C7486A42C
No message, contine long polling!RequestId:5EF752613441411C7486A7D9
consume finish, messages:send email to 95 success ...2020-06-27 12:08:05:update-success-orderId-8-userId-95
 Array(
    [0] => 76CF2135696C3D4EAC698A9FA1E1879D-MCAxNTkzMjY2NzkxNDM5IDMwMDAwMCAzIDAgYmpzaGFyZTUtMDggNSAw)
    ack

이 방법은 기존 서비스에서 사용할 수 있습니다. 개발을 줄입니다. time

세 번째 RabbitMq를 사용하여 구현

  • 문서를 확인했는데 지연 대기열을 지원하는 RabbitMq의 기본 기능을 찾지 못했지만 메시지의 ttl + 배달 못한 편지 대기열을 통해 구현할 수 있습니다.
  • 비공개 메시지 큐는 저장에 사용됩니다. 소비되지 않았거나 소비에 실패한 메시지의 큐입니다.
  • 설정된 메시지의 유효 기간 내에 메시지가 소비되지 않으면 배달 못한 편지 큐로 전달됩니다.
  • 지연 기능
// 生产者$exchange = &#39;order15min_notify_exchange&#39;;
$queue = &#39;order15minx_notify_queue&#39;;$dlxExchange = "dlx_order15min_exchange";
$dlxQueue = "dlx_order15min_queue";
$connection = new AMQPStreamConnection(getenv(&#39;RABBIT_HOST&#39;), getenv(&#39;RABBIT_PORT&#39;), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST"));
$channel = $connection->channel();$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);// 设置队列的过期时间// 正常队列$table = new \PhpAmqpLib\Wire\AMQPTable();// 消息有效期$table->set(&#39;x-message-ttl&#39;, 3*60*1000);$table->set("x-dead-letter-exchange", $dlxExchange);$channel->queue_declare($queue, false, true, false, false, false, $table);$channel->queue_bind($queue, $exchange);// 死信队列$channel->queue_declare($dlxQueue, false, true, false, false, false);$channel->queue_bind($dlxQueue, $dlxExchange);/**
 * 随机创建订单
 */$order = [
    &#39;order_number&#39; => mt_rand(100,10000).date("YmdHis"),
    &#39;user_id&#39; => mt_rand(1, 100),
    &#39;order_amount&#39; => mt_rand(100, 1000),];/**@var $manager Illuminate\Database\Capsule\Manager **/$conn = $manager;$insertId = $conn::table("order")
    ->insertGetId($order);$messageBody = json_encode([&#39;order_id&#39; => $insertId, &#39;created_time&#39; => date("Y-m-d H:i:s")]);
    $message = new AMQPMessage($messageBody, array(&#39;content_type&#39; => &#39;text/plain&#39;, &#39;delivery_mode&#39; => AMQPMessage::DELIVERY_MODE_PERSISTENT));
    $channel->basic_publish($message, $exchange);

Consumer

$dlxExchange = "dlx_order15min_exchange";$dlxQueue = "dlx_order15min_queue";
$connection = new AMQPStreamConnection(getenv(&#39;RABBIT_HOST&#39;), getenv(&#39;RABBIT_PORT&#39;), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST"));
$channel = $connection->channel();
$channel->queue_declare($dlxQueue, false, true, false, false);$channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_bind($dlxQueue, $dlxExchange);/**
 * @param \PhpAmqpLib\Message\AMQPMessage $message
 */function process_message($message){
    echo "\n--------\n";
    echo $message->body;
    echo "\n--------\n";

    $orderInfo = json_decode($message->body, true);
    if (!empty($orderInfo[&#39;order_id&#39;])) {
        $orderId = $orderInfo[&#39;order_id&#39;];

        /**@var $conn Illuminate\Database\Capsule\Manager * */
        $conn = getdb();
        $orderInfo = $conn::table("order")
            ->select([&#39;id&#39;, &#39;user_id&#39;])
            ->where(&#39;id&#39;, &#39;=&#39;, $orderId)
            ->where(&#39;status&#39;, &#39;=&#39;, 1)
            ->first();
        if (!empty($orderInfo)) {
            $orderInfo = json_decode(json_encode($orderInfo), true);
            sendEmail($orderInfo[&#39;user_id&#39;]);
            $conn::table(&#39;order&#39;)
                ->where(&#39;id&#39;, &#39;=&#39;, $orderInfo[&#39;id&#39;])
                ->update([&#39;sended_need_pay_notify&#39; => 1]);
            logs("update-success-orderId-" . $orderInfo[&#39;id&#39;] . "-userId-" . $orderInfo[&#39;user_id&#39;]);
        }

    }
    $message->delivery_info[&#39;channel&#39;]->basic_ack(
        $message->delivery_info[&#39;delivery_tag&#39;]);}$channel->basic_consume($dlxQueue, $consumerTag, false, false, false, false, &#39;process_message&#39;);

소비자를 시작하고

gaoz@nobodyMBP delay_mq_demo % php rabbit_mq_handler_consumer.php
--------
{"order_id":7,"created_time":"2020-06-27 11:50:08"}
--------
send email to 2 success ...
2020-06-27 11:56:55:update-success-orderId-7-userId-2

소비자와 생산자를 각각 시작하면 여기에서 메시지의 흐름을 볼 수 있습니다

메시지는 먼저 일반 큐에 들어가고 만료된 후에는 배달 못한 편지 큐에 들어가 소비됩니다.

네 번째 방법

  • 라라벨 자체 큐를 사용하여 구현
  • 자세한 코드는 여기에 컴파일되지 않습니다. 나중에 업데이트됨
  • 공식 문서 대기열 "Laravel 5.7 Chinese Documentation"

코드 샘플을 볼 수 있습니다: github.com/nobody05/delay_mq_demo

위 내용은 PHP는 단순히 지연 작업을 구현합니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 learnku.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제