Heim >PHP-Framework >Denken Sie an PHP >Think-Queue analysieren (Analyse rund um Redis)

Think-Queue analysieren (Analyse rund um Redis)

藏色散人
藏色散人nach vorne
2021-07-26 16:00:194231Durchsuche

Vorwort

Bitte stellen Sie vor der Analyse sicher, dass Sie die Implementierung der Nachrichtenwarteschlange verstehen.

Die Nachrichtenwarteschlange von tp5 basiert auf der Datenbank Redis und der offiziellen Topthink-Implementierung von tp.
Dieses Kapitel basiert auf Redis für Analyse

Speicherschlüssel:

Schlüssel Typ Beschreibung
queues:queueNamequeues:queueName list 要执行的任务
think:queue:restart string 重启队列时间戳
queues:queueName:delayed zSet 延迟任务
queues:queueName:reserved zSet 执行失败,等待重新执行

执行命令

work和listen的区别在下面会解释
命令 描述
php think queue:work 监听队列
php think queue:listen 监听队列
php think queue:restart 重启队列
php think queue:subscribe 暂无,可能是保留的 官方有什么其他想法但是还没实现

行为标签

标签 描述
worker_daemon_start 守护进程开启
worker_memory_exceeded 内存超出
worker_queue_restart 重启守护进程
worker_before_process 任务开始执行之前
worker_before_sleep 任务延迟执行
queue_failed 任务执行失败

命令参数

listAuszuführende Aufgaben Denken Sie: queue:restartstring
参数 默认值 可以使用的模式 描述
queue null work,listen 要执行的任务名称
daemon null work 以守护进程执行任务
delay 0 work,listen 失败后重新执行的时间
force null work 失败后重新执行的时间
memory 128M work,listen 限制最大内存
sleep 3 work,listen 没有任务的时候等待的时间
tries
🎜Zeitstempel der Warteschlange neu starten🎜🎜🎜🎜queues:queueName:delayed🎜🎜zSet🎜🎜Verzögerte Aufgaben🎜🎜🎜🎜queues:queueName : reserviert🎜🎜zSet🎜🎜Ausführung fehlgeschlagen, wartet auf erneute Ausführung🎜🎜🎜🎜🎜Befehl ausführen🎜
Der Unterschied zwischen Arbeit und Zuhören wird unten erklärt
🎜🎜🎜🎜Befehl🎜 🎜Beschreibung🎜🎜 🎜🎜🎜🎜php think queue:work🎜🎜Listen queue🎜🎜🎜🎜php think queue:listen🎜🎜Listen queue🎜🎜🎜🎜php think queue :restart🎜🎜Starten Sie die Warteschlange neu🎜🎜🎜🎜php think queue:subscribe🎜🎜Nein, es kann sein, dass der Beamte andere Ideen hat, aber das war nicht der Fall noch implementiert🎜🎜🎜🎜🎜 Tag 🎜🎜🎜🎜🎜 Tag 🎜🎜 Beschreibung 🎜🎜🎜🎜🎜🎜worker_daemon_start🎜🎜Daemon startet. 🎜🎜🎜🎜worker_memory_exceeded🎜🎜 Speicher überschritten 🎜🎜🎜🎜worker_queue_restart🎜🎜Daemon neu starten🎜🎜🎜🎜worker_before_process🎜🎜Bevor die Aufgabe ausgeführt wird🎜🎜🎜🎜worker_before_sleep Code>🎜 🎜Verzögerte Aufgabenausführung🎜🎜🎜 🎜<code>queue_failed🎜🎜Aufgabenausführung fehlgeschlagen🎜🎜🎜🎜🎜Befehlsparameter🎜🎜🎜🎜🎜Parameter🎜🎜Standardwert🎜🎜Modi, die verwendet werden können 🎜🎜Beschreibung🎜🎜 🎜🎜 🎜🎜queue 🎜🎜null🎜🎜work,listen🎜🎜Der Name der auszuführenden Aufgabe🎜🎜🎜🎜daemon🎜🎜null🎜🎜work🎜🎜 Führen Sie die Aufgabe als Daemon-Prozess aus null >🎜🎜3🎜🎜arbeiten, zuhören🎜🎜Warten, wenn keine Aufgaben vorhanden sind. Zeit🎜🎜🎜🎜versuche🎜🎜0🎜🎜arbeiten, zuhören🎜🎜Maximale Anzahl von Versuchen nach fehlgeschlagener Aufgabe🎜🎜🎜 🎜

Modusunterschiede

1: Unterschiedliche Ausführungsprinzipien
Arbeit: Einzelprozess-Verarbeitungsmodus;
Kein Daemon-Parameter Der Arbeitsprozess beendet den aktuellen Prozess direkt nach der Verarbeitung der nächsten Nachricht. Wenn keine neuen Nachrichten vorhanden sind, wird es für einen bestimmten Zeitraum in den Ruhezustand versetzt und dann beendet.
Mit dem Daemon-Parameter verarbeitet der Arbeitsprozess die Nachrichten in der Warteschlange in einer Schleife, bis der Speicher die Parameterkonfiguration überschreitet, bevor der Prozess beendet wird. Wenn keine neue Nachricht vorhanden ist, wird in jeder Schleife eine Zeit lang geschlafen.

Listen: Verarbeitungsmodus des übergeordneten Prozesses + untergeordneter Prozess in der Warteschlange wird durch den Arbeitsunterprozess verarbeitet. Wenn der Arbeitsunterprozess beendet wird, hört der übergeordnete Prozess, in dem sich
befindet, auf das Ausgangssignal des Unterprozesses und erstellt einen neuen Arbeitsunterprozess mit Einzelausführung. Prozess. ;

2: Der Zeitpunkt des Beendens ist unterschiedlich. Arbeit: Siehe oben Überschreitet die Konfiguration des Listenbefehls --timeout in der Zeile, wird der Arbeitsunterprozess gezwungen, zu beenden, und der übergeordnete Prozess, in dem sich der Listenvorgang befindet, löst ebenfalls eine ProcessTimeoutException-Ausnahme aus und wird beendet.


Entwickler können wählen um die Ausnahme abzufangen und die Ausführung des übergeordneten Prozesses fortzusetzen;
02: Der übergeordnete Prozess weist aus irgendeinem Grund einen Speicherverlust auf, wenn der vom übergeordneten Prozess belegte Speicher die Parameterkonfiguration --memory überschreitet und untergeordnete Prozesse werden beendet. Unter normalen Umständen ist der vom Abhörprozess selbst belegte Speicher stabil.

3: Die Leistung ist anders.

Arbeit: Es wird eine Schleife innerhalb des Skripts ausgeführt und das Framework-Skript wird in der frühen Phase der Befehlsausführung geladen.

Listen: Nach der Verarbeitung einer Aufgabe wird ein neuer Arbeitsprozess gestartet, und das Framework wird geladen Zu diesem Zeitpunkt wurde das Skript neu geladen;


Die Leistung des Arbeitsmodus ist also höher als die des Hörmodus.

Hinweis: Wenn der Code aktualisiert wird, müssen Sie den Befehl php think queue:restart manuell im Arbeitsmodus ausführen, um die Warteschlange neu zu starten, damit die Änderungen wirksam werden. Im Abhörmodus werden sie automatisch wirksam, andere Vorgänge sind nicht möglich erforderlich.

4: Zeitüberschreitungskontrollfunktion

Arbeit: Im Wesentlichen kann es weder die Laufzeit des Prozesses selbst steuern noch die Ausführungszeit der ausgeführten Aufgaben begrenzen.
listen: Kann die Zeitüberschreitungszeit des von ihm erstellten Arbeitsunterprozesses begrenzen;

can Die maximale Zeit, die der Arbeitsunterprozess ausführen darf, ist durch den Timeout-Parameter begrenzt. Unterprozesse, die nicht nach diesem Zeitlimit beendet wurden, werden zwangsweise beendet. Der Unterschied zwischen
expire und time

expire beträgt in der Konfigurationsdatei festgelegt, die sich auf die Ablaufzeit der Aufgabe bezieht. Die Zeit ist global und betrifft alle Arbeitsprozesse. Die Zeitüberschreitung wird in den Befehlszeilenparametern festgelegt und bezieht sich auf die Zeitüberschreitungszeit des Arbeitsunterprozesses gültig für den aktuell ausgeführten Listen-Befehl.


5: Verschiedene Nutzungsszenarien

Anwendbare Arbeitsszenarien sind:
01: Große Anzahl von Aufgaben

02: Hohe Leistungsanforderungen

03: Kurz Ausführungszeit von Aufgaben

04: Es gibt keine Endlosschleife in der Verbraucherklasse, Sleep(), Exit(), Die() und anderen Logiken, die leicht zu Fehlern führen können.


Listen anwendbare Szenarien sind:

01: Die Zahl Anzahl der Aufgaben ist klein
02: Die Ausführungszeit der Aufgabe ist lang

03: Die Ausführungszeit der Aufgabe muss streng begrenzt sein

Öffentlicher Betrieb


Da wir Analysen auf Basis von Redis durchführen, müssen wir nur src analysieren /queue/connector/redis.php
01: Rufen Sie zuerst die magische Methode in <code>src/Queue.php auf. __callStatic

02: buildConnector wird aufgerufen in der __callStatic-Methode

03: Die Konfigurationsdatei wird zuerst in buildConnector geladen. Wenn keine vorhanden ist, wird sie synchron ausgeführt

04: Erstellen Sie eine Verbindung basierend auf der Konfigurationsdatei und übergeben Sie sie in Konfiguration

src/Queue.php中的魔术方法 __callStatic
02: 在__callStatic方法中调用了 buildConnectorOperationen im Konstruktor von die Klasse redis.php:
01: Erkennen, ob die Redis-Erweiterung installiert ist
02: Konfiguration zusammenführen
03: Erkennen, ob es sich um eine Redis-Erweiterung oder pRedis handelt
04: Verbindungsobjekt erstellen


Veröffentlichungsprozess

Parameter veröffentlichen

Parametername

StandardwertBeschreibungVerwendbare Methoden$jobKeineKlasse zur Ausführung der AufgabePush, später$ Daten leerAufgabendatenpush,später$WarteschlangeStandardAufgabennamepush,später$VerzögerungnullSpätere Zeit später

立即执行

    push($job, $data, $queue)
    Queue::push(Test::class, ['id' => 1], 'test');

一顿骚操作后返回一个数组 并且序列化后 rPush到redis中 key为 queue:queueName
数组结构:

[
    'job' => $job, // 要执行任务的类
    'data' => $data, // 任务数据
    'id'=>'xxxxx' //任务id
]

写入 redis并且返回队列id
至于中间的那顿骚操作太长了就没写

延迟发布

    later($delay, $job, $data, $queue)
    Queue::later(100, Test::class, ['id' => 1], 'test');

跟上面的差不多
一顿骚操作后返回一个数组 并且序列化后 zAdd 到redis中 key为 queue:queueName:delayed score为当前的时间戳+$delay

执行过程

执行过程有work模式和listen模式 两种 区别上面已经说了 代码逻辑由于太多等下回分解;
最后讲一下标签的使用

    //守护进程开启
    'worker_daemon_start' => [
        \app\index\behavior\WorkerDaemonStart::class
    ],
    //内存超出
    'worker_memory_exceeded' => [
        \app\index\behavior\WorkerMemoryExceeded::class
    ],
    //重启守护进程
    'worker_queue_restart' => [
        \app\index\behavior\WorkerQueueRestart::class
    ],
    //任务开始执行之前
    'worker_before_process' => [
        \app\index\behavior\WorkerBeforeProcess::class
    ],
    //任务延迟执行
    'worker_before_sleep' => [
        \app\index\behavior\WorkerBeforeSleep::class
    ],
    //任务执行失败
    'queue_failed' => [
        \app\index\behavior\QueueFailed::class
    ]

Think-Queue analysieren (Analyse rund um Redis)

public function run(Output $output)
    {
        $output->write('<info>任务执行失败</info>', true);
    }

控制台执行 php think queue:work --queue test --daemon
会在控制台一次输出

守护进程开启
任务延迟执行

失败的处理 如果有任务执行失败或者执行次数达到最大值
会触发 queue_failed

app\index\behavior@run方法里面写失败的逻辑 比如邮件通知 写入日志等

最后我们来说一下如何在其他框架或者项目中给tp的项目推送消息队列,例如两个项目是分开的 另一个使用的却不是tp5的框架

在其他项目中推任务

php版本

<?php class Index
{
    private $redis = null;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        $this->redis->select(10);
    }

    public function push($job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->rPush('queues:' . $queue, $payload);
    }

    public function later($delay, $job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
    }

    private function createPayload($job, $data)
    {
        $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
        return $this->setMeta($payload, 'attempts', 1);
    }

    private function setMeta($payload, $key, $value)
    {
        $payload = json_decode($payload, true);
        $payload[$key] = $value;
        $payload = json_encode($payload);

        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
        }

        return $payload;
    }

    private function random(int $length = 16): string
    {
        $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
        $randomString = '';
        for ($i = 0; $i later(10, 'app\index\jobs\Test', ['id' => 1], 'test');

go版本

package main

import (
    "encoding/json"
    "github.com/garyburd/redigo/redis"
    "math/rand"
    "time"
)

type Payload struct {
    Id       string      `json:"id"`
    Job      string      `json:"job"`
    Data     interface{} `json:"data"`
    Attempts int         `json:"attempts"`
}

var RedisClient *redis.Pool

func init() {
    RedisClient = &redis.Pool{
        MaxIdle:     20,
        MaxActive:   500,
        IdleTimeout: time.Second * 100,
        Dial: func() (conn redis.Conn, e error) {
            c, err := redis.Dial("tcp", "127.0.0.1:6379")

            if err != nil {
                return nil, err
            }

            _, _ = c.Do("SELECT", 10)

            return c, nil
        },
    }

}

func main() {

    var data = make(map[string]interface{})
    data["id"] = "1"

    later(10, "app\\index\\jobs\\Test", data, "test")
}

func push(job string, data interface{}, queue string) {
    payload := createPayload(job, data)
    queueName := "queues:" + queue

    _, _ = RedisClient.Get().Do("rPush", queueName, payload)
}

func later(delay int, job string, data interface{}, queue string) {

    m, _ := time.ParseDuration("+1s")
    currentTime := time.Now()
    op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix()
    createPayload(job, data)
    payload := createPayload(job, data)
    queueName := "queues:" + queue + ":delayed"

    _, _ = RedisClient.Get().Do("zAdd", queueName, op, payload)
}

// 创建指定格式的数据
func createPayload(job string, data interface{}) (payload string) {
    payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}

    jsonStr, _ := json.Marshal(payload1)

    return string(jsonStr)
}

// 创建随机字符串
func random(n int) string {

    var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

    b := make([]rune, n)
    for i := range b {
        b[i] = str[rand.Intn(len(str))]
    }
    return string(b)
}

更多thinkphp技术知识,请访问thinkphp教程栏目!

Das obige ist der detaillierte Inhalt vonThink-Queue analysieren (Analyse rund um Redis). Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:segmentfault.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen