null |
遅延時間 |
後で |
| ##立即执行
push($job, $data, $queue)
Queue::push(Test::class, ['id' => 1], 'test');
一顿骚操作后返回一个数组 并且序列化后 rPush到redis中 key为 queue:queueName
数组结构:
[
'job' => $job, // 要执行任务的类
'data' => $data, // 任务数据
'id'=>'xxxxx' //任务id
]
写入 redis并且返回队列id
至于中间的那顿骚操作太长了就没写
延迟发布
later($delay, $job, $data, $queue)
Queue::later(100, Test::class, ['id' => 1], 'test');
跟上面的差不多
一顿骚操作后返回一个数组 并且序列化后 zAdd 到redis中 key为 queue:queueName:delayed
score为当前的时间戳+$delay
执行过程
执行过程有work模式和listen模式 两种 区别上面已经说了 代码逻辑由于太多等下回分解;
最后讲一下标签的使用
//守护进程开启
'worker_daemon_start' => [
\app\index\behavior\WorkerDaemonStart::class
],
//内存超出
'worker_memory_exceeded' => [
\app\index\behavior\WorkerMemoryExceeded::class
],
//重启守护进程
'worker_queue_restart' => [
\app\index\behavior\WorkerQueueRestart::class
],
//任务开始执行之前
'worker_before_process' => [
\app\index\behavior\WorkerBeforeProcess::class
],
//任务延迟执行
'worker_before_sleep' => [
\app\index\behavior\WorkerBeforeSleep::class
],
//任务执行失败
'queue_failed' => [
\app\index\behavior\QueueFailed::class
]
public function run(Output $output)
{
$output->write('<info>任务执行失败</info>', true);
}
控制台执行 php think queue:work --queue test --daemon
会在控制台一次输出
守护进程开启
任务延迟执行
失败的处理 如果有任务执行失败或者执行次数达到最大值
会触发 queue_failed
在app\index\behavior@run
方法里面写失败的逻辑 比如邮件通知 写入日志等
最后我们来说一下如何在其他框架或者项目中给tp的项目推送消息队列,例如两个项目是分开的 另一个使用的却不是tp5的框架
在其他项目中推任务
php版本
<?php class Index
{
private $redis = null;
public function __construct()
{
$this->redis = new Redis();
$this->redis->connect('127.0.0.1', 6379);
$this->redis->select(10);
}
public function push($job, $data, $queue)
{
$payload = $this->createPayload($job, $data);
$this->redis->rPush('queues:' . $queue, $payload);
}
public function later($delay, $job, $data, $queue)
{
$payload = $this->createPayload($job, $data);
$this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
}
private function createPayload($job, $data)
{
$payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
return $this->setMeta($payload, 'attempts', 1);
}
private function setMeta($payload, $key, $value)
{
$payload = json_decode($payload, true);
$payload[$key] = $value;
$payload = json_encode($payload);
if (JSON_ERROR_NONE !== json_last_error()) {
throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
}
return $payload;
}
private function random(int $length = 16): string
{
$str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
$randomString = '';
for ($i = 0; $i later(10, 'app\index\jobs\Test', ['id' => 1], 'test');
go版本
package main
import (
"encoding/json"
"github.com/garyburd/redigo/redis"
"math/rand"
"time"
)
type Payload struct {
Id string `json:"id"`
Job string `json:"job"`
Data interface{} `json:"data"`
Attempts int `json:"attempts"`
}
var RedisClient *redis.Pool
func init() {
RedisClient = &redis.Pool{
MaxIdle: 20,
MaxActive: 500,
IdleTimeout: time.Second * 100,
Dial: func() (conn redis.Conn, e error) {
c, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
return nil, err
}
_, _ = c.Do("SELECT", 10)
return c, nil
},
}
}
func main() {
var data = make(map[string]interface{})
data["id"] = "1"
later(10, "app\\index\\jobs\\Test", data, "test")
}
func push(job string, data interface{}, queue string) {
payload := createPayload(job, data)
queueName := "queues:" + queue
_, _ = RedisClient.Get().Do("rPush", queueName, payload)
}
func later(delay int, job string, data interface{}, queue string) {
m, _ := time.ParseDuration("+1s")
currentTime := time.Now()
op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix()
createPayload(job, data)
payload := createPayload(job, data)
queueName := "queues:" + queue + ":delayed"
_, _ = RedisClient.Get().Do("zAdd", queueName, op, payload)
}
// 创建指定格式的数据
func createPayload(job string, data interface{}) (payload string) {
payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}
jsonStr, _ := json.Marshal(payload1)
return string(jsonStr)
}
// 创建随机字符串
func random(n int) string {
var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
b := make([]rune, n)
for i := range b {
b[i] = str[rand.Intn(len(str))]
}
return string(b)
}
更多thinkphp技术知识,请访问thinkphp教程栏目!