Avant-propos
Veuillez vous assurer de comprendre l'implémentation de la file d'attente des messages avant l'analyse
La file d'attente des messages de tp5 est basée sur la base de données Redis et l'implémentation officielle de Topthink par tp
Ce chapitre est basé sur Redis pour analyse
Clé de stockage :
key | 类型 | 描述 |
---|---|---|
queues:queueName |
list | 要执行的任务 |
think:queue:restart |
string | 重启队列时间戳 |
queues:queueName:delayed |
zSet | 延迟任务 |
queues:queueName:reserved |
zSet | 执行失败,等待重新执行 |
Exécuter la commande
La différence entre travailler et écouter sera expliquée ci-dessous
命令 | 描述 |
---|---|
php think queue:work |
监听队列 |
php think queue:listen |
监听队列 |
php think queue:restart |
重启队列 |
php think queue:subscribe |
暂无,可能是保留的 官方有什么其他想法但是还没实现 |
Étiquette de comportement
标签 | 描述 |
---|---|
worker_daemon_start |
守护进程开启 |
worker_memory_exceeded |
内存超出 |
worker_queue_restart |
重启守护进程 |
worker_before_process |
任务开始执行之前 |
worker_before_sleep |
任务延迟执行 |
queue_failed |
任务执行失败 |
Paramètres de la commande
参数 | 默认值 | 可以使用的模式 | 描述 |
---|---|---|---|
queue |
null | work,listen | 要执行的任务名称 |
daemon |
null | work | 以守护进程执行任务 |
delay |
0 | work,listen | 失败后重新执行的时间 |
force |
null | work | 失败后重新执行的时间 |
memory |
128M | work,listen | 限制最大内存 |
sleep |
3 | work,listen | 没有任务的时候等待的时间 |
tries |
0 | work,listen | 任务失败后最大尝试次数 |
Différences de mode
1 : Différents principes d'exécution
travail : mode de traitement à processus unique ;
Aucun paramètre de démon Le processus de travail mettra directement fin au processus en cours après le traitement du message suivant. Lorsqu'il n'y a pas de nouveaux messages, il se met en veille pendant un certain temps puis se ferme ;
Avec le paramètre démon, le processus de travail traitera les messages dans la file d'attente de manière cyclique jusqu'à ce que la mémoire dépasse la configuration du paramètre avant de terminer le processus. Lorsqu'il n'y a pas de nouveau message, il dormira pendant un certain temps dans chaque boucle ;
écoute : processus parent + mode de traitement du processus enfant
créera un processus enfant en mode d'exécution unique dans le processus parent, et le message suivant ; dans la file d'attente est traité via le sous-processus de travail. Lorsque le sous-processus de travail se termine ; le processus parent où se trouve
écoutera le signal de sortie du sous-processus et recréera un nouveau sous-processus de travail à exécution unique. processus. ;
2 : Le moment de la sortie est différent
work : Voir ci-dessus
écouter : Le processus parent s'exécutera toujours dans des circonstances normales, sauf si les deux situations suivantes sont rencontrées
01 : Le temps d'exécution d'un processus enfant de travail créé. dépasse la configuration du paramètre --timeout de la commande d'écoute ; à ce moment, le processus enfant sera forcé de se terminer et le processus parent où se trouve l'écoute lancera également une exception ProcessTimeoutException et quittera
Les développeurs peuvent choisir ; pour intercepter l'exception et laisser le processus parent continuer à s'exécuter ;
02 : Le processus parent a une fuite de mémoire pour une raison quelconque. Lorsque la mémoire occupée par le processus parent dépasse la configuration du paramètre --memory dans la ligne de commande, le processus parent. et les processus enfants se termineront. Dans des circonstances normales, la mémoire occupée par le processus d'écoute lui-même est stable.
3 : Les performances sont différentes
work : Il boucle à l'intérieur du script, et le script du framework est chargé au début de l'exécution de la commande
écouter : Il ouvre un nouveau processus de travail après le traitement d'une tâche, et le framework sera rechargé à ce moment-là Script ;
Les performances du mode travail seront donc supérieures à celles du mode écoute.
Remarque : lorsque le code est mis à jour, vous devez exécuter manuellement la commande php think queue:restart en mode travail pour redémarrer la file d'attente pour que les modifications prennent effet. En mode écoute, elles prendront effet automatiquement, aucune autre opération n'est effectuée ; requis.
4 : Capacité de contrôle du délai d'attente
travail : Essentiellement, il ne peut ni contrôler le temps d'exécution du processus lui-même ni limiter le temps d'exécution des tâches en cours d'exécution.
écouter : peut limiter le délai d'attente du sous-processus de travail qu'il crée ;
can Le paramètre timeout est utilisé pour limiter la durée maximale pendant laquelle le sous-processus de travail est autorisé à s'exécuter. Les sous-processus qui ne se sont pas terminés au-delà de ce délai seront terminés de force. La différence entre
expire et time
expire. est défini dans le fichier de configuration, qui fait référence au délai d'expiration de la tâche. Le temps est global et affecte tous les processus de travail. Le délai d'attente est défini dans les paramètres de ligne de commande et fait référence au délai d'expiration du sous-processus de travail. valable uniquement pour la commande d'écoute actuellement exécutée. L'objet du délai d'attente est le sous-processus de travail
01 : Grand nombre de tâches
02 : Exigences de hautes performances
03 : Temps d'exécution court des tâches
04 : Il n'y a pas de boucle infinie dans la classe consommateur, sleep() , exit(), die() et d'autres logiques qui peuvent facilement conduire à des bugs
02 : Le temps d'exécution de la tâche est long
03 : Le temps d'exécution de la tâche doit être strictement limité
01 : Appelez d'abord la méthode magique dansOpérations dans le constructeur de la classe redis.php :src/Queue.php
__callStatic
02 : AppelezbuildConnector
03 dans la méthode __callStatic : Dans buildConnector, le fichier de configuration est d'abord chargé s'il n'y en a pas, il sera exécuté de manière synchrone
04 : Créer une connexion selon le fichier de configuration et passer la configuration
01 : Détecter si l'extension redis est installée
02 : Fusionner la configuration
03 : Détecter qu'il s'agisse d'une extension Redis ou de pRedis
04 : Créer un objet de connexion
立即执行
push($job, $data, $queue) Queue::push(Test::class, ['id' => 1], 'test');
一顿骚操作后返回一个数组 并且序列化后 rPush到redis中 key为 queue:queueName
数组结构:
[ 'job' => $job, // 要执行任务的类 'data' => $data, // 任务数据 'id'=>'xxxxx' //任务id ]
写入 redis并且返回队列id
至于中间的那顿骚操作太长了就没写
延迟发布
later($delay, $job, $data, $queue) Queue::later(100, Test::class, ['id' => 1], 'test');
跟上面的差不多
一顿骚操作后返回一个数组 并且序列化后 zAdd 到redis中 key为 queue:queueName:delayed
score为当前的时间戳+$delay
执行过程
执行过程有work模式和listen模式 两种 区别上面已经说了 代码逻辑由于太多等下回分解;
最后讲一下标签的使用
//守护进程开启 'worker_daemon_start' => [ \app\index\behavior\WorkerDaemonStart::class ], //内存超出 'worker_memory_exceeded' => [ \app\index\behavior\WorkerMemoryExceeded::class ], //重启守护进程 'worker_queue_restart' => [ \app\index\behavior\WorkerQueueRestart::class ], //任务开始执行之前 'worker_before_process' => [ \app\index\behavior\WorkerBeforeProcess::class ], //任务延迟执行 'worker_before_sleep' => [ \app\index\behavior\WorkerBeforeSleep::class ], //任务执行失败 'queue_failed' => [ \app\index\behavior\QueueFailed::class ]
public function run(Output $output) { $output->write('<info>任务执行失败</info>', true); }
控制台执行 php think queue:work --queue test --daemon
会在控制台一次输出
守护进程开启 任务延迟执行
失败的处理 如果有任务执行失败或者执行次数达到最大值
会触发 queue_failed
在app\index\behavior@run
方法里面写失败的逻辑 比如邮件通知 写入日志等
最后我们来说一下如何在其他框架或者项目中给tp的项目推送消息队列,例如两个项目是分开的 另一个使用的却不是tp5的框架
在其他项目中推任务
php版本
<?php class Index { private $redis = null; public function __construct() { $this->redis = new Redis(); $this->redis->connect('127.0.0.1', 6379); $this->redis->select(10); } public function push($job, $data, $queue) { $payload = $this->createPayload($job, $data); $this->redis->rPush('queues:' . $queue, $payload); } public function later($delay, $job, $data, $queue) { $payload = $this->createPayload($job, $data); $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload); } private function createPayload($job, $data) { $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32)); return $this->setMeta($payload, 'attempts', 1); } private function setMeta($payload, $key, $value) { $payload = json_decode($payload, true); $payload[$key] = $value; $payload = json_encode($payload); if (JSON_ERROR_NONE !== json_last_error()) { throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg()); } return $payload; } private function random(int $length = 16): string { $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'; $randomString = ''; for ($i = 0; $i later(10, 'app\index\jobs\Test', ['id' => 1], 'test');
go版本
package main import ( "encoding/json" "github.com/garyburd/redigo/redis" "math/rand" "time" ) type Payload struct { Id string `json:"id"` Job string `json:"job"` Data interface{} `json:"data"` Attempts int `json:"attempts"` } var RedisClient *redis.Pool func init() { RedisClient = &redis.Pool{ MaxIdle: 20, MaxActive: 500, IdleTimeout: time.Second * 100, Dial: func() (conn redis.Conn, e error) { c, err := redis.Dial("tcp", "127.0.0.1:6379") if err != nil { return nil, err } _, _ = c.Do("SELECT", 10) return c, nil }, } } func main() { var data = make(map[string]interface{}) data["id"] = "1" later(10, "app\\index\\jobs\\Test", data, "test") } func push(job string, data interface{}, queue string) { payload := createPayload(job, data) queueName := "queues:" + queue _, _ = RedisClient.Get().Do("rPush", queueName, payload) } func later(delay int, job string, data interface{}, queue string) { m, _ := time.ParseDuration("+1s") currentTime := time.Now() op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix() createPayload(job, data) payload := createPayload(job, data) queueName := "queues:" + queue + ":delayed" _, _ = RedisClient.Get().Do("zAdd", queueName, op, payload) } // 创建指定格式的数据 func createPayload(job string, data interface{}) (payload string) { payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1} jsonStr, _ := json.Marshal(payload1) return string(jsonStr) } // 创建随机字符串 func random(n int) string { var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") b := make([]rune, n) for i := range b { b[i] = str[rand.Intn(len(str))] } return string(b) }