Rumah >rangka kerja php >ThinkPHP >Mari kita bincangkan tentang thinkphp6 menggunakan think-queue untuk melaksanakan baris gilir biasa dan baris gilir tertunda

Mari kita bincangkan tentang thinkphp6 menggunakan think-queue untuk melaksanakan baris gilir biasa dan baris gilir tertunda

WBOY
WBOYke hadapan
2022-04-20 13:07:4810313semak imbas

Artikel ini membawa anda pengetahuan yang berkaitan tentang thinkphp, yang terutamanya memperkenalkan kandungan yang berkaitan tentang penggunaan think-queue untuk melaksanakan baris gilir biasa dan gilir tertunda disediakan secara rasmi oleh thinkphp Perkhidmatan baris gilir mesej, mari kita lihat, semoga bermanfaat untuk semua.

Mari kita bincangkan tentang thinkphp6 menggunakan think-queue untuk melaksanakan baris gilir biasa dan baris gilir tertunda

Disyorkan pembelajaran: "Tutorial Video PHP"

###TP6 Queue

TP6 Think-queue boleh digunakan untuk melaksanakan baris gilir biasa dan baris gilir tertunda.

think-queue ialah perkhidmatan baris gilir mesej yang disediakan secara rasmi oleh thinkphp Ia menyokong beberapa ciri asas baris gilir mesej:

  • Penerbitan mesej, pemerolehan, pelaksanaan, pemadaman, penghantaran semula, Pengendalian kegagalan. , pelaksanaan tertunda, kawalan tamat masa, dsb.
  • Baris gilir berbilang, had memori, mula, henti, jaga, dsb.
  • Baris gilir mesej boleh diturunkan taraf kepada pelaksanaan segerak

Proses pelaksanaan baris gilir mesej

1 Tolak mesej ke perkhidmatan baris gilir mesej melalui pengeluar

2 redis queue (zset)

3 Pengguna memantau baris gilir apabila ia mendengar mesej baharu dalam baris gilir, ia mendapat yang pertama dalam baris gilir

4 memanggil kelas perniagaan untuk diproses. 🎜>

fail konfigurasi

Selepas memasang think-queue, queue.php akan dijana dalam direktori konfigurasi.

TP6 menyediakan pelbagai kaedah pelaksanaan baris gilir mesej Secara lalai, penyegerakan digunakan di sini.

composer require topthink/think-queue

Buat direktori dan fail penggunaan baris gilir

Buat direktori baris gilir dalam direktori aplikasi, dan kemudian buat fail Queue.php kelas abstrak baharu dalam sebagai kelas asas

return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => env('redis.host', '127.0.0.1'),
            'port'       => env('redis.port', '6379'),
            'password'   => env('redis.password','123456'),
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];
semua kelas pengguna sebenar mewarisi kelas abstrak asas

logik pengeluar

<?phpnamespace app\queue;use think\facade\Cache;use think\queue\Job;use think\facade\Log;/**
 * Class Queue 队列消费基础类
 * @package app\queue
 */abstract class Queue{
    /**
     * @describe:fire是消息队列默认调用的方法
     * @param \think\queue\Job $job
     * @param $message
     */
    public function fire(Job $job, $data)
    {
        if (empty($data)) {
            Log::error(sprintf(&#39;[%s][%s] 队列无消息&#39;, __CLASS__, __FUNCTION__));
            return ;
        }

        $jobId = $job->getJobId(); // 队列的数据库id或者redis key
        // $jobClassName = $job->getName(); // 队列对象类
        // $queueName = $job->getQueue(); // 队列名称

        // 如果已经执行中或者执行完成就不再执行了
        if (!$this->checkJob($jobId, $data)) {
            $job->delete();
            Cache::store('redis')->delete($jobId);
            return ;
        }

        // 执行业务处理
        if ($this->execute($data)) {
            Log::record(sprintf('[%s][%s] 队列执行成功', __CLASS__, __FUNCTION__));
            $job->delete(); // 任务执行成功后删除
            Cache::store('redis')->delete($jobId); // 删除redis中的缓存
        } else {
            // 检查任务重试次数
            if ($job->attempts() > 3) {
                Log::error(sprintf('[%s][%s] 队列执行重试次数超过3次,执行失败', __CLASS__, __FUNCTION__));
                 // 第1种处理方式:重新发布任务,该任务延迟10秒后再执行;也可以不指定秒数立即执行
                //$job->release(10); 
                // 第2种处理方式:原任务的基础上1分钟执行一次并增加尝试次数
                //$job->failed();   
                // 第3种处理方式:删除任务
                $job->delete(); // 任务执行后删除
                Cache::store('redis')->delete($jobId); // 删除redis中的缓存
            }
        }
    }

    /**
     * 消息在到达消费者时可能已经不需要执行了
     * @param  string  $jobId
     * @param $message
     * @return bool 任务执行的结果
     * @throws \Psr\SimpleCache\InvalidArgumentException
     */
    protected function checkJob(string $jobId, $message): bool
    {
        // 查询redis
        $data = Cache::store('redis')->get($jobId);
        if (!empty($data)) {
            return false;
        }
        Cache::store('redis')->set($jobId, $message);
        return true;
    }

    /**
     * @describe: 根据消息中的数据进行实际的业务处理
     * @param $data 数据
     * @return bool 返回结果
     */
    abstract protected function execute($data): bool;}

Dayakan tugas pemantauan proses dan laksanakan
<?phpnamespace app\queue\test;use app\queue\Queue;class Test extends Queue{
    protected function execute($data): bool
    {
       // 具体消费业务逻辑
    }}

Pengenalan mod arahan

use think\facade\Queue;

// 普通队列生成调用方式
Queue::push($job, $data, $queueName);
// 例:
Queue::push(Test::class, $data, $queueName);

// 延时队列生成调用方式
Queue::later($delay, $job, $data, $queueName);
// 例如使用延时队列 10 秒后执行:
Queue::later(10 , Test::class, $data, $queueName);
Mod arahan

baris:perintah kerja
php think queue:listen
php think queue:work

arahan kerja: Perintah ini akan memulakan proses kerja untuk memproses baris gilir mesej.

  • perintah gilir:dengar

    perintah dengar: Perintah ini akan mencipta proses induk dengar, yang kemudiannya akan dibuat oleh proses induk melalui

    Proses anak kerja mengendalikan baris gilir mesej dan mengehadkan masa pelaksanaan proses kerja.

    php think queue:work --queue TestQueue
  • Parameter baris perintah

    proc_open(‘php think queue:work’)

    Mod kerja
    php think queue:listen --queue TestQueue

Mod dengar
  • Anda boleh melihat bahawa dalam mod dengar, parameter

    tidak disertakan Sebabnya akan diterangkan di bawah
    php think queue:work \
    --daemon            //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出
    --queue  helloJobQueue  //要处理的队列的名称
    --delay  0 \        //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
    --force  \          //系统处于维护状态时是否仍然处理任务,并未找到相关说明
    --memory 128 \      //该进程允许使用的内存上限,以 M 为单位
    --sleep  3 \        //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
    --tries  2          //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
  • Mula, berhenti dan mulakan semula. baris gilir mesej

    php think queue:listen \
    --queue  helloJobQueue \   //监听的队列的名称
    --delay  0 \         //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
    --memory 128 \       //该进程允许使用的内存上限,以 M 为单位
    --sleep  3 \         //如果队列中无任务,则多长时间后重新检查,daemon模式下有效
    --tries  0 \         //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
    --timeout 60         //创建的work子进程的允许执行的最长时间,以秒为单位

    --deamonMulakan baris gilir mesej:

  • Hentikan semua baris gilir mesej:
    • php think queue:work
      Mulakan Semula Semua baris gilir mesej:
    • php think queue:restart
    • Pembelajaran yang disyorkan: "

      Tutorial Video PHP

      "
      php think queue:restart 
      php think queue:work

Atas ialah kandungan terperinci Mari kita bincangkan tentang thinkphp6 menggunakan think-queue untuk melaksanakan baris gilir biasa dan baris gilir tertunda. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:csdn.net. Jika ada pelanggaran, sila hubungi admin@php.cn Padam