搜尋
首頁後端開發php教程php+redis實作延遲隊列

php+redis實作延遲隊列

May 07, 2018 am 10:41 AM
php佇列

這篇文章主要介紹了關於php redis實現延遲隊列,有著一定的參考價值,現在分享給大家,有需要的朋友可以參考一下

基於redis有序集實現延遲任務執行,例如某個時間給某個用戶發短信,訂單過期處理,等等
我是在tp5框架上寫的,實現起來很簡單,對於一些不是很複雜的應用足夠了,目前在公司項目中使用,後台進程並沒有實現多進程,
不多說,貼程式碼,不回排版,見諒

1、命令列腳本執行方法:php think delay-queue queuename(這是有序集的key)

namespace app\command;

use app\common\lib\delayqueue\DelayQueue;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Db;

class DelayQueueWorker extends Command
{
    const COMMAND_ARGV_1 = 'queue';

    protected function configure()
    {
        $this->setName('delay-queue')->setDescription('延迟队列任务进程');
        $this->addArgument(self::COMMAND_ARGV_1);
    }

    protected function execute(Input $input, Output $output)
    {
        $queue = $input->getArgument(self::COMMAND_ARGV_1);
        //参数1 延迟队列表名,对应与redis的有序集key名
        while (true) {
            DelayQueue::getInstance($queue)->perform();
            usleep(300000);
        }
    }
}

庫類別目錄結構
php+redis實作延遲隊列

#config.php 裡是redis連接參數配置

RedisHandler.php只實現有序集的操作,重連機制還沒有實現

namespace app\common\lib\delayqueue;

class RedisHandler
{
    public $provider;
    private static $_instance = null;

    private function __construct() {
        $this->provider = new \Redis();
        //host port
        $config = require_once 'config.php';
        $this->provider->connect($config['redis_host'], $config['redis_port']);
    }

    final private function __clone() {}

    public static function getInstance() {
        if(!self::$_instance) {
            self::$_instance = new RedisHandler();
        }
        return self::$_instance;
    }

    /**
     * @param string $key 有序集key
     * @param number $score 排序值
     * @param string $value 格式化的数据
     * @return int
     */
    public function zAdd($key, $score, $value)
    {
        return $this->provider->zAdd($key, $score, $value);
    }

    /**
     * 获取有序集数据
     * @param $key
     * @param $start
     * @param $end
     * @param null $withscores
     * @return array
     */
    public function zRange($key, $start, $end, $withscores = null)
    {
        return $this->provider->zRange($key, $start, $end, $withscores);
    }

    /**
     * 删除有序集数据
     * @param $key
     * @param $member
     * @return int
     */
    public function zRem($key,$member)
    {
        return $this->provider->zRem($key,$member);
    }

}

延遲隊列類別

namespace app\common\lib\delayqueue;

class DelayQueue
{

    private $prefix = 'delay_queue:';

    private $queue;

    private static $_instance = null;

    private function __construct($queue) {
        $this->queue = $queue;
    }

    final private function __clone() {}

    public static function getInstance($queue = '') {
        if(!self::$_instance) {
            self::$_instance = new DelayQueue($queue);
        }
        return self::$_instance;
    }

    /**
     * 添加任务信息到队列
     *
     * demo DelayQueue::getInstance('test')->addTask(
     *    'app\common\lib\delayqueue\job\Test',
     *    strtotime('2018-05-02 20:55:20'),
     *    ['abc'=>111]
     * );
     *
     * @param $jobClass
     * @param int $runTime 执行时间
     * @param array $args
     */
    public function addTask($jobClass, $runTime, $args = null)
    {

        $key = $this->prefix.$this->queue;

        $params = [
            'class' => $jobClass,
            'args'  => $args,
            'runtime' => $runTime,
        ];

        RedisHandler::getInstance()->zAdd(
            $key,
            $runTime,
            serialize($params)
        );
    }

    /**
     * 执行job
     * @return bool
     */
    public function perform()
    {
        $key = $this->prefix.$this->queue;
        //取出有序集第一个元素
        $result = RedisHandler::getInstance()->zRange($key, 0 ,0);

        if (!$result) {
            return false;
        }

        $jobInfo = unserialize($result[0]);

        print_r('job: '.$jobInfo['class'].' will run at: '. date('Y-m-d H:i:s',$jobInfo['runtime']).PHP_EOL);

        $jobClass = $jobInfo['class'];

        if(!@class_exists($jobClass)) {
            print_r($jobClass.' undefined'. PHP_EOL);
            RedisHandler::getInstance()->zRem($key, $result[0]);
            return false;
        }

        // 到时间执行
        if (time() >= $jobInfo['runtime']) {
            $job = new $jobClass;
            $job->setPayload($jobInfo['args']);
            $jobResult = $job->preform();
            if ($jobResult) {
                // 将任务移除
                RedisHandler::getInstance()->zRem($key, $result[0]);
                return true;
            }
        }

        return false;
    }

}

非同步任務基類:

namespace app\common\lib\delayqueue;

class DelayJob
{

    protected $payload;

    public function preform ()
    {
        // todo
        return true;
    }


    public function setPayload($args = null)
    {
        $this->payload = $args;
    }

}

所有非同步執行的任務都卸載job目錄下,且要繼承DelayJob,你可以實現任何你想延遲執行的任務

如:

namespace app\common\lib\delayqueue\job;

use app\common\lib\delayqueue\DelayJob;

class Test extends DelayJob
{

    public function preform()
    {
        // payload 里应该有处理任务所需的参数,通过DelayQueue的addTask传入
        print_r('test job'.PHP_EOL);
        return true;
    }

}

使用方法:

假設用戶創建了一個訂單,訂單在10分鐘後失效,那麼在訂單建立後加入:

DelayQueue::getInstance('close_order')->addTask(
     'app\common\lib\delayqueue\job\CloseOrder', // 自己实现的job
     strtotime('2018-05-02 20:55:20'), // 订单失效时间
     ['order_id'=>123456] // 传递给job的参数
 );

close_order 是有序集的key

命令列啟動程序

php think delay-queue close_order

相關推薦:

PHP redis實作session共享


以上是php+redis實作延遲隊列的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
使用數據庫存儲會話的優點是什麼?使用數據庫存儲會話的優點是什麼?Apr 24, 2025 am 12:16 AM

使用數據庫存儲會話的主要優勢包括持久性、可擴展性和安全性。 1.持久性:即使服務器重啟,會話數據也能保持不變。 2.可擴展性:適用於分佈式系統,確保會話數據在多服務器間同步。 3.安全性:數據庫提供加密存儲,保護敏感信息。

您如何在PHP中實現自定義會話處理?您如何在PHP中實現自定義會話處理?Apr 24, 2025 am 12:16 AM

在PHP中實現自定義會話處理可以通過實現SessionHandlerInterface接口來完成。具體步驟包括:1)創建實現SessionHandlerInterface的類,如CustomSessionHandler;2)重寫接口中的方法(如open,close,read,write,destroy,gc)來定義會話數據的生命週期和存儲方式;3)在PHP腳本中註冊自定義會話處理器並啟動會話。這樣可以將數據存儲在MySQL、Redis等介質中,提升性能、安全性和可擴展性。

什麼是會話ID?什麼是會話ID?Apr 24, 2025 am 12:13 AM

SessionID是網絡應用程序中用來跟踪用戶會話狀態的機制。 1.它是一個隨機生成的字符串,用於在用戶與服務器之間的多次交互中保持用戶的身份信息。 2.服務器生成並通過cookie或URL參數發送給客戶端,幫助在用戶的多次請求中識別和關聯這些請求。 3.生成通常使用隨機算法保證唯一性和不可預測性。 4.在實際開發中,可以使用內存數據庫如Redis來存儲session數據,提升性能和安全性。

您如何在無狀態環境(例如API)中處理會議?您如何在無狀態環境(例如API)中處理會議?Apr 24, 2025 am 12:12 AM

在無狀態環境如API中管理會話可以通過使用JWT或cookies來實現。 1.JWT適合無狀態和可擴展性,但大數據時體積大。 2.Cookies更傳統且易實現,但需謹慎配置以確保安全性。

您如何防止與會議有關的跨站點腳本(XSS)攻擊?您如何防止與會議有關的跨站點腳本(XSS)攻擊?Apr 23, 2025 am 12:16 AM

要保護應用免受與會話相關的XSS攻擊,需採取以下措施:1.設置HttpOnly和Secure標誌保護會話cookie。 2.對所有用戶輸入進行輸出編碼。 3.實施內容安全策略(CSP)限制腳本來源。通過這些策略,可以有效防護會話相關的XSS攻擊,確保用戶數據安全。

您如何優化PHP會話性能?您如何優化PHP會話性能?Apr 23, 2025 am 12:13 AM

优化PHP会话性能的方法包括:1.延迟会话启动,2.使用数据库存储会话,3.压缩会话数据,4.管理会话生命周期,5.实现会话共享。这些策略能显著提升应用在高并发环境下的效率。

什麼是session.gc_maxlifetime配置設置?什麼是session.gc_maxlifetime配置設置?Apr 23, 2025 am 12:10 AM

theSession.gc_maxlifetimesettinginphpdeterminesthelifespanofsessiondata,setInSeconds.1)它'sconfiguredinphp.iniorviaini_set().2)abalanceisesneededeededeedeedeededto toavoidperformance andunununununexpectedLogOgouts.3)

您如何在PHP中配置會話名?您如何在PHP中配置會話名?Apr 23, 2025 am 12:08 AM

在PHP中,可以使用session_name()函數配置會話名稱。具體步驟如下:1.使用session_name()函數設置會話名稱,例如session_name("my_session")。 2.在設置會話名稱後,調用session_start()啟動會話。配置會話名稱可以避免多應用間的會話數據衝突,並增強安全性,但需注意會話名稱的唯一性、安全性、長度和設置時機。

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

強大的PHP整合開發環境

Dreamweaver Mac版

Dreamweaver Mac版

視覺化網頁開發工具

VSCode Windows 64位元 下載

VSCode Windows 64位元 下載

微軟推出的免費、功能強大的一款IDE編輯器

Atom編輯器mac版下載

Atom編輯器mac版下載

最受歡迎的的開源編輯器

SecLists

SecLists

SecLists是最終安全測試人員的伙伴。它是一個包含各種類型清單的集合,這些清單在安全評估過程中經常使用,而且都在一個地方。 SecLists透過方便地提供安全測試人員可能需要的所有列表,幫助提高安全測試的效率和生產力。清單類型包括使用者名稱、密碼、URL、模糊測試有效載荷、敏感資料模式、Web shell等等。測試人員只需將此儲存庫拉到新的測試機上,他就可以存取所需的每種類型的清單。