搜尋
首頁php框架ThinkPHP解析think-queue(圍繞redis做分析)

解析think-queue(圍繞redis做分析)

Jul 26, 2021 pm 04:00 PM
phpredisthinkphp5

前言

分析前請大家務必了解訊息佇列的實作

tp5的訊息佇列是基於database redis 和tp官方自己實作的Topthink
本章是圍繞著redis來做分析

儲存key:

##key類型描述list要執行的任務#string重啟佇列時間戳zSet延遲任務zSet執行失敗,等待重新執行##執行指令
#queues:queueName
think:queue:restart
queues:queueName:delayed
queues :queueName:reserved

work和listen的差異在下面會解釋

指令
描述
##php think queue:work 監聽佇列
php think queue:listen 監聽佇列
php think queue:restart #重啟佇列
php think queue:subscribe 暫無,可能是保留的官方有什麼其他想法但是還沒實現

行為標籤

#重啟守護程式任務開始執行之前任務延遲執行任務執行失敗
#標籤 描述
#worker_daemon_start 守護程式開啟
worker_memory_exceeded #記憶體超出
##worker_queue_restart
worker_before_process
worker_before_sleep
queue_failed
指令參數

參數預設值可以使用的模式#queuedaemon##delayforce memorysleeptries
##描述
null work,listen 要執行的任務名稱
null work #以守護程式執行任務
0 work,listen 失敗後重新執行的時間
null work 失敗後重新執行的時間
128M work,listen #限制最大記憶體
#3 work,listen 沒有任務的時候等待的時間
#### ##0######work,listen######任務失敗後最大嘗試次數############

模式差異

1: 執行原理不同
work: 單一進程的處理模式;
無 daemon 參數 work進程在處理下一個訊息後直接結束目前程序。當不存在新訊息時,會sleep一段時間然後退出;
有 daemon 參數 work進程會循環地處理佇列中的消息,直到記憶體超出參數配置才結束進程。當不存在新訊息時,會在每次循環中sleep一段時間;

listen: 父進程子進程的處理模式;
會在所在的父進程會建立一個單次執行模式的work子程序,並透過該work子程序來處理佇列中的下一個訊息,當這個work子程序退出之後;
所在的父行程會監聽到該子程序的退出訊號,並重新建立一個新的單次執行的work子程序;

2: 退出時機不同
work: 看上面
listen: 所在的父進程正常情況會一直運行,除非遇到下面兩種情況
01: 所建立的某個work子程序的執行時間超過了listen命令列中的--timeout 參數配置;此時work子程序會被強制結束,listen所在的父程序也會拋出一個ProcessTimeoutException 異常並退出;

開發者可以選擇捕獲該異常,讓父進程繼續執行;
02: 所在的父進程因某種原因存在內存洩露,則當父進程本身佔用的內存超過了命令行中的--memory 參數配置時,父子程序均會退出。正常情況下,listen進程本身所佔用的記憶體是穩定不變的。

3: 效能不同
work: 是在腳本內部做循環,框架腳本在指令執行的初期就已載入完畢;

listen: 是處理完一個任務之後新開一個work進程,此時會重新載入框架腳本;

因此work 模式的效能會比listen模式高。
注意: 當程式碼有更新時,work 模式下需要手動去執行 php think queue:restart 指令重新啟動佇列來使變更生效;而listen 模式會自動生效,無需其他操作。

4: 逾時控制能力
work: 本質上既不能控制進程本身的運行時間,也無法限制執行中的任務的執行時間;
listen: 可以限制其創建的work子進程的逾時時間;

可透過timeout 參數限制work子進程允許運行的最長時間,超過該時間限制仍未結束的子進程會被強制結束;
expire 和time的區別

expire 在設定檔中設定,指任務的過期時間這個時間是全域的,影響到所有的work行程
timeout 在命令列參數中設定,指work子程序的逾時時間,這個時間只對目前執行的listen 指令有效,timeout 針對的物件是work 子程序;

5: 使用場景不同

work 適用場景是:
01: 任務數量較多
02: 效能要求較高
03: 任務的執行時間較短
04: 消費者類別中不存在死循環,sleep() ,exit() ,die() 等容易導致bug的邏輯

listen 適用場景是:

01: 任務數量較少
02: 任務的執行時間較長
03: 任務的執行時間需要有嚴格限制

#公有運算

由於我們是根據redis來做分析所以只需要分析src/queue/connector/redis.php
01: 先呼叫src/Queue.php中的魔術方法__callStatic
02: 在__callStatic方法中呼叫了buildConnector
03: buildConnector 中先載入設定檔如果無將是同步執行
04: 根據設定檔去建立連線並且傳入設定

在redis.php類別的建構方法中的操作:
01: 偵測redis擴充是否安裝
02: 合併設定
03: 偵測是redis擴充還是pRedis
04: 建立連線物件

#發佈流程

發布參數

#參數名稱 預設值 描述 可以使用的方法
$job 要執行任務的類別 push,later
#$data 任務資料 push,later
$queue default 任務名稱 push,later
$delay null 延遲時間 later

立即执行

    push($job, $data, $queue)
    Queue::push(Test::class, ['id' => 1], 'test');

一顿骚操作后返回一个数组 并且序列化后 rPush到redis中 key为 queue:queueName
数组结构:

[
    'job' => $job, // 要执行任务的类
    'data' => $data, // 任务数据
    'id'=>'xxxxx' //任务id
]

写入 redis并且返回队列id
至于中间的那顿骚操作太长了就没写

延迟发布

    later($delay, $job, $data, $queue)
    Queue::later(100, Test::class, ['id' => 1], 'test');

跟上面的差不多
一顿骚操作后返回一个数组 并且序列化后 zAdd 到redis中 key为 queue:queueName:delayed score为当前的时间戳+$delay

执行过程

执行过程有work模式和listen模式 两种 区别上面已经说了 代码逻辑由于太多等下回分解;
最后讲一下标签的使用

    //守护进程开启
    'worker_daemon_start' => [
        \app\index\behavior\WorkerDaemonStart::class
    ],
    //内存超出
    'worker_memory_exceeded' => [
        \app\index\behavior\WorkerMemoryExceeded::class
    ],
    //重启守护进程
    'worker_queue_restart' => [
        \app\index\behavior\WorkerQueueRestart::class
    ],
    //任务开始执行之前
    'worker_before_process' => [
        \app\index\behavior\WorkerBeforeProcess::class
    ],
    //任务延迟执行
    'worker_before_sleep' => [
        \app\index\behavior\WorkerBeforeSleep::class
    ],
    //任务执行失败
    'queue_failed' => [
        \app\index\behavior\QueueFailed::class
    ]

解析think-queue(圍繞redis做分析)

public function run(Output $output)
    {
        $output->write('<info>任务执行失败</info>', true);
    }

控制台执行 php think queue:work --queue test --daemon
会在控制台一次输出

守护进程开启
任务延迟执行

失败的处理 如果有任务执行失败或者执行次数达到最大值
会触发 queue_failed

app\index\behavior@run方法里面写失败的逻辑 比如邮件通知 写入日志等

最后我们来说一下如何在其他框架或者项目中给tp的项目推送消息队列,例如两个项目是分开的 另一个使用的却不是tp5的框架

在其他项目中推任务

php版本

<?php class Index
{
    private $redis = null;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        $this->redis->select(10);
    }

    public function push($job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->rPush('queues:' . $queue, $payload);
    }

    public function later($delay, $job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
    }

    private function createPayload($job, $data)
    {
        $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
        return $this->setMeta($payload, 'attempts', 1);
    }

    private function setMeta($payload, $key, $value)
    {
        $payload = json_decode($payload, true);
        $payload[$key] = $value;
        $payload = json_encode($payload);

        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
        }

        return $payload;
    }

    private function random(int $length = 16): string
    {
        $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
        $randomString = '';
        for ($i = 0; $i later(10, 'app\index\jobs\Test', ['id' => 1], 'test');

go版本

package main

import (
    "encoding/json"
    "github.com/garyburd/redigo/redis"
    "math/rand"
    "time"
)

type Payload struct {
    Id       string      `json:"id"`
    Job      string      `json:"job"`
    Data     interface{} `json:"data"`
    Attempts int         `json:"attempts"`
}

var RedisClient *redis.Pool

func init() {
    RedisClient = &redis.Pool{
        MaxIdle:     20,
        MaxActive:   500,
        IdleTimeout: time.Second * 100,
        Dial: func() (conn redis.Conn, e error) {
            c, err := redis.Dial("tcp", "127.0.0.1:6379")

            if err != nil {
                return nil, err
            }

            _, _ = c.Do("SELECT", 10)

            return c, nil
        },
    }

}

func main() {

    var data = make(map[string]interface{})
    data["id"] = "1"

    later(10, "app\\index\\jobs\\Test", data, "test")
}

func push(job string, data interface{}, queue string) {
    payload := createPayload(job, data)
    queueName := "queues:" + queue

    _, _ = RedisClient.Get().Do("rPush", queueName, payload)
}

func later(delay int, job string, data interface{}, queue string) {

    m, _ := time.ParseDuration("+1s")
    currentTime := time.Now()
    op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix()
    createPayload(job, data)
    payload := createPayload(job, data)
    queueName := "queues:" + queue + ":delayed"

    _, _ = RedisClient.Get().Do("zAdd", queueName, op, payload)
}

// 创建指定格式的数据
func createPayload(job string, data interface{}) (payload string) {
    payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}

    jsonStr, _ := json.Marshal(payload1)

    return string(jsonStr)
}

// 创建随机字符串
func random(n int) string {

    var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

    b := make([]rune, n)
    for i := range b {
        b[i] = str[rand.Intn(len(str))]
    }
    return string(b)
}

更多thinkphp技术知识,请访问thinkphp教程栏目!

以上是解析think-queue(圍繞redis做分析)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述
本文轉載於:segmentfault。如有侵權,請聯絡admin@php.cn刪除

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

WebStorm Mac版

WebStorm Mac版

好用的JavaScript開發工具

SublimeText3 Linux新版

SublimeText3 Linux新版

SublimeText3 Linux最新版

VSCode Windows 64位元 下載

VSCode Windows 64位元 下載

微軟推出的免費、功能強大的一款IDE編輯器

MinGW - Minimalist GNU for Windows

MinGW - Minimalist GNU for Windows

這個專案正在遷移到osdn.net/projects/mingw的過程中,你可以繼續在那裡關注我們。 MinGW:GNU編譯器集合(GCC)的本機Windows移植版本,可自由分發的導入函式庫和用於建置本機Windows應用程式的頭檔;包括對MSVC執行時間的擴展,以支援C99功能。 MinGW的所有軟體都可以在64位元Windows平台上運作。

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器