首頁 >php框架 >ThinkPHP >一起聊聊thinkphp6使用think-queue實作普通佇列和延遲佇列

一起聊聊thinkphp6使用think-queue實作普通佇列和延遲佇列

WBOY
WBOY轉載
2022-04-20 13:07:4810328瀏覽

這篇文章為大家帶來了關於thinkphp的相關知識,其中主要介紹了關於使用think-queue來實現普通隊列和延遲隊列的相關內容,think-queue是thinkphp官方提供的一個訊息隊列服務,下面一起來看一下,希望對大家有幫助。

一起聊聊thinkphp6使用think-queue實作普通佇列和延遲佇列

推薦學習:《PHP影片教學

##TP6 佇列

##TP6中使用think-queue 可以實作普通佇列和延遲佇列。

think-queue 是thinkphp 官方提供的訊息佇列服務,它支援訊息佇列的一些基本特性:

    訊息的發布,獲取,執行,刪除,重發,失敗處理,延遲執行,逾時控制等
  • 佇列的多佇列, 記憶體限制,啟動,停止,守護等
  • 訊息佇列可降級為同步執行

訊息佇列實作過程

1、透過生產者推播訊息到訊息佇列服務中

2、訊息佇列服務將收到的訊息存入redis佇列中(zset)

3、消費者進行監聽佇列,當監聽到佇列有新的訊息時,取得佇列第一個

4、處理取得下來的訊息呼叫業務類別進行處理相關業務

5、業務處理後,需要從佇列中刪除訊息

composer 安裝think-queue
composer require topthink/think-queue

設定檔

#安裝完think-queue 後會在config 目錄中產生queue.php,這個檔案是佇列的設定檔。

tp6中提供了多種訊息佇列的實作方式,預設使用sync,我在這裡選擇使用Redis。

return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => env('redis.host', '127.0.0.1'),
            'port'       => env('redis.port', '6379'),
            'password'   => env('redis.password','123456'),
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

建立目錄及佇列消費性文件

在app 目錄下建立queue 目錄,然後在該目錄下新建一個抽象類別Queue.php 文件,作為基礎類別

<?phpnamespace app\queue;use think\facade\Cache;use think\queue\Job;use think\facade\Log;/**
 * Class Queue 队列消费基础类
 * @package app\queue
 */abstract class Queue{
    /**
     * @describe:fire是消息队列默认调用的方法
     * @param \think\queue\Job $job
     * @param $message
     */
    public function fire(Job $job, $data)
    {
        if (empty($data)) {
            Log::error(sprintf(&#39;[%s][%s] 队列无消息&#39;, __CLASS__, __FUNCTION__));
            return ;
        }

        $jobId = $job->getJobId(); // 队列的数据库id或者redis key
        // $jobClassName = $job->getName(); // 队列对象类
        // $queueName = $job->getQueue(); // 队列名称

        // 如果已经执行中或者执行完成就不再执行了
        if (!$this->checkJob($jobId, $data)) {
            $job->delete();
            Cache::store('redis')->delete($jobId);
            return ;
        }

        // 执行业务处理
        if ($this->execute($data)) {
            Log::record(sprintf('[%s][%s] 队列执行成功', __CLASS__, __FUNCTION__));
            $job->delete(); // 任务执行成功后删除
            Cache::store('redis')->delete($jobId); // 删除redis中的缓存
        } else {
            // 检查任务重试次数
            if ($job->attempts() > 3) {
                Log::error(sprintf('[%s][%s] 队列执行重试次数超过3次,执行失败', __CLASS__, __FUNCTION__));
                 // 第1种处理方式:重新发布任务,该任务延迟10秒后再执行;也可以不指定秒数立即执行
                //$job->release(10); 
                // 第2种处理方式:原任务的基础上1分钟执行一次并增加尝试次数
                //$job->failed();   
                // 第3种处理方式:删除任务
                $job->delete(); // 任务执行后删除
                Cache::store('redis')->delete($jobId); // 删除redis中的缓存
            }
        }
    }

    /**
     * 消息在到达消费者时可能已经不需要执行了
     * @param  string  $jobId
     * @param $message
     * @return bool 任务执行的结果
     * @throws \Psr\SimpleCache\InvalidArgumentException
     */
    protected function checkJob(string $jobId, $message): bool
    {
        // 查询redis
        $data = Cache::store('redis')->get($jobId);
        if (!empty($data)) {
            return false;
        }
        Cache::store('redis')->set($jobId, $message);
        return true;
    }

    /**
     * @describe: 根据消息中的数据进行实际的业务处理
     * @param $data 数据
     * @return bool 返回结果
     */
    abstract protected function execute($data): bool;}
所有真正的消費類別繼承基礎抽象類別

<?phpnamespace app\queue\test;use app\queue\Queue;class Test extends Queue{
    protected function execute($data): bool
    {
       // 具体消费业务逻辑
    }}

#生產者邏輯
use think\facade\Queue;

// 普通队列生成调用方式
Queue::push($job, $data, $queueName);
// 例:
Queue::push(Test::class, $data, $queueName);

// 延时队列生成调用方式
Queue::later($delay, $job, $data, $queueName);
// 例如使用延时队列 10 秒后执行:
Queue::later(10 , Test::class, $data, $queueName);

開啟進程監聽任務並執行
php think queue:listen
php think queue:work

指令模式介紹

指令模式

  • #queue:work 指令

    work 指令: 指令將啟動一個work 進程來處理訊息佇列。

    php think queue:work --queue TestQueue
  • queue:listen 指令

    listen 指令: 此指令將會建立一個listen 父行程,然後由父行程透過

    proc_open('php think queue :work') 的方式來建立一個work 子程序來處理訊息佇列,並且限制該work程序的執行時間。

    php think queue:listen --queue TestQueue
命令列參數

  • Work 模式

    php think queue:work \
    --daemon            //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出
    --queue  helloJobQueue  //要处理的队列的名称
    --delay  0 \        //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
    --force  \          //系统处于维护状态时是否仍然处理任务,并未找到相关说明
    --memory 128 \      //该进程允许使用的内存上限,以 M 为单位
    --sleep  3 \        //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
    --tries  2          //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
  • Listen 模式

    php think queue:listen \
    --queue  helloJobQueue \   //监听的队列的名称
    --delay  0 \         //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
    --memory 128 \       //该进程允许使用的内存上限,以 M 为单位
    --sleep  3 \         //如果队列中无任务,则多长时间后重新检查,daemon模式下有效
    --tries  0 \         //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
    --timeout 60         //创建的work子进程的允许执行的最长时间,以秒为单位
    可以看到listen 模式下,不包含

    --deamon 參數,原因下面會說明

  • 訊息佇列的開始,停止與重啟

    • 開始一個訊息佇列:

      php think queue:work
    • 停止所有的訊息佇列:

      php think queue:restart
    • 重啟所有的訊息佇列:

      php think queue:restart 
      php think queue:work
推薦學習:《

PHP影片教學

以上是一起聊聊thinkphp6使用think-queue實作普通佇列和延遲佇列的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:csdn.net。如有侵權,請聯絡admin@php.cn刪除