0 work,listen |
Maximum number of attempts after task failure |
|
Mode difference
1: Different execution principles
work: single-process mode. Without the --daemon parameter, the work process handles the next message and then ends the current process; if there is no new message, it sleeps for a period of time and then exits. With the --daemon parameter, the work process handles the messages in the queue in a loop and only ends once its memory usage exceeds the configured limit; when there is no new message, it sleeps for a period of time in each iteration.
listen: parent-process and child-process mode. The parent process creates a work child process that runs in single-execution mode and handles the next message in the queue. When that work child process exits, the parent process (the one running listen) catches the child's exit signal and creates a new single-execution work child process.
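The two behaviours of work mode can be sketched in Go, the language of the second example at the end of this article. This is only an in-memory illustration, not think-queue's code: the queue is a slice, and the callbacks handle and shouldStop are hypothetical (shouldStop stands in for the memory-limit check).

```go
package main

import (
	"fmt"
	"time"
)

// work consumes jobs from an in-memory queue and returns how many it handled.
// Without daemon it does a single pass; with daemon it loops until shouldStop
// reports true (a stand-in for "memory limit exceeded").
func work(queue *[]string, daemon bool, handle func(string), shouldStop func() bool, idleMillis int) int {
	processed := 0
	for {
		if len(*queue) > 0 {
			job := (*queue)[0]
			*queue = (*queue)[1:]
			handle(job)
			processed++
		} else {
			// No new message: sleep for a while before exiting or looping again.
			time.Sleep(time.Duration(idleMillis) * time.Millisecond)
		}
		if !daemon || shouldStop() {
			return processed
		}
	}
}

func main() {
	queue := []string{"a", "b", "c"}
	handle := func(job string) { fmt.Println("handled", job) }

	// Single run: one message, then the process would end.
	n := work(&queue, false, handle, func() bool { return false }, 10)
	fmt.Println("single run processed:", n)

	// Daemon run: keeps looping; here it stops once the queue is drained.
	n = work(&queue, true, handle, func() bool { return len(queue) == 0 }, 10)
	fmt.Println("daemon run processed:", n)
}
```

In the real daemon the stop condition is the memory check (or a restart request), so an empty queue only causes a sleep, not an exit.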
2: Different exit timing
work: see above.
listen: under normal circumstances the parent process keeps running, except in the following two situations.
01: A work child process runs longer than the --timeout parameter given on the listen command line. The work child process is then forcibly terminated, and the parent process running listen throws a ProcessTimeoutException and exits. Developers can choose to catch this exception and let the parent process continue running.
02: The parent process has a memory leak for some reason. When the memory used by the parent process itself exceeds the --memory parameter given on the command line, both the parent and child processes exit. Under normal circumstances, the memory used by the listen process itself is stable.
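The parent/child relationship with a timeout can be illustrated roughly as follows. This is a Go sketch, not think-queue's implementation: runChild and listen are hypothetical names, and the example assumes a Unix-like system where the sleep and true commands are on the PATH.

```go
package main

import (
	"context"
	"fmt"
	"os/exec"
	"time"
)

// runChild starts one child process and waits for it. It reports whether the
// child was killed because it ran longer than timeoutMillis.
func runChild(timeoutMillis int, name string, args ...string) (timedOut bool, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMillis)*time.Millisecond)
	defer cancel()
	// CommandContext kills the child when the context deadline passes.
	err = exec.CommandContext(ctx, name, args...).Run()
	return ctx.Err() == context.DeadlineExceeded, err
}

// listen plays the parent: it creates a new child each time the previous one
// exits, and gives up with an error as soon as a child times out.
func listen(rounds, timeoutMillis int, name string, args ...string) error {
	for i := 0; i < rounds; i++ {
		timedOut, err := runChild(timeoutMillis, name, args...)
		if timedOut {
			return fmt.Errorf("child exceeded %dms and was killed", timeoutMillis)
		}
		if err != nil {
			fmt.Println("child exited with error:", err)
		}
	}
	return nil
}

func main() {
	// Three short-lived children, each well within the limit.
	fmt.Println("normal children:", listen(3, 2000, "true"))
	// A child that runs too long is killed and the parent stops.
	fmt.Println("slow child:", listen(1, 100, "sleep", "5"))
}
```

A caller that wants the parent to keep going after a timeout would handle the returned error and call listen again, which corresponds to catching ProcessTimeoutException.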
3: Different performance
work: loops inside the script; the framework files are loaded once, early in command execution.
listen: starts a new work process after each task is handled, and that process reloads the framework files every time.
Therefore, work mode performs better than listen mode.
Note: after the code is updated, work mode requires you to run php think queue:restart manually to restart the queue before the changes take effect; in listen mode they take effect automatically, with no extra steps.
4: Timeout control
work: in essence it can limit neither the running time of the process itself nor the execution time of the task being executed.
listen: can limit the running time of the work child processes it creates. The --timeout parameter sets the maximum time a work child process is allowed to run; a child process that has not finished within this limit is forcibly terminated.
The difference between expire and timeout
expire is set in the configuration file and is the expiration time of a task. It is global and affects all work processes.
timeout is set on the command line and is the timeout of the work child process. It applies only to the listen command currently being run, and its target is the work child process.
5: Different usage scenarios
work is suitable when:
01: There are a large number of tasks
02: Performance requirements are high
03: Task execution time is short
04: The consumer class contains no infinite loops, sleep(), exit(), die(), or other logic that easily leads to bugs
listen is suitable when:
01: The number of tasks is small
02: Task execution time is long
03: Task execution time needs to be strictly limited
Public operations
Since the analysis here is based on redis, we only need to look at src/queue/connector/redis.php.
01: The __callStatic magic method in src/Queue.php is called first
02: __callStatic calls buildConnector
03: buildConnector first loads the configuration file; if there is none, tasks are executed synchronously
04: A connection is created according to the configuration file, and the configuration is passed in
Operations in the constructor of the redis.php class:
01: Check whether the redis extension is installed
02: Merge the configuration
03: Detect whether the redis extension or pRedis is being used
04: Create the connection object
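The connector selection and the configuration merge described above can be sketched as follows. This is a simplified Go illustration rather than the library's PHP code: buildConnector and mergeOptions are stand-in functions, and the option names and default values in redisDefaults are assumptions made for the example.

```go
package main

import "fmt"

// redisDefaults holds illustrative default options (assumed values).
var redisDefaults = map[string]string{
	"host":   "127.0.0.1",
	"port":   "6379",
	"select": "0",
}

// mergeOptions copies the defaults and lets user-supplied values override them.
func mergeOptions(defaults, user map[string]string) map[string]string {
	merged := make(map[string]string, len(defaults)+len(user))
	for k, v := range defaults {
		merged[k] = v
	}
	for k, v := range user {
		merged[k] = v
	}
	return merged
}

// buildConnector picks the connector named in the configuration and falls
// back to synchronous execution when no configuration is available.
func buildConnector(config map[string]string) (string, map[string]string) {
	connector := config["connector"] // reading a nil map yields ""
	if connector == "" {
		return "sync", nil
	}
	if connector == "redis" {
		return "redis", mergeOptions(redisDefaults, config)
	}
	return connector, config
}

func main() {
	name, _ := buildConnector(nil)
	fmt.Println("no config:", name)

	name, opts := buildConnector(map[string]string{"connector": "redis", "select": "10"})
	fmt.Println("with config:", name, "host:", opts["host"], "select:", opts["select"])
}
```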
Publishing process
Publishing parameters
Parameter name | Default value | Description | Methods that can be used |
$job | None | The class that executes the task | push,later |
$data | empty | Task data | push,later |
$queue | default | Queue name | push,later |
$delay | null | Delay time (seconds) | later |
Immediate execution
push($job, $data, $queue)
Queue::push(Test::class, ['id' => 1], 'test');
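After a fair amount of internal processing, an array is built, serialized, and pushed to redis with rPush under the key queues:queueName. Array structure:
[
    'job' => $job, // the class that executes the task
    'data' => $data, // task data
    'id' => 'xxxxx' // task id
]
The payload is written to redis and the queue id is returned. The processing in between is too long to cover here.
Delayed publishing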
later($delay, $job, $data, $queue)
Queue::later(100, Test::class, ['id' => 1], 'test');
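Much the same as above: after the internal processing an array is built, serialized, and added to redis with zAdd under the key queues:queueName:delayed, with a score equal to the current timestamp + $delay.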
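To make the role of the score concrete, here is a small in-memory Go sketch of a delayed set. It does not talk to redis: the slice stands in for the sorted set, and popDue reflects an assumption about the consumer side, namely that entries whose score is no later than the current time are considered due. The names delayedJob, later and popDue are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// delayedJob is one member of the simulated sorted set.
type delayedJob struct {
	Score   int64 // Unix timestamp at which the job becomes due
	Payload string
}

// later mirrors the zAdd call: the score is the current timestamp plus the delay.
func later(set []delayedJob, now, delay int64, payload string) []delayedJob {
	set = append(set, delayedJob{Score: now + delay, Payload: payload})
	// Keep the set ordered by score, as a sorted set would be.
	sort.SliceStable(set, func(i, j int) bool { return set[i].Score < set[j].Score })
	return set
}

// popDue splits the set into payloads that are due at time now and the rest.
func popDue(set []delayedJob, now int64) (due []string, rest []delayedJob) {
	for _, job := range set {
		if job.Score <= now {
			due = append(due, job.Payload)
		} else {
			rest = append(rest, job)
		}
	}
	return due, rest
}

func main() {
	now := int64(1000)
	var set []delayedJob
	set = later(set, now, 100, "job A")
	set = later(set, now, 10, "job B")

	due, rest := popDue(set, now+50)
	fmt.Println("due after 50s:", due, "still waiting:", len(rest))
}
```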
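Execution process
There are two execution modes, work and listen; the differences are described above. The code logic is too long, so it is left for a later article. Finally, a word on using tags: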
// daemon started
'worker_daemon_start' => [
    \app\index\behavior\WorkerDaemonStart::class
],
// memory limit exceeded
'worker_memory_exceeded' => [
    \app\index\behavior\WorkerMemoryExceeded::class
],
// daemon restarted
'worker_queue_restart' => [
    \app\index\behavior\WorkerQueueRestart::class
],
// before a task starts executing
'worker_before_process' => [
    \app\index\behavior\WorkerBeforeProcess::class
],
// task execution delayed
'worker_before_sleep' => [
    \app\index\behavior\WorkerBeforeSleep::class
],
// task execution failed
'queue_failed' => [
    \app\index\behavior\QueueFailed::class
]
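public function run(Output $output)
{
    $output->write('<info>Task execution failed</info>', true);
}
Running php think queue:work --queue test --daemon in the console prints the following in order:
Daemon started
Task execution delayed
Failure handling: if a task fails or its number of attempts reaches the maximum, queue_failed is triggered.
Write the failure logic, such as email notification or logging, in the run method of the corresponding behavior class under app\index\behavior.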
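The tag mechanism boils down to a registry of callbacks keyed by tag name. The Go sketch below is a simplified model and not the framework's hook system: hooks, listen, trigger and handleFailure are hypothetical names, and treating a maximum of 0 as "no limit" is an assumption.

```go
package main

import "fmt"

// hooks maps a tag name to the callbacks registered for it.
type hooks map[string][]func(job string)

// listen registers a callback for a tag.
func (h hooks) listen(tag string, fn func(job string)) {
	h[tag] = append(h[tag], fn)
}

// trigger runs every callback registered for the tag and returns how many ran.
func (h hooks) trigger(tag, job string) int {
	for _, fn := range h[tag] {
		fn(job)
	}
	return len(h[tag])
}

// handleFailure fires queue_failed once the attempts reach maxTries.
// Assumption: maxTries == 0 means there is no limit.
func handleFailure(h hooks, job string, attempts, maxTries int) bool {
	if maxTries > 0 && attempts >= maxTries {
		h.trigger("queue_failed", job)
		return true
	}
	return false // the job is still eligible for another attempt
}

func main() {
	h := hooks{}
	h.listen("queue_failed", func(job string) {
		// Failure logic goes here, e.g. email notification or logging.
		fmt.Println("task failed for good:", job)
	})

	fmt.Println("attempt 1 of 3 gives up:", handleFailure(h, "app\\index\\jobs\\Test", 1, 3))
	fmt.Println("attempt 3 of 3 gives up:", handleFailure(h, "app\\index\\jobs\\Test", 3, 3))
}
```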
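Finally, let's look at how to push queue messages to a tp project from another framework or project, for example when the two projects are separate and the other one does not use the tp5 framework.
Pushing tasks from other projects
PHP version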
<?php

class Index
{
    private $redis = null;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        $this->redis->select(10);
    }

    // Push a task that should run immediately.
    public function push($job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->rPush('queues:' . $queue, $payload);
    }

    // Push a task that should run after $delay seconds.
    public function later($delay, $job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
    }

    // Build the JSON payload: job, data, id and attempts.
    private function createPayload($job, $data)
    {
        $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
        return $this->setMeta($payload, 'attempts', 1);
    }

    private function setMeta($payload, $key, $value)
    {
        $payload = json_decode($payload, true);
        $payload[$key] = $value;
        $payload = json_encode($payload);
        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
        }
        return $payload;
    }

    // Create a random string of the given length.
    private function random(int $length = 16): string
    {
        $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
        $randomString = '';
        for ($i = 0; $i < $length; $i++) {
            $randomString .= $str[random_int(0, strlen($str) - 1)];
        }
        return $randomString;
    }
}

(new Index())->later(10, 'app\index\jobs\Test', ['id' => 1], 'test');
Go version
package main

import (
	"encoding/json"
	"log"
	"math/rand"
	"time"

	"github.com/garyburd/redigo/redis"
)

// Payload is the message format written to the queue.
type Payload struct {
	Id       string      `json:"id"`
	Job      string      `json:"job"`
	Data     interface{} `json:"data"`
	Attempts int         `json:"attempts"`
}

var RedisClient *redis.Pool

func init() {
	RedisClient = &redis.Pool{
		MaxIdle:     20,
		MaxActive:   500,
		IdleTimeout: time.Second * 100,
		Dial: func() (redis.Conn, error) {
			c, err := redis.Dial("tcp", "127.0.0.1:6379")
			if err != nil {
				return nil, err
			}
			// Select database 10, the same one used in the PHP version.
			if _, err := c.Do("SELECT", 10); err != nil {
				c.Close()
				return nil, err
			}
			return c, nil
		},
	}
}

func main() {
	data := map[string]interface{}{"id": "1"}
	if err := later(10, "app\\index\\jobs\\Test", data, "test"); err != nil {
		log.Fatal(err)
	}
}

// push appends a task that should run immediately.
func push(job string, data interface{}, queue string) error {
	payload, err := createPayload(job, data)
	if err != nil {
		return err
	}
	conn := RedisClient.Get()
	defer conn.Close() // return the connection to the pool
	_, err = conn.Do("RPUSH", "queues:"+queue, payload)
	return err
}

// later adds a task that should run after delay seconds.
func later(delay int, job string, data interface{}, queue string) error {
	payload, err := createPayload(job, data)
	if err != nil {
		return err
	}
	score := time.Now().Add(time.Duration(delay) * time.Second).Unix()
	conn := RedisClient.Get()
	defer conn.Close() // return the connection to the pool
	_, err = conn.Do("ZADD", "queues:"+queue+":delayed", score, payload)
	return err
}

// createPayload builds the data in the required format.
func createPayload(job string, data interface{}) (string, error) {
	jsonStr, err := json.Marshal(&Payload{Job: job, Data: data, Id: random(32), Attempts: 1})
	if err != nil {
		return "", err
	}
	return string(jsonStr), nil
}

// random creates a random string of length n.
func random(n int) string {
	str := []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
	b := make([]rune, n)
	for i := range b {
		b[i] = str[rand.Intn(len(str))]
	}
	return string(b)
}