>백엔드 개발 >PHP 튜토리얼 >PHP+redis는 지연 대기열을 구현합니다.

PHP+redis는 지연 대기열을 구현합니다.

不言
不言원래의
2018-05-07 10:41:1511438검색

이 기사에서는 특정 참조 값을 갖는 php+redis의 지연 대기열 구현을 주로 소개합니다. 이제는 모든 사람과 공유합니다. 필요한 친구가 참조할 수 있습니다.

제공과 같은 redis 주문 세트를 기반으로 지연 작업 실행을 구현합니다.
tp5 프레임워크에 작성했는데, 구현이 매우 간단하여 현재 회사 프로젝트에서 사용하기에 충분합니다. 백그라운드 프로세스는 많이 구현되지 않았습니다. 프로세스,
말은 많이 하지 않겠습니다. 코드만 게시하겠습니다. 조판으로 돌아가지 않겠습니다. 양해해 주시기 바랍니다.

1. 명령줄 스크립트 실행 방법: php 지연 대기열 대기열 이름을 생각하세요(이것이 순서 집합의 키입니다)

namespace app\command;

use app\common\lib\delayqueue\DelayQueue;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Db;

class DelayQueueWorker extends Command
{
    const COMMAND_ARGV_1 = 'queue';

    protected function configure()
    {
        $this->setName('delay-queue')->setDescription('延迟队列任务进程');
        $this->addArgument(self::COMMAND_ARGV_1);
    }

    protected function execute(Input $input, Output $output)
    {
        $queue = $input->getArgument(self::COMMAND_ARGV_1);
        //参数1 延迟队列表名,对应与redis的有序集key名
        while (true) {
            DelayQueue::getInstance($queue)->perform();
            usleep(300000);
        }
    }
}

라이브러리 클래스 디렉터리 구조
PHP+redis는 지연 대기열을 구현합니다.

config.php에는 redis 연결 매개변수 구성이 포함되어 있습니다.

RedisHandler.php는 순서 집합 작업만 구현하며, 재연결 메커니즘이 아직 구현되지 않았습니다.

namespace app\common\lib\delayqueue;

class RedisHandler
{
    public $provider;
    private static $_instance = null;

    private function __construct() {
        $this->provider = new \Redis();
        //host port
        $config = require_once 'config.php';
        $this->provider->connect($config['redis_host'], $config['redis_port']);
    }

    final private function __clone() {}

    public static function getInstance() {
        if(!self::$_instance) {
            self::$_instance = new RedisHandler();
        }
        return self::$_instance;
    }

    /**
     * @param string $key 有序集key
     * @param number $score 排序值
     * @param string $value 格式化的数据
     * @return int
     */
    public function zAdd($key, $score, $value)
    {
        return $this->provider->zAdd($key, $score, $value);
    }

    /**
     * 获取有序集数据
     * @param $key
     * @param $start
     * @param $end
     * @param null $withscores
     * @return array
     */
    public function zRange($key, $start, $end, $withscores = null)
    {
        return $this->provider->zRange($key, $start, $end, $withscores);
    }

    /**
     * 删除有序集数据
     * @param $key
     * @param $member
     * @return int
     */
    public function zRem($key,$member)
    {
        return $this->provider->zRem($key,$member);
    }

}

Delay queue class

namespace app\common\lib\delayqueue;

class DelayQueue
{

    private $prefix = 'delay_queue:';

    private $queue;

    private static $_instance = null;

    private function __construct($queue) {
        $this->queue = $queue;
    }

    final private function __clone() {}

    public static function getInstance($queue = '') {
        if(!self::$_instance) {
            self::$_instance = new DelayQueue($queue);
        }
        return self::$_instance;
    }

    /**
     * 添加任务信息到队列
     *
     * demo DelayQueue::getInstance('test')->addTask(
     *    'app\common\lib\delayqueue\job\Test',
     *    strtotime('2018-05-02 20:55:20'),
     *    ['abc'=>111]
     * );
     *
     * @param $jobClass
     * @param int $runTime 执行时间
     * @param array $args
     */
    public function addTask($jobClass, $runTime, $args = null)
    {

        $key = $this->prefix.$this->queue;

        $params = [
            'class' => $jobClass,
            'args'  => $args,
            'runtime' => $runTime,
        ];

        RedisHandler::getInstance()->zAdd(
            $key,
            $runTime,
            serialize($params)
        );
    }

    /**
     * 执行job
     * @return bool
     */
    public function perform()
    {
        $key = $this->prefix.$this->queue;
        //取出有序集第一个元素
        $result = RedisHandler::getInstance()->zRange($key, 0 ,0);

        if (!$result) {
            return false;
        }

        $jobInfo = unserialize($result[0]);

        print_r('job: '.$jobInfo['class'].' will run at: '. date('Y-m-d H:i:s',$jobInfo['runtime']).PHP_EOL);

        $jobClass = $jobInfo['class'];

        if(!@class_exists($jobClass)) {
            print_r($jobClass.' undefined'. PHP_EOL);
            RedisHandler::getInstance()->zRem($key, $result[0]);
            return false;
        }

        // 到时间执行
        if (time() >= $jobInfo['runtime']) {
            $job = new $jobClass;
            $job->setPayload($jobInfo['args']);
            $jobResult = $job->preform();
            if ($jobResult) {
                // 将任务移除
                RedisHandler::getInstance()->zRem($key, $result[0]);
                return true;
            }
        }

        return false;
    }

}

Asynchronous task base class:

namespace app\common\lib\delayqueue;

class DelayJob
{

    protected $payload;

    public function preform ()
    {
        // todo
        return true;
    }


    public function setPayload($args = null)
    {
        $this->payload = $args;
    }

}

All asynchronous 실행된 작업은 모두 작업 디렉터리에서 제거되며 DelayJob을 상속해야 합니다. 실행

예:

namespace app\common\lib\delayqueue\job;

use app\common\lib\delayqueue\DelayJob;

class Test extends DelayJob
{

    public function preform()
    {
        // payload 里应该有处理任务所需的参数,通过DelayQueue的addTask传入
        print_r('test job'.PHP_EOL);
        return true;
    }

}

사용법:

사용자가 주문을 생성하고 10분 후에 주문이 만료된다고 가정합니다. 그런 다음 주문이 생성된 후 추가합니다.

DelayQueue::getInstance('close_order')->addTask(
     'app\common\lib\delayqueue\job\CloseOrder', // 自己实现的job
     strtotime('2018-05-02 20:55:20'), // 订单失效时间
     ['order_id'=>123456] // 传递给job的参数
 );

close_order는 주문된 세트의 키입니다

명령줄 시작 프로세스

php 생각 지연 대기열 close_order

관련 권장 사항:

PHP+redis는 세션 공유를 구현합니다


위 내용은 PHP+redis는 지연 대기열을 구현합니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.