首頁 >資料庫 >Redis >Redis如何實現延遲隊列?方法介紹

Redis如何實現延遲隊列?方法介紹

青灯夜游
青灯夜游轉載
2020-07-08 16:01:583102瀏覽

Redis如何實現延遲隊列?方法介紹

延遲佇列,顧名思義它是一種具有延遲功能的訊息佇列。那麼,是在什麼場景下我才需要這樣的隊列呢?

1. 背景

我們先看看以下業務場景:

  • 當訂單一直處於未支付狀態時,如何及時的關閉訂單
  • 如何定期檢查處於退款狀態的訂單是否已經退款成功
  • 在訂單長時間沒有收到下游系統的狀態通知的時候,如何實現階梯式的同步訂單狀態的策略
  • 在系統通知上游系統支付成功終態時,上游系統返回通知失敗,如何進行非同步通知實行分頻率發送:15s 3m 10m 30m 30m 1h 2h 6h 15h

1.1 解決方案

  • #最簡單的方式,定時掃表 。例如訂單支付失效要求比較高的,每2S掃表一次檢查過期的訂單進行主動關單操作。 優點是簡單缺點是每分鐘全域掃表,浪費資源,如果遇到表資料訂單量即將過期的訂單量很大,會造成關單延遲。

  • 使用RabbitMq或其他MQ改造實作延遲佇列,優點是,開源,現成的穩定的實作方案,缺點是:MQ是一個訊息中介軟體,如果團隊技術堆疊本來就有MQ,那還好,如果不是,那為了延遲隊列而去部署一套MQ成本有點大

  • #使用Redis的zset、list的特性,我們可以利用redis來實現一個延遲佇列RedisDelayQueue

#2. 設計目標

    # 即時性:允許存在一定時間的秒級誤差
  • 高可用性:支援單機、支援叢集
  • 支援訊息刪除:業務會隨時刪除指定訊息
  • 訊息可靠性:保證至少被消費一次
  • 訊息持久化:基於Redis本身的持久化特性,如果Redis資料遺失,意味著延遲訊息的遺失,不過可以做主備和叢集保證。這個可以考慮後續最佳化將訊息持久化到MangoDB中

3.設計方案

設計主要包含以下幾點:

    將整個Redis當做訊息池,以KV形式儲存訊息
  • 使用ZSET做優先權佇列,依照Score維持優先權
  • 使用LIST結構,以先進先出的方式消費
  • ZSET和LIST儲存訊息地址(對應訊息池的每個KEY)
  • 自訂路由對象,儲存ZSET和LIST名稱,以點對點的方式將訊息從ZSET路由到正確的LIST
  • 使用定時器維護路由
  • 根據TTL規則實作訊息延遲

3.1 設計圖

還是基於有讚的延遲佇列設計,進行最佳化改造及程式碼實作。有讚設計


Redis如何實現延遲隊列?方法介紹

3.2 資料結構

  • ZING:DELAY_QUEUE:JOB_POOL是一個Hash_Table結構,裡面儲存了所有延遲佇列的資訊。 KV結構:K=prefix projectName  field = topic jobId  V=CONENT;V由客戶端傳入的數據,消費的時候回傳
  • ZING:DELAY_QUEUE:BUCKET 延遲隊列的有序集合ZSET,存放K=ID和需要的執行時間戳,根據時間戳排序
  • ZING:DELAY_QUEUE:QUEUE LIST結構,每個Topic一個LIST,list存放的都是目前需要被消費的JOB

Redis如何實現延遲隊列?方法介紹圖片僅供參考,基本上可以描述整個流程的執行過程,圖片源自於文末的參考部落格中

3.3 任務的生命週期

    新增一個JOB,會在
  1. ZING:DELAY_QUEUE:JOB_POOL中插入一條數據,記錄了業務方消費方。 ZING:DELAY_QUEUE:BUCKET也會插入一筆記錄,記錄執行的時間戳
  2. 搬運執行緒會去
  3. ZING:DELAY_QUEUE:BUCKET中尋找哪些執行時間戳的RunTimeMillis比現在的時間小,將這些記錄全部刪除;同時會解析出每個任務的Topic是什麼,然後將這些任務PUSH到TOPIC對應的清單ZING:DELAY_QUEUE:QUEUE
  4. #每個TOPIC的LIST都會有一個監聽線程去批量獲取LIST中的待消費數據,獲取到的數據全部扔給這個TOPIC的消費線程池
  5. 消費線程池執行會去
  6. ZING:DELAY_QUEUE:JOB_POOL尋找資料結構,傳回回呼結構,執行回呼方法。

3.4 設計要點#

3.4.1 基本概念

  • JOB:需要非同步處理的任務,是延遲佇列裡的基本單元
  • Topic:一組相同類型Job的集合(佇列)。供消費者訂閱

3.4.2 訊息結構

每個JOB必須包含以下幾個屬性

  • jobId:Job的唯一識別。用來檢索和刪除指定的Job資訊
  • topic:Job類型。可以理解成具體的業務名稱
  • delay:Job需要延遲的時間。單位:秒。 (服務端會將其轉換為絕對時間)
  • body:Job的內容,供消費者做具體的業務處理,以json格式儲存
  • retry:失敗重試次數
  • url:通知URL

3.5 設計細節

3.5.1 如何快速消費ZING:DELAY_QUEUE:QUEUE

#最簡單的實作方式就是使用計時器進行秒數掃描,為了確保訊息執行的時效性,可以設定每1S請求Redis一次,判斷隊列中是否有待消費的JOB。但是這樣會存在一個問題,如果queue中一直沒有可消費的JOB,那頻繁的掃描就失去了意義,也浪費了資源,幸好LIST中有一個BLPOP阻塞原語,如果list中有資料就會立刻返回,如果沒有資料就會一直阻塞在那裡,直到有資料回傳,可以設定阻塞的超時時間,超時會傳回NULL;具體的實作方式及策略會在程式碼中進行具體的實作介紹

3.5.2 避免定時導致的訊息重複搬運及消費

  • 使用Redis的分散式鎖定來控制訊息的搬運,從而避免訊息被重複搬運導致的問題
  • 使用分散式鎖定來保證計時器的執行頻率

4.核心程式碼實作

4.1 技術說明

#技術堆疊:SpringBoot,Redisson,Redis,分散式鎖,計時器

#注意:本專案沒有實現設計方案中的多Queue消費,只開啟了一個QUEUE,這個待以後優化

4.2 核心實體

#4.2.1 Job新增物件

/**
 * 消息结构
 *
 * @author 睁眼看世界
 * @date 2020年1月15日
 */
@Data
public class Job implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Job的唯一标识。用来检索和删除指定的Job信息
     */
    @NotBlank
    private String jobId;


    /**
     * Job类型。可以理解成具体的业务名称
     */
    @NotBlank
    private String topic;

    /**
     * Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)
     */
    private Long delay;

    /**
     * Job的内容,供消费者做具体的业务处理,以json格式存储
     */
    @NotBlank
    private String body;

    /**
     * 失败重试次数
     */
    private int retry = 0;

    /**
     * 通知URL
     */
    @NotBlank
    private String url;
}

4.2.2 Job刪除物件

#
/**
 * 消息结构
 *
 * @author 睁眼看世界
 * @date 2020年1月15日
 */
@Data
public class JobDie implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Job的唯一标识。用来检索和删除指定的Job信息
     */
    @NotBlank
    private String jobId;


    /**
     * Job类型。可以理解成具体的业务名称
     */
    @NotBlank
    private String topic;
}

4.3 搬運執行緒

#
/**
 * 搬运线程
 *
 * @author 睁眼看世界
 * @date 2020年1月17日
 */
@Slf4j
@Component
public class CarryJobScheduled {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 启动定时开启搬运JOB信息
     */
    @Scheduled(cron = "*/1 * * * * *")
    public void carryJobToQueue() {
        System.out.println("carryJobToQueue --->");
        RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            RScoredSortedSet<object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE);
            long now = System.currentTimeMillis();
            Collection<object> jobCollection = bucketSet.valueRange(0, false, now, true);
            List<string> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
            RList<string> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE);
            readyQueue.addAll(jobList);
            bucketSet.removeAllAsync(jobList);
        } catch (InterruptedException e) {
            log.error("carryJobToQueue error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }
}</string></string></object></object>

4.4 消費執行緒

#
@Slf4j
@Component
public class ReadyQueueContext {

    @Autowired
    private RedissonClient redissonClient;

    @Autowired
    private ConsumerService consumerService;

    /**
     * TOPIC消费线程
     */
    @PostConstruct
    public void startTopicConsumer() {
        TaskManager.doTask(this::runTopicThreads, "开启TOPIC消费线程");
    }

    /**
     * 开启TOPIC消费线程
     * 将所有可能出现的异常全部catch住,确保While(true)能够不中断
     */
    @SuppressWarnings("InfiniteLoopStatement")
    private void runTopicThreads() {
        while (true) {
            RLock lock = null;
            try {
                lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK);
            } catch (Exception e) {
                log.error("runTopicThreads getLock error", e);
            }
            try {
                if (lock == null) {
                    continue;
                }
                // 分布式锁时间比Blpop阻塞时间多1S,避免出现释放锁的时候,锁已经超时释放,unlock报错
                boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
                if (!lockFlag) {
                    continue;
                }

                // 1. 获取ReadyQueue中待消费的数据
                RBlockingQueue<string> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE);
                String topicId = queue.poll(60, TimeUnit.SECONDS);
                if (StringUtils.isEmpty(topicId)) {
                    continue;
                }

                // 2. 获取job元信息内容
                RMap<string> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY);
                Job job = jobPoolMap.get(topicId);

                // 3. 消费
                FutureTask<boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getUrl(), job.getBody()), job.getTopic() + "-->消费JobId-->" + job.getJobId());
                if (taskResult.get()) {
                    // 3.1 消费成功,删除JobPool和DelayBucket的job信息
                    jobPoolMap.remove(topicId);
                } else {
                    int retrySum = job.getRetry() + 1;
                    // 3.2 消费失败,则根据策略重新加入Bucket

                    // 如果重试次数大于5,则将jobPool中的数据删除,持久化到DB
                    if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
                        jobPoolMap.remove(topicId);
                        continue;
                    }
                    job.setRetry(retrySum);
                    long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
                    log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
                    RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
                    delayBucket.add(nextTime, topicId);
                    // 3.3 更新元信息失败次数
                    jobPoolMap.put(topicId, job);
                }
            } catch (Exception e) {
                log.error("runTopicThreads error", e);
            } finally {
                if (lock != null) {
                    try {
                        lock.unlock();
                    } catch (Exception e) {
                        log.error("runTopicThreads unlock error", e);
                    }
                }
            }
        }
    }
}</object></boolean></string></string>

4.5 新增及刪除JOB

/**
 * 提供给外部服务的操作接口
 *
 * @author why
 * @date 2020年1月15日
 */
@Slf4j
@Service
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {

    @Autowired
    private RedissonClient redissonClient;


    /**
     * 添加job元信息
     *
     * @param job 元信息
     */
    @Override
    public void addJob(Job job) {

        RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId());
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());

            // 1. 将job添加到 JobPool中
            RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            if (jobPool.get(topicId) != null) {
                throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST);
            }

            jobPool.put(topicId, job);

            // 2. 将job添加到 DelayBucket中
            RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.add(job.getDelay(), topicId);
        } catch (InterruptedException e) {
            log.error("addJob error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }


    /**
     * 删除job信息
     *
     * @param job 元信息
     */
    @Override
    public void deleteJob(JobDie jobDie) {

        RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId());
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());

            RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            jobPool.remove(topicId);

            RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.remove(topicId);
        } catch (InterruptedException e) {
            log.error("addJob error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }
}</object></string></object></string>

#5. 待最佳化的內容

  1. 目前只有一個Queue隊列存放訊息,當需要消費的消息大量堆積後,會影響訊息通知的時效。改進的辦法是,開啟多個Queue,進行訊息路由,再開啟多個消費執行緒進行消費,提供吞吐量
  2. 訊息沒有持久化,存在風險,後續會將訊息持久化到MangoDB中

6. 原始碼

#更多詳細原始碼請在下面位址中取得

  • RedisDelayQueue實作 zing-delay-queue(https://gitee.com/whyCodeData/zing-project/tree/master/zing-delay-queue)
  • RedissonStarter redisson-spring-boot-starter(https://gitee.com/whyCodeData/zing-project/tree/master/zing-starter/redisson-spring-boot-starter)
  • 專案應用程式 zing-pay(https://gitee.com/whyCodeData/zing-pay)

#7.參考

################################################################################################### ###https://tech.youzan.com/queuing_delay/######https://blog.csdn.net/u010634066/article/details/98864764##########更多redis知識,請關注:###redis入門教學###欄位。 ###

以上是Redis如何實現延遲隊列?方法介紹的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:csdn.net。如有侵權,請聯絡admin@php.cn刪除