Rumah >rangka kerja php >ThinkPHP >Analisa think-queue (analisis sekitar redis)

Analisa think-queue (analisis sekitar redis)

藏色散人
藏色散人ke hadapan
2021-07-26 16:00:194144semak imbas

Kata Pengantar

Sebelum menganalisis, sila pastikan anda memahami pelaksanaan baris gilir mesej

tp5 Baris gilir mesej adalah berdasarkan redis pangkalan data dan tp pelaksanaan rasmi Topthink
Bab ini berdasarkan analisis redis

Kunci storan:

key 类型 描述
queues:queueName list 要执行的任务
think:queue:restart string 重启队列时间戳
queues:queueName:delayed zSet 延迟任务
queues:queueName:reserved zSet 执行失败,等待重新执行

Laksanakan arahan

kerja Perbezaan dengan mendengar akan diterangkan di bawah
命令 描述
php think queue:work 监听队列
php think queue:listen 监听队列
php think queue:restart 重启队列
php think queue:subscribe 暂无,可能是保留的 官方有什么其他想法但是还没实现

Teg tingkah laku

标签 描述
worker_daemon_start 守护进程开启
worker_memory_exceeded 内存超出
worker_queue_restart 重启守护进程
worker_before_process 任务开始执行之前
worker_before_sleep 任务延迟执行
queue_failed 任务执行失败

Parameter arahan

参数 默认值 可以使用的模式 描述
queue null work,listen 要执行的任务名称
daemon null work 以守护进程执行任务
delay 0 work,listen 失败后重新执行的时间
force null work 失败后重新执行的时间
memory 128M work,listen 限制最大内存
sleep 3 work,listen 没有任务的时候等待的时间
tries 0 work,listen 任务失败后最大尝试次数

Perbezaan mod

1: Prinsip pelaksanaan yang berbeza
kerja: mod pemprosesan satu proses;
Tiada parameter daemon Proses kerja akan menamatkan proses semasa secara langsung selepas memproses mesej seterusnya. Apabila tiada mesej baharu, ia akan tidur untuk tempoh masa dan kemudian keluar;
Dengan parameter daemon, proses kerja akan memproses mesej dalam baris gilir secara kitaran sehingga memori melebihi konfigurasi parameter sebelum menamatkan proses. Apabila tiada mesej baharu, ia akan tidur untuk satu tempoh masa dalam setiap gelung; proses. Proses anak kerja akan memproses mesej seterusnya dalam baris gilir melalui proses anak kerja apabila proses anak kerja keluar, proses induk di mana

terletak akan mendengar isyarat keluar proses anak dan mencipta yang baharu. satu. Proses anak kerja dilaksanakan dalam satu masa;


2: Masa keluar adalah berbeza
kerja: lihat di atas

dengar: Proses induk akan sentiasa berjalan dalam keadaan biasa, melainkan dua yang berikut situasi dihadapi

01: Masa pelaksanaan proses anak kerja yang dibuat melebihi konfigurasi parameter tamat masa dalam baris arahan dengar pada masa ini, proses anak kerja akan ditamatkan secara paksa, dan proses induk di mana mendengar berada yang terletak juga akan membuang pengecualian dan keluar ProcessTimeoutException ;

Pembangun boleh memilih untuk menangkap pengecualian ini dan membiarkan proses induk terus dilaksanakan;
02: Proses induk mengalami kebocoran memori atas sebab tertentu, dan apabila memori yang diduduki oleh proses induk itu sendiri melebihi baris arahan Apabila parameter --memory dikonfigurasikan dalam, kedua-dua proses ibu bapa dan anak akan keluar. Dalam keadaan biasa, ingatan yang diduduki oleh proses mendengar itu sendiri adalah stabil.

3: Prestasi berbeza
kerja: Ia bergelung di dalam skrip, dan skrip rangka kerja dimuatkan pada peringkat awal pelaksanaan perintah;

dengar: Ia membuka yang baharu selepas diproses tugasan Proses kerja, skrip rangka kerja akan dimuat semula pada masa ini

Oleh itu, prestasi mod kerja akan lebih tinggi daripada mod dengar.

Nota: Apabila kod dikemas kini, anda perlu melaksanakan perintah php think queue:restart secara manual dalam mod kerja untuk memulakan semula baris gilir agar perubahan berkuat kuasa semasa dalam mod dengar, ia akan berkuat kuasa secara automatik tanpa yang lain operasi.

4: Keupayaan kawalan tamat masa
kerja: Pada dasarnya, ia tidak boleh mengawal masa berjalan proses itu sendiri mahupun mengehadkan masa pelaksanaan tugasan pelaksanaan

mendengar: boleh mengehadkan kerja kanak-kanak ia mencipta Masa tamat masa proses;


Masa maksimum subproses kerja dibenarkan dijalankan boleh dihadkan melalui parameter tamat masa yang belum tamat melebihi had masa ini akan dihadkan secara paksa ditamatkan;
Perbezaan antara tamat tempoh dan masa

tamat ditetapkan dalam fail konfigurasi dan merujuk kepada masa tamat tugasan Masa ini adalah global dan mempengaruhi semua proses kerja ditetapkan dalam parameter baris arahan dan merujuk kepada masa tamat masa sub-proses kerja Kali ini Ia hanya sah untuk perintah dengar yang dilaksanakan pada masa ini ialah sub-proses kerja. Senario penggunaan yang berbeza

kerja senario terpakai ialah:

01: Bilangan tugas yang besar
02: Keperluan prestasi tinggi

03: Masa pelaksanaan tugas yang singkat

04: Tiada gelung tak terhingga , sleep(), exit(), die() dan logik lain yang boleh membawa kepada pepijat dalam kelas pengguna dengan mudah

dengar senario yang berkenaan ialah:

01: Bilangan tugasan adalah kecil
02: Masa pelaksanaan tugas adalah panjang
03: Masa pelaksanaan tugas perlu dihadkan dengan ketat

Operasi awam

Memandangkan kami melakukan analisis berdasarkan redis, kita hanya perlu menganalisis src/queue/connector/redis.php
01: Mula-mula panggil kaedah ajaib dalam

02: Dipanggil dalam kaedah __callStatic

03 : Mula-mula muatkan fail konfigurasi dalam buildConnector, jika tiada, ia akan dilaksanakan secara serentak
04: Buat sambungan berdasarkan fail konfigurasi dan lulus dalam konfigurasi src/Queue.php__callStatic
Operasi dalam pembina kelas redis.php:buildConnector01: Semak sama ada sambungan redis dipasang
02: Gabungkan konfigurasi
03: Semak sama ada ia sambungan redis atau pRedis
04: Cipta objek sambungan

Proses penerbitan

Terbitkan parameter

立即执行

    push($job, $data, $queue)
    Queue::push(Test::class, ['id' => 1], 'test');

一顿骚操作后返回一个数组 并且序列化后 rPush到redis中 key为 queue:queueName
数组结构:

[
    'job' => $job, // 要执行任务的类
    'data' => $data, // 任务数据
    'id'=>'xxxxx' //任务id
]

写入 redis并且返回队列id
至于中间的那顿骚操作太长了就没写

延迟发布

    later($delay, $job, $data, $queue)
    Queue::later(100, Test::class, ['id' => 1], 'test');

跟上面的差不多
一顿骚操作后返回一个数组 并且序列化后 zAdd 到redis中 key为 queue:queueName:delayed score为当前的时间戳+$delay

执行过程

执行过程有work模式和listen模式 两种 区别上面已经说了 代码逻辑由于太多等下回分解;
最后讲一下标签的使用

    //守护进程开启
    'worker_daemon_start' => [
        \app\index\behavior\WorkerDaemonStart::class
    ],
    //内存超出
    'worker_memory_exceeded' => [
        \app\index\behavior\WorkerMemoryExceeded::class
    ],
    //重启守护进程
    'worker_queue_restart' => [
        \app\index\behavior\WorkerQueueRestart::class
    ],
    //任务开始执行之前
    'worker_before_process' => [
        \app\index\behavior\WorkerBeforeProcess::class
    ],
    //任务延迟执行
    'worker_before_sleep' => [
        \app\index\behavior\WorkerBeforeSleep::class
    ],
    //任务执行失败
    'queue_failed' => [
        \app\index\behavior\QueueFailed::class
    ]

Analisa think-queue (analisis sekitar redis)

public function run(Output $output)
    {
        $output->write('<info>任务执行失败</info>', true);
    }

控制台执行 php think queue:work --queue test --daemon
会在控制台一次输出

守护进程开启
任务延迟执行

失败的处理 如果有任务执行失败或者执行次数达到最大值
会触发 queue_failed

app\index\behavior@run方法里面写失败的逻辑 比如邮件通知 写入日志等

最后我们来说一下如何在其他框架或者项目中给tp的项目推送消息队列,例如两个项目是分开的 另一个使用的却不是tp5的框架

在其他项目中推任务

php版本

<?php class Index
{
    private $redis = null;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        $this->redis->select(10);
    }

    public function push($job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->rPush('queues:' . $queue, $payload);
    }

    public function later($delay, $job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
    }

    private function createPayload($job, $data)
    {
        $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
        return $this->setMeta($payload, 'attempts', 1);
    }

    private function setMeta($payload, $key, $value)
    {
        $payload = json_decode($payload, true);
        $payload[$key] = $value;
        $payload = json_encode($payload);

        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
        }

        return $payload;
    }

    private function random(int $length = 16): string
    {
        $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
        $randomString = '';
        for ($i = 0; $i later(10, 'app\index\jobs\Test', ['id' => 1], 'test');

go版本

package main

import (
    "encoding/json"
    "github.com/garyburd/redigo/redis"
    "math/rand"
    "time"
)

type Payload struct {
    Id       string      `json:"id"`
    Job      string      `json:"job"`
    Data     interface{} `json:"data"`
    Attempts int         `json:"attempts"`
}

var RedisClient *redis.Pool

func init() {
    RedisClient = &redis.Pool{
        MaxIdle:     20,
        MaxActive:   500,
        IdleTimeout: time.Second * 100,
        Dial: func() (conn redis.Conn, e error) {
            c, err := redis.Dial("tcp", "127.0.0.1:6379")

            if err != nil {
                return nil, err
            }

            _, _ = c.Do("SELECT", 10)

            return c, nil
        },
    }

}

func main() {

    var data = make(map[string]interface{})
    data["id"] = "1"

    later(10, "app\\index\\jobs\\Test", data, "test")
}

func push(job string, data interface{}, queue string) {
    payload := createPayload(job, data)
    queueName := "queues:" + queue

    _, _ = RedisClient.Get().Do("rPush", queueName, payload)
}

func later(delay int, job string, data interface{}, queue string) {

    m, _ := time.ParseDuration("+1s")
    currentTime := time.Now()
    op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix()
    createPayload(job, data)
    payload := createPayload(job, data)
    queueName := "queues:" + queue + ":delayed"

    _, _ = RedisClient.Get().Do("zAdd", queueName, op, payload)
}

// 创建指定格式的数据
func createPayload(job string, data interface{}) (payload string) {
    payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}

    jsonStr, _ := json.Marshal(payload1)

    return string(jsonStr)
}

// 创建随机字符串
func random(n int) string {

    var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

    b := make([]rune, n)
    for i := range b {
        b[i] = str[rand.Intn(len(str))]
    }
    return string(b)
}

更多thinkphp技术知识,请访问thinkphp教程栏目!

Atas ialah kandungan terperinci Analisa think-queue (analisis sekitar redis). Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:segmentfault.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam