>PHP 프레임워크 >ThinkPHP >일반 대기열과 지연 대기열을 구현하기 위해 think-queue를 사용하는 thinkphp6에 대해 이야기해 보겠습니다.

일반 대기열과 지연 대기열을 구현하기 위해 think-queue를 사용하는 thinkphp6에 대해 이야기해 보겠습니다.

WBOY
WBOY앞으로
2022-04-20 13:07:4810313검색

이 글은 thinkphp에 대한 관련 지식을 제공하며, think-queue를 사용하여 일반 큐와 지연된 큐를 구현하는 방법에 대한 내용을 주로 소개합니다. think-queue는 thinkphp에서 공식적으로 제공하는 메시지 큐 서비스입니다. 함께, 모두에게 도움이 되기를 바랍니다.

일반 대기열과 지연 대기열을 구현하기 위해 think-queue를 사용하는 thinkphp6에 대해 이야기해 보겠습니다.

추천 학습: "PHP 비디오 튜토리얼"

###TP6 Queue

TP6에서 think-queue를 사용하여 일반 대기열과 지연 대기열을 구현합니다.

think-queue는 thinkphp에서 공식적으로 제공하는 메시지 대기열 서비스입니다. 메시지 대기열의 몇 가지 기본 기능을 지원합니다:

  • 메시지 게시, 획득, 실행, 삭제, 재전송, 실패 처리, 지연된 실행, 시간 초과 제어 등
  • 큐의 다중 큐, 메모리 제한, 시작, 중지, 가드 등
  • 메시지 큐는 동기 실행으로 다운그레이드될 수 있습니다.

메시지 큐 구현 프로세스

1. 생성자를 통해 메시지 큐 서비스에 메시지를 푸시합니다.

2 메시지 큐 서비스는 수신된 메시지를 redis 큐(zset)에 저장합니다.

3. 소비자는 큐에 새 메시지가 있으면 큐의 첫 번째 메시지를 가져옵니다. . 획득한 메시지를 처리합니다. 메시지는 관련 비즈니스를 처리하기 위해 비즈니스 클래스를 호출합니다

5. 비즈니스 처리 후 해당 메시지를 대기열에서 삭제해야 합니다

composer think-queue 설치

composer require topthink/think-queue
구성 파일

설치 후 think-queue는 queue.php 구성 디렉터리에 생성되며, 이 파일은 대기열의 구성 파일입니다.

tp6은 다양한 메시지 대기열 구현 방법을 제공합니다. 여기서는 기본적으로 동기화가 사용됩니다.

return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => env('redis.host', '127.0.0.1'),
            'port'       => env('redis.port', '6379'),
            'password'   => env('redis.password','123456'),
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

디렉토리 및 대기열 소비 파일 생성

앱 디렉터리에 대기열 디렉터리를 생성한 다음 이 디렉터리에 기본 클래스로 새 추상 클래스 Queue.php 파일을 생성합니다.

<?phpnamespace app\queue;use think\facade\Cache;use think\queue\Job;use think\facade\Log;/**
 * Class Queue 队列消费基础类
 * @package app\queue
 */abstract class Queue{
    /**
     * @describe:fire是消息队列默认调用的方法
     * @param \think\queue\Job $job
     * @param $message
     */
    public function fire(Job $job, $data)
    {
        if (empty($data)) {
            Log::error(sprintf(&#39;[%s][%s] 队列无消息&#39;, __CLASS__, __FUNCTION__));
            return ;
        }

        $jobId = $job->getJobId(); // 队列的数据库id或者redis key
        // $jobClassName = $job->getName(); // 队列对象类
        // $queueName = $job->getQueue(); // 队列名称

        // 如果已经执行中或者执行完成就不再执行了
        if (!$this->checkJob($jobId, $data)) {
            $job->delete();
            Cache::store('redis')->delete($jobId);
            return ;
        }

        // 执行业务处理
        if ($this->execute($data)) {
            Log::record(sprintf('[%s][%s] 队列执行成功', __CLASS__, __FUNCTION__));
            $job->delete(); // 任务执行成功后删除
            Cache::store('redis')->delete($jobId); // 删除redis中的缓存
        } else {
            // 检查任务重试次数
            if ($job->attempts() > 3) {
                Log::error(sprintf('[%s][%s] 队列执行重试次数超过3次,执行失败', __CLASS__, __FUNCTION__));
                 // 第1种处理方式:重新发布任务,该任务延迟10秒后再执行;也可以不指定秒数立即执行
                //$job->release(10); 
                // 第2种处理方式:原任务的基础上1分钟执行一次并增加尝试次数
                //$job->failed();   
                // 第3种处理方式:删除任务
                $job->delete(); // 任务执行后删除
                Cache::store('redis')->delete($jobId); // 删除redis中的缓存
            }
        }
    }

    /**
     * 消息在到达消费者时可能已经不需要执行了
     * @param  string  $jobId
     * @param $message
     * @return bool 任务执行的结果
     * @throws \Psr\SimpleCache\InvalidArgumentException
     */
    protected function checkJob(string $jobId, $message): bool
    {
        // 查询redis
        $data = Cache::store('redis')->get($jobId);
        if (!empty($data)) {
            return false;
        }
        Cache::store('redis')->set($jobId, $message);
        return true;
    }

    /**
     * @describe: 根据消息中的数据进行实际的业务处理
     * @param $data 数据
     * @return bool 返回结果
     */
    abstract protected function execute($data): bool;}

모든 실제 소비자 클래스는 기본 클래스를 상속합니다. 추상화 클래스

<?phpnamespace app\queue\test;use app\queue\Queue;class Test extends Queue{
    protected function execute($data): bool
    {
       // 具体消费业务逻辑
    }}

Producer logic

use think\facade\Queue;

// 普通队列生成调用方式
Queue::push($job, $data, $queueName);
// 例:
Queue::push(Test::class, $data, $queueName);

// 延时队列生成调用方式
Queue::later($delay, $job, $data, $queueName);
// 例如使用延时队列 10 秒后执行:
Queue::later(10 , Test::class, $data, $queueName);

프로세스 모니터링 작업을 시작하고

php think queue:listen
php think queue:work
명령 모드 소개

명령 모드

    queue:work command
  • work 명령: 이 명령은 작업을 시작합니다. 메시지 큐를 처리하는 프로세스입니다.

    php think queue:work --queue TestQueue

  • queue:listen 명령
  • listen 명령: 이 명령은 수신 대기 상위 프로세스를 생성하며, 이 프로세스는 proc_open('php think queue:work')를 통해 상위 프로세스에 의해 생성됩니다. 작업 하위 프로세스는 메시지 큐를 처리하고 작업 프로세스의 실행 시간을 제한합니다.

    php think queue:listen --queue TestQueue

    proc_open(‘php think queue:work’) 的方式来创建一个work 子 进程来处理消息队列,且限制该work进程的执行时间。

    php think queue:work \
    --daemon            //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出
    --queue  helloJobQueue  //要处理的队列的名称
    --delay  0 \        //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
    --force  \          //系统处于维护状态时是否仍然处理任务,并未找到相关说明
    --memory 128 \      //该进程允许使用的内存上限,以 M 为单位
    --sleep  3 \        //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
    --tries  2          //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0

命令行参数

  • Work 模式

    php think queue:listen \
    --queue  helloJobQueue \   //监听的队列的名称
    --delay  0 \         //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
    --memory 128 \       //该进程允许使用的内存上限,以 M 为单位
    --sleep  3 \         //如果队列中无任务,则多长时间后重新检查,daemon模式下有效
    --tries  0 \         //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
    --timeout 60         //创建的work子进程的允许执行的最长时间,以秒为单位
  • Listen 模式

    php think queue:work

    可以看到 listen 模式下,不包含 --deamon

    명령줄 매개변수
  • Work 모드
      php think queue:restart
    • Listen mode
    php think queue:restart 
    php think queue:work
  • Listen 모드에서는 --deamon 매개변수가 포함되지 않은 것을 볼 수 있습니다. 아래에 설명되어 있습니다

  • 메시지 대기열 시작, 중지 및 다시 시작

  • 메시지 대기열 시작: rrreee

    모든 메시지 대기열 중지: rrreee

    🎜모든 메시지 대기열 다시 시작: 🎜 🎜추천 학습: "🎜PHP 비디오 튜토리얼🎜"🎜🎜

    위 내용은 일반 대기열과 지연 대기열을 구현하기 위해 think-queue를 사용하는 thinkphp6에 대해 이야기해 보겠습니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

    성명:
    이 기사는 csdn.net에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제