Home  >  Article  >  PHP Framework  >  Let’s talk about thinkphp6 using think-queue to implement ordinary queues and delayed queues

Let’s talk about thinkphp6 using think-queue to implement ordinary queues and delayed queues

WBOY
WBOYforward
2022-04-20 13:07:4810058browse

This article brings you relevant knowledge about thinkphp, which mainly introduces the relevant content about using think-queue to implement ordinary queues and delayed queues. think-queue is officially provided by thinkphp A message queue service, let’s take a look at it, I hope it will be helpful to everyone.

Let’s talk about thinkphp6 using think-queue to implement ordinary queues and delayed queues

Recommended learning: "PHP Video Tutorial"

TP6 Queue

TP6 Think-queue can be used to implement ordinary queues and delayed queues.

think-queue is a message queue service officially provided by thinkphp. It supports some basic features of the message queue:
  • Message publishing, acquisition, execution, deletion, and resending, Failure handling, delayed execution, timeout control, etc.
  • Multiple queues of queues, memory limits, start, stop, guard, etc.
  • The message queue can be downgraded to synchronous execution

Message queue implementation process

1. Push messages to the message queue service through the producer

2. The message queue service stores the received messages into the redis queue (zset)

3. The consumer monitors the queue. When it hears a new message in the queue, it obtains the first message in the queue.

4. Processes the obtained message and calls the business class for processing. Related business

5. After business processing, messages need to be deleted from the queue

composer Install think-queue

composer require topthink/think-queue

Configuration file

After installing think-queue, queue.php will be generated in the config directory. This file is the configuration file of the queue.

tp6 provides a variety of message queue implementation methods. By default, sync is used. I choose to use Redis here.

return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => env('redis.host', '127.0.0.1'),
            'port'       => env('redis.port', '6379'),
            'password'   => env('redis.password','123456'),
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];
Create directory and queue consumption files

Create the queue directory in the app directory, and then create a new abstract class Queue.php file in this directory as the basis Class

<?phpnamespace app\queue;use think\facade\Cache;use think\queue\Job;use think\facade\Log;/**
 * Class Queue 队列消费基础类
 * @package app\queue
 */abstract class Queue{
    /**
     * @describe:fire是消息队列默认调用的方法
     * @param \think\queue\Job $job
     * @param $message
     */
    public function fire(Job $job, $data)
    {
        if (empty($data)) {
            Log::error(sprintf(&#39;[%s][%s] 队列无消息&#39;, __CLASS__, __FUNCTION__));
            return ;
        }

        $jobId = $job->getJobId(); // 队列的数据库id或者redis key
        // $jobClassName = $job->getName(); // 队列对象类
        // $queueName = $job->getQueue(); // 队列名称

        // 如果已经执行中或者执行完成就不再执行了
        if (!$this->checkJob($jobId, $data)) {
            $job->delete();
            Cache::store('redis')->delete($jobId);
            return ;
        }

        // 执行业务处理
        if ($this->execute($data)) {
            Log::record(sprintf('[%s][%s] 队列执行成功', __CLASS__, __FUNCTION__));
            $job->delete(); // 任务执行成功后删除
            Cache::store('redis')->delete($jobId); // 删除redis中的缓存
        } else {
            // 检查任务重试次数
            if ($job->attempts() > 3) {
                Log::error(sprintf('[%s][%s] 队列执行重试次数超过3次,执行失败', __CLASS__, __FUNCTION__));
                 // 第1种处理方式:重新发布任务,该任务延迟10秒后再执行;也可以不指定秒数立即执行
                //$job->release(10); 
                // 第2种处理方式:原任务的基础上1分钟执行一次并增加尝试次数
                //$job->failed();   
                // 第3种处理方式:删除任务
                $job->delete(); // 任务执行后删除
                Cache::store('redis')->delete($jobId); // 删除redis中的缓存
            }
        }
    }

    /**
     * 消息在到达消费者时可能已经不需要执行了
     * @param  string  $jobId
     * @param $message
     * @return bool 任务执行的结果
     * @throws \Psr\SimpleCache\InvalidArgumentException
     */
    protected function checkJob(string $jobId, $message): bool
    {
        // 查询redis
        $data = Cache::store('redis')->get($jobId);
        if (!empty($data)) {
            return false;
        }
        Cache::store('redis')->set($jobId, $message);
        return true;
    }

    /**
     * @describe: 根据消息中的数据进行实际的业务处理
     * @param $data 数据
     * @return bool 返回结果
     */
    abstract protected function execute($data): bool;}
All real consumer classes inherit the basic abstract class

<?phpnamespace app\queue\test;use app\queue\Queue;class Test extends Queue{
    protected function execute($data): bool
    {
       // 具体消费业务逻辑
    }}
Producer logic

use think\facade\Queue;

// 普通队列生成调用方式
Queue::push($job, $data, $queueName);
// 例:
Queue::push(Test::class, $data, $queueName);

// 延时队列生成调用方式
Queue::later($delay, $job, $data, $queueName);
// 例如使用延时队列 10 秒后执行:
Queue::later(10 , Test::class, $data, $queueName);
Start the process monitoring task and execute it

php think queue:listen
php think queue:work

Command mode introduction

Command mode
  • queue:work command

    work command: This command will start a work process to handle the message queue.
php think queue:work --queue TestQueue
  • queue:listen command

    listen command: This command will create a listen parent process, and then the parent process will pass proc_open('php think queue :work')

    to create a work sub-process to process the message queue and limit the execution time of the work process.
  • php think queue:listen --queue TestQueue

    Command line parameters
    • Work mode
    php think queue:work \
    --daemon            //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出
    --queue  helloJobQueue  //要处理的队列的名称
    --delay  0 \        //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
    --force  \          //系统处于维护状态时是否仍然处理任务,并未找到相关说明
    --memory 128 \      //该进程允许使用的内存上限,以 M 为单位
    --sleep  3 \        //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
    --tries  2          //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
  • Listen mode

    php think queue:listen \
    --queue  helloJobQueue \   //监听的队列的名称
    --delay  0 \         //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
    --memory 128 \       //该进程允许使用的内存上限,以 M 为单位
    --sleep  3 \         //如果队列中无任务,则多长时间后重新检查,daemon模式下有效
    --tries  0 \         //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
    --timeout 60         //创建的work子进程的允许执行的最长时间,以秒为单位
    You can see that in listen mode, the --deamon parameter is not included. The reason will be explained below
  • Start, stop and restart the message queue
    • Start a message queue:
    php think queue:work
  • Stop all message queues:
  • php think queue:restart
  • Restart all message queues :
  • php think queue:restart 
    php think queue:work

    Recommended learning: "PHP Video Tutorial
    "

    ###

    The above is the detailed content of Let’s talk about thinkphp6 using think-queue to implement ordinary queues and delayed queues. For more information, please follow other related articles on the PHP Chinese website!

    Statement:
    This article is reproduced at:csdn.net. If there is any infringement, please contact admin@php.cn delete