Maison >développement back-end >tutoriel php >PHP+redis implémente la file d'attente de retard

PHP+redis implémente la file d'attente de retard

不言
不言original
2018-05-07 10:41:1511421parcourir

Cet article présente principalement l'implémentation de la file d'attente retardée avec php+redis, qui a une certaine valeur de référence. Maintenant, je le partage avec tout le monde. Les amis dans le besoin peuvent s'y référer

Implémentation d'une exécution de tâche retardée basée sur redis. ensemble ordonné, comme l'envoi de messages texte à un certain utilisateur à une certaine heure, la gestion de l'expiration des commandes, etc.
Je l'ai écrit sur le framework tp5 C'est très simple à mettre en œuvre. C'est suffisant pour certaines applications pas très complexes. . C'est actuellement dans les projets de l'entreprise Utilisation, le processus en arrière-plan n'implémente pas le multi-processus,
Je ne dirai pas grand chose, je posterai juste le code, et je ne reviendrai pas sur la composition, pardonnez-moi. moi

1. Méthode d'exécution du script de ligne de commande : php think delay-queue queuename (il s'agit d'une clé de réglage ordonnée)

namespace app\command;

use app\common\lib\delayqueue\DelayQueue;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Db;

class DelayQueueWorker extends Command
{
    const COMMAND_ARGV_1 = 'queue';

    protected function configure()
    {
        $this->setName('delay-queue')->setDescription('延迟队列任务进程');
        $this->addArgument(self::COMMAND_ARGV_1);
    }

    protected function execute(Input $input, Output $output)
    {
        $queue = $input->getArgument(self::COMMAND_ARGV_1);
        //参数1 延迟队列表名,对应与redis的有序集key名
        while (true) {
            DelayQueue::getInstance($queue)->perform();
            usleep(300000);
        }
    }
}

Structure du répertoire de la bibliothèque
PHP+redis implémente la file dattente de retard

config.php est la configuration des paramètres de connexion Redis

RedisHandler.php n'implémente que les opérations d'ensemble ordonnées, et le mécanisme de reconnexion n'a pas encore été implémenté

namespace app\common\lib\delayqueue;

class RedisHandler
{
    public $provider;
    private static $_instance = null;

    private function __construct() {
        $this->provider = new \Redis();
        //host port
        $config = require_once 'config.php';
        $this->provider->connect($config['redis_host'], $config['redis_port']);
    }

    final private function __clone() {}

    public static function getInstance() {
        if(!self::$_instance) {
            self::$_instance = new RedisHandler();
        }
        return self::$_instance;
    }

    /**
     * @param string $key 有序集key
     * @param number $score 排序值
     * @param string $value 格式化的数据
     * @return int
     */
    public function zAdd($key, $score, $value)
    {
        return $this->provider->zAdd($key, $score, $value);
    }

    /**
     * 获取有序集数据
     * @param $key
     * @param $start
     * @param $end
     * @param null $withscores
     * @return array
     */
    public function zRange($key, $start, $end, $withscores = null)
    {
        return $this->provider->zRange($key, $start, $end, $withscores);
    }

    /**
     * 删除有序集数据
     * @param $key
     * @param $member
     * @return int
     */
    public function zRem($key,$member)
    {
        return $this->provider->zRem($key,$member);
    }

}

Classe de file d'attente différée

namespace app\common\lib\delayqueue;

class DelayQueue
{

    private $prefix = 'delay_queue:';

    private $queue;

    private static $_instance = null;

    private function __construct($queue) {
        $this->queue = $queue;
    }

    final private function __clone() {}

    public static function getInstance($queue = '') {
        if(!self::$_instance) {
            self::$_instance = new DelayQueue($queue);
        }
        return self::$_instance;
    }

    /**
     * 添加任务信息到队列
     *
     * demo DelayQueue::getInstance('test')->addTask(
     *    'app\common\lib\delayqueue\job\Test',
     *    strtotime('2018-05-02 20:55:20'),
     *    ['abc'=>111]
     * );
     *
     * @param $jobClass
     * @param int $runTime 执行时间
     * @param array $args
     */
    public function addTask($jobClass, $runTime, $args = null)
    {

        $key = $this->prefix.$this->queue;

        $params = [
            'class' => $jobClass,
            'args'  => $args,
            'runtime' => $runTime,
        ];

        RedisHandler::getInstance()->zAdd(
            $key,
            $runTime,
            serialize($params)
        );
    }

    /**
     * 执行job
     * @return bool
     */
    public function perform()
    {
        $key = $this->prefix.$this->queue;
        //取出有序集第一个元素
        $result = RedisHandler::getInstance()->zRange($key, 0 ,0);

        if (!$result) {
            return false;
        }

        $jobInfo = unserialize($result[0]);

        print_r('job: '.$jobInfo['class'].' will run at: '. date('Y-m-d H:i:s',$jobInfo['runtime']).PHP_EOL);

        $jobClass = $jobInfo['class'];

        if(!@class_exists($jobClass)) {
            print_r($jobClass.' undefined'. PHP_EOL);
            RedisHandler::getInstance()->zRem($key, $result[0]);
            return false;
        }

        // 到时间执行
        if (time() >= $jobInfo['runtime']) {
            $job = new $jobClass;
            $job->setPayload($jobInfo['args']);
            $jobResult = $job->preform();
            if ($jobResult) {
                // 将任务移除
                RedisHandler::getInstance()->zRem($key, $result[0]);
                return true;
            }
        }

        return false;
    }

}

Classe de base de tâches asynchrones :

namespace app\common\lib\delayqueue;

class DelayJob
{

    protected $payload;

    public function preform ()
    {
        // todo
        return true;
    }


    public function setPayload($args = null)
    {
        $this->payload = $args;
    }

}

Toutes les tâches exécutées de manière asynchrone sont désinstallées dans le répertoire des tâches et doivent hériter de DelayJob. Vous pouvez implémenter n'importe quelle tâche que vous souhaitez retarder. exécution

Par exemple :

namespace app\common\lib\delayqueue\job;

use app\common\lib\delayqueue\DelayJob;

class Test extends DelayJob
{

    public function preform()
    {
        // payload 里应该有处理任务所需的参数,通过DelayQueue的addTask传入
        print_r('test job'.PHP_EOL);
        return true;
    }

}

Utilisation :

Supposons que l'utilisateur crée une commande et que la commande expire après 10 minutes, puis ajoute une fois la commande créée :

DelayQueue::getInstance('close_order')->addTask(
     'app\common\lib\delayqueue\job\CloseOrder', // 自己实现的job
     strtotime('2018-05-02 20:55:20'), // 订单失效时间
     ['order_id'=>123456] // 传递给job的参数
 );

close_order est la clé de l'ensemble ordonné

Processus de démarrage en ligne de commande

php think delay-queue close_order

Recommandations associées :

PHP+redis implémente le partage de session


Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn