Heim  >  Artikel  >  PHP-Framework  >  Lassen Sie uns über thinkphp6 sprechen, das think-queue verwendet, um gewöhnliche Warteschlangen und verzögerte Warteschlangen zu implementieren

Lassen Sie uns über thinkphp6 sprechen, das think-queue verwendet, um gewöhnliche Warteschlangen und verzögerte Warteschlangen zu implementieren

WBOY
WBOYnach vorne
2022-04-20 13:07:4810246Durchsuche

Dieser Artikel vermittelt Ihnen relevantes Wissen über thinkphp, der hauptsächlich die relevanten Inhalte zur Verwendung von Think-Queue zur Implementierung gewöhnlicher Warteschlangen und verzögerter Warteschlangen vorstellt. Think-Queue ist ein offiziell von thinkphp bereitgestellter Nachrichtenwarteschlangendienst Gemeinsam hoffe ich, dass es für alle hilfreich sein wird.

Lassen Sie uns über thinkphp6 sprechen, das think-queue verwendet, um gewöhnliche Warteschlangen und verzögerte Warteschlangen zu implementieren

Empfohlenes Lernen: „PHP-Video-Tutorial

###TP6-Warteschlange

Verwenden Sie Think-Queue in TP6, um normale Warteschlangen und verzögerte Warteschlangen zu implementieren.

think-queue ist ein Nachrichtenwarteschlangendienst, der offiziell von thinkphp bereitgestellt wird. Er unterstützt einige grundlegende Funktionen der Nachrichtenwarteschlange:

  • Nachrichtenveröffentlichung, Erfassung, Ausführung, Löschen, erneutes Senden, Fehlerbehandlung, verzögerte Ausführung, Zeitüberschreitungskontrolle usw.
  • Mehrere Warteschlangen, Speichergrenzen, Start, Stopp, Schutz usw.
  • Nachrichtenwarteschlange kann auf synchrone Ausführung herabgestuft werden

Message Queue-Implementierungsprozess

1. Push-Nachrichten an den Nachrichtenwarteschlangendienst über den Produzenten

2, der Nachrichtenwarteschlangendienst speichert die empfangenen Nachrichten in der Redis-Warteschlange (zset)

3 Der Verbraucher überwacht die Warteschlange, wenn sich eine neue Nachricht in der Warteschlange befindet

4 . Verarbeitet die erhaltene Nachricht. Die Nachricht ruft die Business-Klasse auf, um verwandte Geschäfte zu verarbeiten.

Composer muss die Think-Queue installieren think-queue, es wird im Konfigurationsverzeichnis queue.php generiert, diese Datei ist die Konfigurationsdatei der Warteschlange.

tp6 bietet verschiedene Implementierungsmethoden für Nachrichtenwarteschlangen. Standardmäßig wird die Synchronisierung verwendet. Ich wähle hier die Verwendung von Redis.
composer require topthink/think-queue

Erstellen Sie Verzeichnisse und Warteschlangenverbrauchsdateien.

Erstellen Sie das Warteschlangenverzeichnis im App-Verzeichnis und erstellen Sie dann in diesem Verzeichnis eine neue abstrakte Klassendatei Queue.php als Basisklasse.

return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => env('redis.host', '127.0.0.1'),
            'port'       => env('redis.port', '6379'),
            'password'   => env('redis.password','123456'),
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];
Alle echten Verbraucherklassen erben die Basisklasse Abstraktionsklasse

<?phpnamespace app\queue;use think\facade\Cache;use think\queue\Job;use think\facade\Log;/**
 * Class Queue 队列消费基础类
 * @package app\queue
 */abstract class Queue{
    /**
     * @describe:fire是消息队列默认调用的方法
     * @param \think\queue\Job $job
     * @param $message
     */
    public function fire(Job $job, $data)
    {
        if (empty($data)) {
            Log::error(sprintf(&#39;[%s][%s] 队列无消息&#39;, __CLASS__, __FUNCTION__));
            return ;
        }

        $jobId = $job->getJobId(); // 队列的数据库id或者redis key
        // $jobClassName = $job->getName(); // 队列对象类
        // $queueName = $job->getQueue(); // 队列名称

        // 如果已经执行中或者执行完成就不再执行了
        if (!$this->checkJob($jobId, $data)) {
            $job->delete();
            Cache::store('redis')->delete($jobId);
            return ;
        }

        // 执行业务处理
        if ($this->execute($data)) {
            Log::record(sprintf('[%s][%s] 队列执行成功', __CLASS__, __FUNCTION__));
            $job->delete(); // 任务执行成功后删除
            Cache::store('redis')->delete($jobId); // 删除redis中的缓存
        } else {
            // 检查任务重试次数
            if ($job->attempts() > 3) {
                Log::error(sprintf('[%s][%s] 队列执行重试次数超过3次,执行失败', __CLASS__, __FUNCTION__));
                 // 第1种处理方式:重新发布任务,该任务延迟10秒后再执行;也可以不指定秒数立即执行
                //$job->release(10); 
                // 第2种处理方式:原任务的基础上1分钟执行一次并增加尝试次数
                //$job->failed();   
                // 第3种处理方式:删除任务
                $job->delete(); // 任务执行后删除
                Cache::store('redis')->delete($jobId); // 删除redis中的缓存
            }
        }
    }

    /**
     * 消息在到达消费者时可能已经不需要执行了
     * @param  string  $jobId
     * @param $message
     * @return bool 任务执行的结果
     * @throws \Psr\SimpleCache\InvalidArgumentException
     */
    protected function checkJob(string $jobId, $message): bool
    {
        // 查询redis
        $data = Cache::store('redis')->get($jobId);
        if (!empty($data)) {
            return false;
        }
        Cache::store('redis')->set($jobId, $message);
        return true;
    }

    /**
     * @describe: 根据消息中的数据进行实际的业务处理
     * @param $data 数据
     * @return bool 返回结果
     */
    abstract protected function execute($data): bool;}
Produzentenlogik

<?phpnamespace app\queue\test;use app\queue\Queue;class Test extends Queue{
    protected function execute($data): bool
    {
       // 具体消费业务逻辑
    }}

Starten Sie die Prozessüberwachungsaufgabe und führen Sie

use think\facade\Queue;

// 普通队列生成调用方式
Queue::push($job, $data, $queueName);
// 例:
Queue::push(Test::class, $data, $queueName);

// 延时队列生成调用方式
Queue::later($delay, $job, $data, $queueName);
// 例如使用延时队列 10 秒后执行:
Queue::later(10 , Test::class, $data, $queueName);
Einführung in den Befehlsmodus

Befehlsmodus

queue:Arbeitsbefehl

Arbeitsbefehl: Dieser Befehl werde eine Arbeit beginnen Prozess zur Verarbeitung der Nachrichtenwarteschlange.

php think queue:listen
php think queue:work

  • queue:listen command

    listen command: Dieser Befehl erstellt einen übergeordneten Listen-Prozess, der dann vom übergeordneten Prozess über proc_open('php think queue:work') erstellt wird Ein untergeordneter Arbeitsprozess verwaltet die Nachrichtenwarteschlange und begrenzt die Ausführungszeit des Arbeitsprozesses.

    php think queue:work --queue TestQueue
  • Befehlszeilenparameter

    proc_open(‘php think queue:work’) 的方式来创建一个work 子 进程来处理消息队列,且限制该work进程的执行时间。

    php think queue:listen --queue TestQueue

命令行参数

  • Work 模式

    php think queue:work \
    --daemon            //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出
    --queue  helloJobQueue  //要处理的队列的名称
    --delay  0 \        //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
    --force  \          //系统处于维护状态时是否仍然处理任务,并未找到相关说明
    --memory 128 \      //该进程允许使用的内存上限,以 M 为单位
    --sleep  3 \        //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
    --tries  2          //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
  • Listen 模式

    php think queue:listen \
    --queue  helloJobQueue \   //监听的队列的名称
    --delay  0 \         //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
    --memory 128 \       //该进程允许使用的内存上限,以 M 为单位
    --sleep  3 \         //如果队列中无任务,则多长时间后重新检查,daemon模式下有效
    --tries  0 \         //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
    --timeout 60         //创建的work子进程的允许执行的最长时间,以秒为单位

    可以看到 listen 模式下,不包含 --deamonArbeitsmodus

    php think queue:work
  • Listen-Modus

    php think queue:restart
      Sie können sehen, dass im Listen-Modus der Parameter --deamon nicht enthalten ist, der Grund wird unten erklärt
    • Nachrichtenwarteschlange starten, stoppen und neu starten
    • Eine Nachrichtenwarteschlange starten:
    • php think queue:restart 
      php think queue:work

    • Alle Nachrichtenwarteschlangen stoppen:
    rrreee

Alle Nachrichtenwarteschlangen neu starten:rrreee

🎜🎜Empfohlenes Lernen: „🎜PHP-Video-Tutorial🎜“🎜🎜

Das obige ist der detaillierte Inhalt vonLassen Sie uns über thinkphp6 sprechen, das think-queue verwendet, um gewöhnliche Warteschlangen und verzögerte Warteschlangen zu implementieren. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:csdn.net. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen