首頁 >php框架 >ThinkPHP >解析think-queue(圍繞redis做分析)

解析think-queue(圍繞redis做分析)

藏色散人
藏色散人轉載
2021-07-26 16:00:194091瀏覽

前言

分析前請大家務必了解訊息佇列的實作

tp5的訊息佇列是基於database redis 和tp官方自己實作的Topthink
本章是圍繞著redis來做分析

儲存key:

##key類型描述list要執行的任務#string重啟佇列時間戳zSet延遲任務zSet執行失敗,等待重新執行##執行指令
#queues:queueName
think:queue:restart
queues:queueName:delayed
queues :queueName:reserved

work和listen的差異在下面會解釋

指令
描述
##php think queue:work 監聽佇列
php think queue:listen 監聽佇列
php think queue:restart #重啟佇列
php think queue:subscribe 暫無,可能是保留的官方有什麼其他想法但是還沒實現

行為標籤

#重啟守護程式任務開始執行之前任務延遲執行任務執行失敗
#標籤 描述
#worker_daemon_start 守護程式開啟
worker_memory_exceeded #記憶體超出
##worker_queue_restart
worker_before_process
worker_before_sleep
queue_failed
指令參數

參數預設值可以使用的模式#queuedaemon##delayforce memorysleeptries
##描述
null work,listen 要執行的任務名稱
null work #以守護程式執行任務
0 work,listen 失敗後重新執行的時間
null work 失敗後重新執行的時間
128M work,listen #限制最大記憶體
#3 work,listen 沒有任務的時候等待的時間
#### ##0######work,listen######任務失敗後最大嘗試次數############

模式差異

1: 執行原理不同
work: 單一進程的處理模式;
無 daemon 參數 work進程在處理下一個訊息後直接結束目前程序。當不存在新訊息時,會sleep一段時間然後退出;
有 daemon 參數 work進程會循環地處理佇列中的消息,直到記憶體超出參數配置才結束進程。當不存在新訊息時,會在每次循環中sleep一段時間;

listen: 父進程子進程的處理模式;
會在所在的父進程會建立一個單次執行模式的work子程序,並透過該work子程序來處理佇列中的下一個訊息,當這個work子程序退出之後;
所在的父行程會監聽到該子程序的退出訊號,並重新建立一個新的單次執行的work子程序;

2: 退出時機不同
work: 看上面
listen: 所在的父進程正常情況會一直運行,除非遇到下面兩種情況
01: 所建立的某個work子程序的執行時間超過了listen命令列中的--timeout 參數配置;此時work子程序會被強制結束,listen所在的父程序也會拋出一個ProcessTimeoutException 異常並退出;

開發者可以選擇捕獲該異常,讓父進程繼續執行;
02: 所在的父進程因某種原因存在內存洩露,則當父進程本身佔用的內存超過了命令行中的--memory 參數配置時,父子程序均會退出。正常情況下,listen進程本身所佔用的記憶體是穩定不變的。

3: 效能不同
work: 是在腳本內部做循環,框架腳本在指令執行的初期就已載入完畢;

listen: 是處理完一個任務之後新開一個work進程,此時會重新載入框架腳本;

因此work 模式的效能會比listen模式高。
注意: 當程式碼有更新時,work 模式下需要手動去執行 php think queue:restart 指令重新啟動佇列來使變更生效;而listen 模式會自動生效,無需其他操作。

4: 逾時控制能力
work: 本質上既不能控制進程本身的運行時間,也無法限制執行中的任務的執行時間;
listen: 可以限制其創建的work子進程的逾時時間;

可透過timeout 參數限制work子進程允許運行的最長時間,超過該時間限制仍未結束的子進程會被強制結束;
expire 和time的區別

expire 在設定檔中設定,指任務的過期時間這個時間是全域的,影響到所有的work行程
timeout 在命令列參數中設定,指work子程序的逾時時間,這個時間只對目前執行的listen 指令有效,timeout 針對的物件是work 子程序;

5: 使用場景不同

work 適用場景是:
01: 任務數量較多
02: 效能要求較高
03: 任務的執行時間較短
04: 消費者類別中不存在死循環,sleep() ,exit() ,die() 等容易導致bug的邏輯

listen 適用場景是:

01: 任務數量較少
02: 任務的執行時間較長
03: 任務的執行時間需要有嚴格限制

#公有運算

由於我們是根據redis來做分析所以只需要分析src/queue/connector/redis.php
01: 先呼叫src/Queue.php中的魔術方法__callStatic
02: 在__callStatic方法中呼叫了buildConnector
03: buildConnector 中先載入設定檔如果無將是同步執行
04: 根據設定檔去建立連線並且傳入設定

在redis.php類別的建構方法中的操作:
01: 偵測redis擴充是否安裝
02: 合併設定
03: 偵測是redis擴充還是pRedis
04: 建立連線物件

#發佈流程

發布參數

#參數名稱 預設值 描述 可以使用的方法
$job 要執行任務的類別 push,later
#$data 任務資料 push,later
$queue default 任務名稱 push,later
$delay null 延遲時間 later

立即执行

    push($job, $data, $queue)
    Queue::push(Test::class, ['id' => 1], 'test');

一顿骚操作后返回一个数组 并且序列化后 rPush到redis中 key为 queue:queueName
数组结构:

[
    'job' => $job, // 要执行任务的类
    'data' => $data, // 任务数据
    'id'=>'xxxxx' //任务id
]

写入 redis并且返回队列id
至于中间的那顿骚操作太长了就没写

延迟发布

    later($delay, $job, $data, $queue)
    Queue::later(100, Test::class, ['id' => 1], 'test');

跟上面的差不多
一顿骚操作后返回一个数组 并且序列化后 zAdd 到redis中 key为 queue:queueName:delayed score为当前的时间戳+$delay

执行过程

执行过程有work模式和listen模式 两种 区别上面已经说了 代码逻辑由于太多等下回分解;
最后讲一下标签的使用

    //守护进程开启
    'worker_daemon_start' => [
        \app\index\behavior\WorkerDaemonStart::class
    ],
    //内存超出
    'worker_memory_exceeded' => [
        \app\index\behavior\WorkerMemoryExceeded::class
    ],
    //重启守护进程
    'worker_queue_restart' => [
        \app\index\behavior\WorkerQueueRestart::class
    ],
    //任务开始执行之前
    'worker_before_process' => [
        \app\index\behavior\WorkerBeforeProcess::class
    ],
    //任务延迟执行
    'worker_before_sleep' => [
        \app\index\behavior\WorkerBeforeSleep::class
    ],
    //任务执行失败
    'queue_failed' => [
        \app\index\behavior\QueueFailed::class
    ]

解析think-queue(圍繞redis做分析)

public function run(Output $output)
    {
        $output->write('<info>任务执行失败</info>', true);
    }

控制台执行 php think queue:work --queue test --daemon
会在控制台一次输出

守护进程开启
任务延迟执行

失败的处理 如果有任务执行失败或者执行次数达到最大值
会触发 queue_failed

app\index\behavior@run方法里面写失败的逻辑 比如邮件通知 写入日志等

最后我们来说一下如何在其他框架或者项目中给tp的项目推送消息队列,例如两个项目是分开的 另一个使用的却不是tp5的框架

在其他项目中推任务

php版本

<?php class Index
{
    private $redis = null;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        $this->redis->select(10);
    }

    public function push($job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->rPush('queues:' . $queue, $payload);
    }

    public function later($delay, $job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
    }

    private function createPayload($job, $data)
    {
        $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
        return $this->setMeta($payload, 'attempts', 1);
    }

    private function setMeta($payload, $key, $value)
    {
        $payload = json_decode($payload, true);
        $payload[$key] = $value;
        $payload = json_encode($payload);

        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
        }

        return $payload;
    }

    private function random(int $length = 16): string
    {
        $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
        $randomString = '';
        for ($i = 0; $i later(10, 'app\index\jobs\Test', ['id' => 1], 'test');

go版本

package main

import (
    "encoding/json"
    "github.com/garyburd/redigo/redis"
    "math/rand"
    "time"
)

type Payload struct {
    Id       string      `json:"id"`
    Job      string      `json:"job"`
    Data     interface{} `json:"data"`
    Attempts int         `json:"attempts"`
}

var RedisClient *redis.Pool

func init() {
    RedisClient = &redis.Pool{
        MaxIdle:     20,
        MaxActive:   500,
        IdleTimeout: time.Second * 100,
        Dial: func() (conn redis.Conn, e error) {
            c, err := redis.Dial("tcp", "127.0.0.1:6379")

            if err != nil {
                return nil, err
            }

            _, _ = c.Do("SELECT", 10)

            return c, nil
        },
    }

}

func main() {

    var data = make(map[string]interface{})
    data["id"] = "1"

    later(10, "app\\index\\jobs\\Test", data, "test")
}

func push(job string, data interface{}, queue string) {
    payload := createPayload(job, data)
    queueName := "queues:" + queue

    _, _ = RedisClient.Get().Do("rPush", queueName, payload)
}

func later(delay int, job string, data interface{}, queue string) {

    m, _ := time.ParseDuration("+1s")
    currentTime := time.Now()
    op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix()
    createPayload(job, data)
    payload := createPayload(job, data)
    queueName := "queues:" + queue + ":delayed"

    _, _ = RedisClient.Get().Do("zAdd", queueName, op, payload)
}

// 创建指定格式的数据
func createPayload(job string, data interface{}) (payload string) {
    payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}

    jsonStr, _ := json.Marshal(payload1)

    return string(jsonStr)
}

// 创建随机字符串
func random(n int) string {

    var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

    b := make([]rune, n)
    for i := range b {
        b[i] = str[rand.Intn(len(str))]
    }
    return string(b)
}

更多thinkphp技术知识,请访问thinkphp教程栏目!

以上是解析think-queue(圍繞redis做分析)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:segmentfault.com。如有侵權,請聯絡admin@php.cn刪除