Maison  >  Article  >  cadre php  >  Analyser la file de réflexion (analyse autour de Redis)

Analyser la file de réflexion (analyse autour de Redis)

藏色散人
藏色散人avant
2021-07-26 16:00:194046parcourir

Avant-propos

Veuillez vous assurer de comprendre l'implémentation de la file d'attente des messages avant l'analyse

La file d'attente des messages de tp5 est basée sur la base de données Redis et l'implémentation officielle de Topthink par tp
Ce chapitre est basé sur Redis pour analyse

Clé de stockage :

key 类型 描述
queues:queueName list 要执行的任务
think:queue:restart string 重启队列时间戳
queues:queueName:delayed zSet 延迟任务
queues:queueName:reserved zSet 执行失败,等待重新执行

Exécuter la commande

La différence entre travailler et écouter sera expliquée ci-dessous
命令 描述
php think queue:work 监听队列
php think queue:listen 监听队列
php think queue:restart 重启队列
php think queue:subscribe 暂无,可能是保留的 官方有什么其他想法但是还没实现

Étiquette de comportement

标签 描述
worker_daemon_start 守护进程开启
worker_memory_exceeded 内存超出
worker_queue_restart 重启守护进程
worker_before_process 任务开始执行之前
worker_before_sleep 任务延迟执行
queue_failed 任务执行失败

Paramètres de la commande

参数 默认值 可以使用的模式 描述
queue null work,listen 要执行的任务名称
daemon null work 以守护进程执行任务
delay 0 work,listen 失败后重新执行的时间
force null work 失败后重新执行的时间
memory 128M work,listen 限制最大内存
sleep 3 work,listen 没有任务的时候等待的时间
tries 0 work,listen 任务失败后最大尝试次数

Différences de mode

1 : Différents principes d'exécution
travail : mode de traitement à processus unique ;
Aucun paramètre de démon Le processus de travail mettra directement fin au processus en cours après le traitement du message suivant. Lorsqu'il n'y a pas de nouveaux messages, il se met en veille pendant un certain temps puis se ferme ;
Avec le paramètre démon, le processus de travail traitera les messages dans la file d'attente de manière cyclique jusqu'à ce que la mémoire dépasse la configuration du paramètre avant de terminer le processus. Lorsqu'il n'y a pas de nouveau message, il dormira pendant un certain temps dans chaque boucle ;

écoute : processus parent + mode de traitement du processus enfant
créera un processus enfant en mode d'exécution unique dans le processus parent, et le message suivant ; dans la file d'attente est traité via le sous-processus de travail. Lorsque le sous-processus de travail se termine ; le processus parent où se trouve
écoutera le signal de sortie du sous-processus et recréera un nouveau sous-processus de travail à exécution unique. processus. ;

2 : Le moment de la sortie est différent
work : Voir ci-dessus
écouter : Le processus parent s'exécutera toujours dans des circonstances normales, sauf si les deux situations suivantes sont rencontrées
01 : Le temps d'exécution d'un processus enfant de travail créé. dépasse la configuration du paramètre --timeout de la commande d'écoute ; à ce moment, le processus enfant sera forcé de se terminer et le processus parent où se trouve l'écoute lancera également une exception ProcessTimeoutException et quittera

Les développeurs peuvent choisir ; pour intercepter l'exception et laisser le processus parent continuer à s'exécuter ;
02 : Le processus parent a une fuite de mémoire pour une raison quelconque. Lorsque la mémoire occupée par le processus parent dépasse la configuration du paramètre --memory dans la ligne de commande, le processus parent. et les processus enfants se termineront. Dans des circonstances normales, la mémoire occupée par le processus d'écoute lui-même est stable.

3 : Les performances sont différentes
work : Il boucle à l'intérieur du script, et le script du framework est chargé au début de l'exécution de la commande

écouter : Il ouvre un nouveau processus de travail après le traitement d'une tâche, et le framework sera rechargé à ce moment-là Script ;

Les performances du mode travail seront donc supérieures à celles du mode écoute.
Remarque : lorsque le code est mis à jour, vous devez exécuter manuellement la commande php think queue:restart en mode travail pour redémarrer la file d'attente pour que les modifications prennent effet. En mode écoute, elles prendront effet automatiquement, aucune autre opération n'est effectuée ; requis.

4 : Capacité de contrôle du délai d'attente
travail : Essentiellement, il ne peut ni contrôler le temps d'exécution du processus lui-même ni limiter le temps d'exécution des tâches en cours d'exécution.
écouter : peut limiter le délai d'attente du sous-processus de travail qu'il crée ;

can Le paramètre timeout est utilisé pour limiter la durée maximale pendant laquelle le sous-processus de travail est autorisé à s'exécuter. Les sous-processus qui ne se sont pas terminés au-delà de ce délai seront terminés de force. La différence entre
expire et time

expire. est défini dans le fichier de configuration, qui fait référence au délai d'expiration de la tâche. Le temps est global et affecte tous les processus de travail. Le délai d'attente est défini dans les paramètres de ligne de commande et fait référence au délai d'expiration du sous-processus de travail. valable uniquement pour la commande d'écoute actuellement exécutée. L'objet du délai d'attente est le sous-processus de travail

5 : Différents scénarios d'utilisation

les scénarios applicables au travail sont :

01 : Grand nombre de tâches
02 : Exigences de hautes performances
03 : Temps d'exécution court des tâches
04 : Il n'y a pas de boucle infinie dans la classe consommateur, sleep() , exit(), die() et d'autres logiques qui peuvent facilement conduire à des bugs

écoutez les scénarios applicables sont :

01 : Le le nombre de tâches est petit

02 : Le temps d'exécution de la tâche est long
03 : Le temps d'exécution de la tâche doit être strictement limité

Opérations publiques

Puisque nous effectuons des analyses basées sur Redis, nous n'avons besoin que d'analyser src/queue/connector/redis.php
01 : Appelez d'abord la méthode magique dans
src/Queue.php__callStatic02 : Appelez
buildConnector03 dans la méthode __callStatic : Dans buildConnector, le fichier de configuration est d'abord chargé s'il n'y en a pas, il sera exécuté de manière synchrone
04 : Créer une connexion selon le fichier de configuration et passer la configuration

Opérations dans le constructeur de la classe redis.php :

01 : Détecter si l'extension redis est installée
02 : Fusionner la configuration
03 : Détecter qu'il s'agisse d'une extension Redis ou de pRedis
04 : Créer un objet de connexion

Processus de publication

Paramètres de publication

立即执行

    push($job, $data, $queue)
    Queue::push(Test::class, ['id' => 1], 'test');

一顿骚操作后返回一个数组 并且序列化后 rPush到redis中 key为 queue:queueName
数组结构:

[
    'job' => $job, // 要执行任务的类
    'data' => $data, // 任务数据
    'id'=>'xxxxx' //任务id
]

写入 redis并且返回队列id
至于中间的那顿骚操作太长了就没写

延迟发布

    later($delay, $job, $data, $queue)
    Queue::later(100, Test::class, ['id' => 1], 'test');

跟上面的差不多
一顿骚操作后返回一个数组 并且序列化后 zAdd 到redis中 key为 queue:queueName:delayed score为当前的时间戳+$delay

执行过程

执行过程有work模式和listen模式 两种 区别上面已经说了 代码逻辑由于太多等下回分解;
最后讲一下标签的使用

    //守护进程开启
    'worker_daemon_start' => [
        \app\index\behavior\WorkerDaemonStart::class
    ],
    //内存超出
    'worker_memory_exceeded' => [
        \app\index\behavior\WorkerMemoryExceeded::class
    ],
    //重启守护进程
    'worker_queue_restart' => [
        \app\index\behavior\WorkerQueueRestart::class
    ],
    //任务开始执行之前
    'worker_before_process' => [
        \app\index\behavior\WorkerBeforeProcess::class
    ],
    //任务延迟执行
    'worker_before_sleep' => [
        \app\index\behavior\WorkerBeforeSleep::class
    ],
    //任务执行失败
    'queue_failed' => [
        \app\index\behavior\QueueFailed::class
    ]

Analyser la file de réflexion (analyse autour de Redis)

public function run(Output $output)
    {
        $output->write('<info>任务执行失败</info>', true);
    }

控制台执行 php think queue:work --queue test --daemon
会在控制台一次输出

守护进程开启
任务延迟执行

失败的处理 如果有任务执行失败或者执行次数达到最大值
会触发 queue_failed

app\index\behavior@run方法里面写失败的逻辑 比如邮件通知 写入日志等

最后我们来说一下如何在其他框架或者项目中给tp的项目推送消息队列,例如两个项目是分开的 另一个使用的却不是tp5的框架

在其他项目中推任务

php版本

<?php class Index
{
    private $redis = null;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        $this->redis->select(10);
    }

    public function push($job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->rPush('queues:' . $queue, $payload);
    }

    public function later($delay, $job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
    }

    private function createPayload($job, $data)
    {
        $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
        return $this->setMeta($payload, 'attempts', 1);
    }

    private function setMeta($payload, $key, $value)
    {
        $payload = json_decode($payload, true);
        $payload[$key] = $value;
        $payload = json_encode($payload);

        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
        }

        return $payload;
    }

    private function random(int $length = 16): string
    {
        $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
        $randomString = '';
        for ($i = 0; $i later(10, 'app\index\jobs\Test', ['id' => 1], 'test');

go版本

package main

import (
    "encoding/json"
    "github.com/garyburd/redigo/redis"
    "math/rand"
    "time"
)

type Payload struct {
    Id       string      `json:"id"`
    Job      string      `json:"job"`
    Data     interface{} `json:"data"`
    Attempts int         `json:"attempts"`
}

var RedisClient *redis.Pool

func init() {
    RedisClient = &redis.Pool{
        MaxIdle:     20,
        MaxActive:   500,
        IdleTimeout: time.Second * 100,
        Dial: func() (conn redis.Conn, e error) {
            c, err := redis.Dial("tcp", "127.0.0.1:6379")

            if err != nil {
                return nil, err
            }

            _, _ = c.Do("SELECT", 10)

            return c, nil
        },
    }

}

func main() {

    var data = make(map[string]interface{})
    data["id"] = "1"

    later(10, "app\\index\\jobs\\Test", data, "test")
}

func push(job string, data interface{}, queue string) {
    payload := createPayload(job, data)
    queueName := "queues:" + queue

    _, _ = RedisClient.Get().Do("rPush", queueName, payload)
}

func later(delay int, job string, data interface{}, queue string) {

    m, _ := time.ParseDuration("+1s")
    currentTime := time.Now()
    op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix()
    createPayload(job, data)
    payload := createPayload(job, data)
    queueName := "queues:" + queue + ":delayed"

    _, _ = RedisClient.Get().Do("zAdd", queueName, op, payload)
}

// 创建指定格式的数据
func createPayload(job string, data interface{}) (payload string) {
    payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}

    jsonStr, _ := json.Marshal(payload1)

    return string(jsonStr)
}

// 创建随机字符串
func random(n int) string {

    var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

    b := make([]rune, n)
    for i := range b {
        b[i] = str[rand.Intn(len(str))]
    }
    return string(b)
}

更多thinkphp技术知识,请访问thinkphp教程栏目!

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer