>  기사  >  PHP 프레임워크  >  싱크 큐 분석(redis 중심 분석)

싱크 큐 분석(redis 중심 분석)

藏色散人
藏色散人앞으로
2021-07-26 16:00:193979검색

머리말

분석하기 전에 메시지 큐 구현을 꼭 이해하세요

tp5의 메시지 큐는 데이터베이스 Redis를 기반으로 하며 tp의 Topthink 공식 구현을 기반으로 합니다.
이 장은 Redis for Redis를 기반으로 합니다. 분석

저장 키:

key Type Description
queues:queueNamequeues:queueName list 要执行的任务
think:queue:restart string 重启队列时间戳
queues:queueName:delayed zSet 延迟任务
queues:queueName:reserved zSet 执行失败,等待重新执行

执行命令

work和listen的区别在下面会解释
命令 描述
php think queue:work 监听队列
php think queue:listen 监听队列
php think queue:restart 重启队列
php think queue:subscribe 暂无,可能是保留的 官方有什么其他想法但是还没实现

行为标签

标签 描述
worker_daemon_start 守护进程开启
worker_memory_exceeded 内存超出
worker_queue_restart 重启守护进程
worker_before_process 任务开始执行之前
worker_before_sleep 任务延迟执行
queue_failed 任务执行失败

命令参数

list수행할 작업 생각해 보세요: 대기열:재시작string
参数 默认值 可以使用的模式 描述
queue null work,listen 要执行的任务名称
daemon null work 以守护进程执行任务
delay 0 work,listen 失败后重新执行的时间
force null work 失败后重新执行的时间
memory 128M work,listen 限制最大内存
sleep 3 work,listen 没有任务的时候等待的时间
tries
🎜재시작 대기열 타임스탬프🎜🎜🎜🎜queues:queueName:delayed🎜🎜zSet🎜🎜지연된 작업🎜🎜🎜🎜queues:queueName : 예약됨🎜🎜zSet🎜🎜실행 실패, 재실행 대기 중🎜🎜🎜🎜🎜Execute command🎜
work와 Listen의 차이점은 아래에 설명되어 있습니다
🎜🎜🎜🎜Command🎜 🎜Description🎜🎜 🎜🎜🎜🎜php think queue:작업🎜🎜Listen to the queue🎜🎜🎜🎜php think queue:listen🎜🎜Listen to the queue🎜🎜 🎜🎜php think queue :restart🎜🎜Restart the queue🎜🎜🎜🎜php think queue:subscribe🎜🎜아니요, 공식적으로 다른 아이디어가 있을 수도 있습니다. 아직 구현되지 않았습니다🎜🎜🎜🎜🎜 태그 🎜🎜🎜🎜🎜 태그 🎜🎜 설명 🎜🎜🎜🎜🎜🎜worker_daemon_start🎜🎜데몬 시작 🎜🎜🎜 🎜worker_memory_exceeded🎜🎜 메모리 초과 🎜🎜🎜🎜worker_queue_restart🎜🎜데몬 다시 시작🎜🎜🎜🎜worker_before_process🎜🎜작업 실행 시작 전🎜🎜🎜🎜worker_before_ 자다🎜🎜지연된 작업 실행🎜🎜🎜 🎜<code>queue_failed🎜🎜작업 실행 실패🎜🎜🎜🎜🎜명령 매개변수🎜🎜🎜🎜🎜Parameters🎜🎜기본값🎜🎜 사용할 수 있는 모드🎜 🎜Description🎜🎜🎜🎜 🎜🎜queue 🎜🎜null🎜🎜work,listen🎜🎜실행할 작업 이름🎜🎜🎜🎜데몬🎜🎜null🎜 🎜work🎜🎜작업을 데몬 프로세스로 실행🎜🎜🎜🎜delay🎜🎜0🎜🎜work,listen🎜🎜실패 후 재실행 시간🎜🎜🎜🎜force code>🎜🎜null🎜🎜work🎜🎜실패 후 재실행 시간🎜🎜 🎜🎜메모리🎜🎜128M🎜🎜work,listen🎜🎜최대 메모리 제한🎜🎜🎜🎜 sleep🎜🎜3🎜🎜work,listen🎜🎜작업이 없을 때 대기 시간 🎜🎜🎜🎜tries🎜🎜0🎜🎜work,listen🎜🎜작업 후 최대 시도 횟수 실패🎜🎜🎜🎜

모드 차이점

1: 다양한 실행 원칙
작업: 단일 프로세스 처리 모드
데몬 매개변수 없음 작업 프로세스는 다음 메시지를 처리한 후 현재 프로세스를 직접 종료합니다. 새 메시지가 없으면 일정 시간 동안 휴면 상태가 된 후 종료됩니다.
데몬 매개변수를 사용하면 작업 프로세스는 프로세스를 종료하기 전에 메모리가 매개변수 구성을 초과할 때까지 대기열의 메시지를 주기적으로 처리합니다. 새 메시지가 없으면 각 루프에서 일정 시간 동안 대기합니다.

듣기: 상위 프로세스 + 하위 프로세스 처리 모드
상위 프로세스에 단일 실행 모드 작업 하위 프로세스가 생성됩니다. 작업 하위 프로세스가 종료되면
가 있는 상위 프로세스는 하위 프로세스의 종료 신호를 수신하고 새로운 단일 실행 작업 하위 프로세스를 다시 생성합니다. process. ;

2: 종료 시점이 다릅니다
work: 위를 참조하세요
들어보세요: 다음 두 가지 상황이 발생하지 않는 한 상위 프로세스는 항상 정상적인 상황에서 실행됩니다.
01: 생성된 작업 하위 프로세스의 실행 시간 이 시점에서 수신 명령 --timeout 매개변수 구성을 초과하면 작업 하위 프로세스가 강제로 종료되고 수신이 있는 상위 프로세스도 ProcessTimeoutException 예외를 발생시키고 종료됩니다. 예외를 포착하고 상위 프로세스가 계속 실행되도록 합니다.

02: 어떤 이유로 상위 프로세스에 메모리 누수가 발생했습니다. 하위 프로세스가 종료됩니다. 정상적인 상황에서 듣기 프로세스 자체가 차지하는 메모리는 안정적입니다.


3: 성능이 다릅니다

work: 스크립트 내부에서 반복되며, 명령 실행 시작 시 프레임워크 스크립트가 로드됩니다.


listen: 작업을 처리한 후 새 작업 프로세스를 열고 프레임워크가 로드됩니다. 이때 리로드됨 Script;

그래서 작업 모드의 성능이 듣기 모드의 성능보다 높을 것입니다.

참고: 코드가 업데이트되면 작업 모드에서 php think queue:restart 명령을 수동으로 실행하여 변경 사항이 적용되도록 대기열을 다시 시작해야 합니다. 청취 모드에서는 자동으로 적용되며 다른 작업은 수행되지 않습니다. 필수의.


4: 시간 초과 제어 기능

작업: 기본적으로 프로세스 자체의 실행 시간을 제어하거나 실행 작업의 실행 시간을 제한할 수 없습니다.

듣기: 생성되는 작업 하위 프로세스의 시간 초과 시간을 제한할 수 있습니다.

can 작업 하위 프로세스가 실행될 수 있는 최대 시간은 timeout 매개변수를 통해 제한됩니다. 이 시간 제한을 초과하지 않은 하위 프로세스는 강제로 종료됩니다.

expire와 time


expire의 차이는 다음과 같습니다. 작업 만료 시간을 나타내는 구성 파일에 설정됩니다. 시간은 전역적이며 모든 작업 프로세스에 영향을 미칩니다. 시간 초과는 명령줄 매개변수에 설정되며 작업 하위 프로세스의 시간 초과 시간을 나타냅니다. 현재 실행되는 수신 명령에 유효합니다. 시간 초과의 대상은 작업 하위 프로세스입니다.

5: 다양한 사용 시나리오

작업 적용 시나리오는 다음과 같습니다.

01: 많은 수의 작업

02: 고성능 요구 사항

03: 짧습니다. 작업 실행 시간
04: 소비자 클래스에는 무한 루프가 없습니다. sleep() ,exit(), die() 및 버그로 쉽게 이어지는 기타 논리

적용 가능한 시나리오는 다음과 같습니다.

01: 작업 수 jobs is small

02: task의 실행 시간이 길다

03: task의 실행 시간을 엄격하게 제한해야 함


Public Operations

redis 기반으로 분석을 하기 때문에 src/만 분석하면 됨 queue/connector/redis.php

01: 먼저 src/Queue.php에서 매직 메소드 를 호출합니다. __callStatic

02: buildConnector가 호출됩니다. __callStatic 메소드
03: 구성 파일이 먼저 buildConnector에 로드됩니다. 아무것도 없으면 동기적으로 실행됩니다.
04: 구성 파일을 기반으로 연결을 생성하고 생성자의 구성 src/Queue.php中的魔术方法 __callStatic
02: 在__callStatic方法中调用了 buildConnector

Operations에 전달합니다. redis.php 클래스:
01: Redis 확장이 설치되어 있는지 감지
02: 구성 병합

03: Redis 확장인지 pRedis인지 감지
04: 연결 개체 만들기

게시 프로세스

매개변수 게시

매개변수 이름기본값설명사용할 수 있는 방법$jobNone작업을 수행하는 클래스push, 나중에$ 데이터비어 있음 작업 데이터push,나중에$queuedefault작업 이름push,later$delaynull나중에나중에

立即执行

    push($job, $data, $queue)
    Queue::push(Test::class, ['id' => 1], 'test');

一顿骚操作后返回一个数组 并且序列化后 rPush到redis中 key为 queue:queueName
数组结构:

[
    'job' => $job, // 要执行任务的类
    'data' => $data, // 任务数据
    'id'=>'xxxxx' //任务id
]

写入 redis并且返回队列id
至于中间的那顿骚操作太长了就没写

延迟发布

    later($delay, $job, $data, $queue)
    Queue::later(100, Test::class, ['id' => 1], 'test');

跟上面的差不多
一顿骚操作后返回一个数组 并且序列化后 zAdd 到redis中 key为 queue:queueName:delayed score为当前的时间戳+$delay

执行过程

执行过程有work模式和listen模式 两种 区别上面已经说了 代码逻辑由于太多等下回分解;
最后讲一下标签的使用

    //守护进程开启
    'worker_daemon_start' => [
        \app\index\behavior\WorkerDaemonStart::class
    ],
    //内存超出
    'worker_memory_exceeded' => [
        \app\index\behavior\WorkerMemoryExceeded::class
    ],
    //重启守护进程
    'worker_queue_restart' => [
        \app\index\behavior\WorkerQueueRestart::class
    ],
    //任务开始执行之前
    'worker_before_process' => [
        \app\index\behavior\WorkerBeforeProcess::class
    ],
    //任务延迟执行
    'worker_before_sleep' => [
        \app\index\behavior\WorkerBeforeSleep::class
    ],
    //任务执行失败
    'queue_failed' => [
        \app\index\behavior\QueueFailed::class
    ]

싱크 큐 분석(redis 중심 분석)

public function run(Output $output)
    {
        $output->write('<info>任务执行失败</info>', true);
    }

控制台执行 php think queue:work --queue test --daemon
会在控制台一次输出

守护进程开启
任务延迟执行

失败的处理 如果有任务执行失败或者执行次数达到最大值
会触发 queue_failed

app\index\behavior@run方法里面写失败的逻辑 比如邮件通知 写入日志等

最后我们来说一下如何在其他框架或者项目中给tp的项目推送消息队列,例如两个项目是分开的 另一个使用的却不是tp5的框架

在其他项目中推任务

php版本

<?php class Index
{
    private $redis = null;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        $this->redis->select(10);
    }

    public function push($job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->rPush('queues:' . $queue, $payload);
    }

    public function later($delay, $job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
    }

    private function createPayload($job, $data)
    {
        $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
        return $this->setMeta($payload, 'attempts', 1);
    }

    private function setMeta($payload, $key, $value)
    {
        $payload = json_decode($payload, true);
        $payload[$key] = $value;
        $payload = json_encode($payload);

        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
        }

        return $payload;
    }

    private function random(int $length = 16): string
    {
        $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
        $randomString = '';
        for ($i = 0; $i later(10, 'app\index\jobs\Test', ['id' => 1], 'test');

go版本

package main

import (
    "encoding/json"
    "github.com/garyburd/redigo/redis"
    "math/rand"
    "time"
)

type Payload struct {
    Id       string      `json:"id"`
    Job      string      `json:"job"`
    Data     interface{} `json:"data"`
    Attempts int         `json:"attempts"`
}

var RedisClient *redis.Pool

func init() {
    RedisClient = &redis.Pool{
        MaxIdle:     20,
        MaxActive:   500,
        IdleTimeout: time.Second * 100,
        Dial: func() (conn redis.Conn, e error) {
            c, err := redis.Dial("tcp", "127.0.0.1:6379")

            if err != nil {
                return nil, err
            }

            _, _ = c.Do("SELECT", 10)

            return c, nil
        },
    }

}

func main() {

    var data = make(map[string]interface{})
    data["id"] = "1"

    later(10, "app\\index\\jobs\\Test", data, "test")
}

func push(job string, data interface{}, queue string) {
    payload := createPayload(job, data)
    queueName := "queues:" + queue

    _, _ = RedisClient.Get().Do("rPush", queueName, payload)
}

func later(delay int, job string, data interface{}, queue string) {

    m, _ := time.ParseDuration("+1s")
    currentTime := time.Now()
    op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix()
    createPayload(job, data)
    payload := createPayload(job, data)
    queueName := "queues:" + queue + ":delayed"

    _, _ = RedisClient.Get().Do("zAdd", queueName, op, payload)
}

// 创建指定格式的数据
func createPayload(job string, data interface{}) (payload string) {
    payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}

    jsonStr, _ := json.Marshal(payload1)

    return string(jsonStr)
}

// 创建随机字符串
func random(n int) string {

    var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

    b := make([]rune, n)
    for i := range b {
        b[i] = str[rand.Intn(len(str))]
    }
    return string(b)
}

更多thinkphp技术知识,请访问thinkphp教程栏目!

위 내용은 싱크 큐 분석(redis 중심 분석)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 segmentfault.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제