延遲佇列,顧名思義它是一種具有延遲功能的訊息佇列。那麼,是在什麼場景下我才需要這樣的隊列呢?
1. 背景
我們先看看以下業務場景:
1.1 解決方案
#最簡單的方式,定時掃表 。例如訂單支付失效要求比較高的,每2S掃表一次檢查過期的訂單進行主動關單操作。 優點是簡單,缺點是每分鐘全域掃表,浪費資源,如果遇到表資料訂單量即將過期的訂單量很大,會造成關單延遲。
使用RabbitMq或其他MQ改造實作延遲佇列,優點是,開源,現成的穩定的實作方案,缺點是:MQ是一個訊息中介軟體,如果團隊技術堆疊本來就有MQ,那還好,如果不是,那為了延遲隊列而去部署一套MQ成本有點大
#使用Redis的zset、list的特性,我們可以利用redis來實現一個延遲佇列RedisDelayQueue
#2. 設計目標
3.設計方案
設計主要包含以下幾點:3.1 設計圖
還是基於有讚的延遲佇列設計,進行最佳化改造及程式碼實作。有讚設計3.2 資料結構
是一個Hash_Table結構,裡面儲存了所有延遲佇列的資訊。 KV結構:K=prefix projectName field = topic jobId V=CONENT;V由客戶端傳入的數據,消費的時候回傳
延遲隊列的有序集合ZSET,存放K=ID和需要的執行時間戳,根據時間戳排序
LIST結構,每個Topic一個LIST,list存放的都是目前需要被消費的JOB
圖片僅供參考,基本上可以描述整個流程的執行過程,圖片源自於文末的參考部落格中
3.3 任務的生命週期
中插入一條數據,記錄了業務方消費方。
ZING:DELAY_QUEUE:BUCKET也會插入一筆記錄,記錄執行的時間戳
中尋找哪些執行時間戳的RunTimeMillis比現在的時間小,將這些記錄全部刪除;同時會解析出每個任務的Topic是什麼,然後將這些任務PUSH到TOPIC對應的清單
ZING:DELAY_QUEUE:QUEUE
尋找資料結構,傳回回呼結構,執行回呼方法。
3.4 設計要點#
3.4.1 基本概念
3.4.2 訊息結構
每個JOB必須包含以下幾個屬性
3.5 設計細節
3.5.1 如何快速消費ZING:DELAY_QUEUE:QUEUE
#最簡單的實作方式就是使用計時器進行秒數掃描,為了確保訊息執行的時效性,可以設定每1S請求Redis一次,判斷隊列中是否有待消費的JOB。但是這樣會存在一個問題,如果queue中一直沒有可消費的JOB,那頻繁的掃描就失去了意義,也浪費了資源,幸好LIST中有一個BLPOP阻塞原語
,如果list中有資料就會立刻返回,如果沒有資料就會一直阻塞在那裡,直到有資料回傳,可以設定阻塞的超時時間,超時會傳回NULL;具體的實作方式及策略會在程式碼中進行具體的實作介紹
3.5.2 避免定時導致的訊息重複搬運及消費
4.核心程式碼實作
4.1 技術說明
#技術堆疊:SpringBoot,Redisson,Redis,分散式鎖,計時器
#注意:本專案沒有實現設計方案中的多Queue消費,只開啟了一個QUEUE,這個待以後優化
4.2 核心實體
#4.2.1 Job新增物件
/** * 消息结构 * * @author 睁眼看世界 * @date 2020年1月15日 */ @Data public class Job implements Serializable { private static final long serialVersionUID = 1L; /** * Job的唯一标识。用来检索和删除指定的Job信息 */ @NotBlank private String jobId; /** * Job类型。可以理解成具体的业务名称 */ @NotBlank private String topic; /** * Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间) */ private Long delay; /** * Job的内容,供消费者做具体的业务处理,以json格式存储 */ @NotBlank private String body; /** * 失败重试次数 */ private int retry = 0; /** * 通知URL */ @NotBlank private String url; }
4.2.2 Job刪除物件
#/** * 消息结构 * * @author 睁眼看世界 * @date 2020年1月15日 */ @Data public class JobDie implements Serializable { private static final long serialVersionUID = 1L; /** * Job的唯一标识。用来检索和删除指定的Job信息 */ @NotBlank private String jobId; /** * Job类型。可以理解成具体的业务名称 */ @NotBlank private String topic; }
4.3 搬運執行緒
#/** * 搬运线程 * * @author 睁眼看世界 * @date 2020年1月17日 */ @Slf4j @Component public class CarryJobScheduled { @Autowired private RedissonClient redissonClient; /** * 启动定时开启搬运JOB信息 */ @Scheduled(cron = "*/1 * * * * *") public void carryJobToQueue() { System.out.println("carryJobToQueue --->"); RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK); try { boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL); } RScoredSortedSet<object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE); long now = System.currentTimeMillis(); Collection<object> jobCollection = bucketSet.valueRange(0, false, now, true); List<string> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList()); RList<string> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE); readyQueue.addAll(jobList); bucketSet.removeAllAsync(jobList); } catch (InterruptedException e) { log.error("carryJobToQueue error", e); } finally { if (lock != null) { lock.unlock(); } } } }</string></string></object></object>
4.4 消費執行緒
#@Slf4j @Component public class ReadyQueueContext { @Autowired private RedissonClient redissonClient; @Autowired private ConsumerService consumerService; /** * TOPIC消费线程 */ @PostConstruct public void startTopicConsumer() { TaskManager.doTask(this::runTopicThreads, "开启TOPIC消费线程"); } /** * 开启TOPIC消费线程 * 将所有可能出现的异常全部catch住,确保While(true)能够不中断 */ @SuppressWarnings("InfiniteLoopStatement") private void runTopicThreads() { while (true) { RLock lock = null; try { lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK); } catch (Exception e) { log.error("runTopicThreads getLock error", e); } try { if (lock == null) { continue; } // 分布式锁时间比Blpop阻塞时间多1S,避免出现释放锁的时候,锁已经超时释放,unlock报错 boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { continue; } // 1. 获取ReadyQueue中待消费的数据 RBlockingQueue<string> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE); String topicId = queue.poll(60, TimeUnit.SECONDS); if (StringUtils.isEmpty(topicId)) { continue; } // 2. 获取job元信息内容 RMap<string> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY); Job job = jobPoolMap.get(topicId); // 3. 消费 FutureTask<boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getUrl(), job.getBody()), job.getTopic() + "-->消费JobId-->" + job.getJobId()); if (taskResult.get()) { // 3.1 消费成功,删除JobPool和DelayBucket的job信息 jobPoolMap.remove(topicId); } else { int retrySum = job.getRetry() + 1; // 3.2 消费失败,则根据策略重新加入Bucket // 如果重试次数大于5,则将jobPool中的数据删除,持久化到DB if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) { jobPoolMap.remove(topicId); continue; } job.setRetry(retrySum); long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000; log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime)); RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE); delayBucket.add(nextTime, topicId); // 3.3 更新元信息失败次数 jobPoolMap.put(topicId, job); } } catch (Exception e) { log.error("runTopicThreads error", e); } finally { if (lock != null) { try { lock.unlock(); } catch (Exception e) { log.error("runTopicThreads unlock error", e); } } } } } }</object></boolean></string></string>
4.5 新增及刪除JOB
/** * 提供给外部服务的操作接口 * * @author why * @date 2020年1月15日 */ @Slf4j @Service public class RedisDelayQueueServiceImpl implements RedisDelayQueueService { @Autowired private RedissonClient redissonClient; /** * 添加job元信息 * * @param job 元信息 */ @Override public void addJob(Job job) { RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId()); try { boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL); } String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId()); // 1. 将job添加到 JobPool中 RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY); if (jobPool.get(topicId) != null) { throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST); } jobPool.put(topicId, job); // 2. 将job添加到 DelayBucket中 RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE); delayBucket.add(job.getDelay(), topicId); } catch (InterruptedException e) { log.error("addJob error", e); } finally { if (lock != null) { lock.unlock(); } } } /** * 删除job信息 * * @param job 元信息 */ @Override public void deleteJob(JobDie jobDie) { RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId()); try { boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL); } String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId()); RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY); jobPool.remove(topicId); RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE); delayBucket.remove(topicId); } catch (InterruptedException e) { log.error("addJob error", e); } finally { if (lock != null) { lock.unlock(); } } } }</object></string></object></string>
#5. 待最佳化的內容
6. 原始碼
#更多詳細原始碼請在下面位址中取得
RedisDelayQueue實作
zing-delay-queue(https://gitee.com/whyCodeData/zing-project/tree/master/zing-delay-queue)RedissonStarter
redisson-spring-boot-starter(https://gitee.com/whyCodeData/zing-project/tree/master/zing-starter/redisson-spring-boot-starter)專案應用程式
zing-pay(https://gitee.com/whyCodeData/zing-pay)#7.參考
以上是Redis如何實現延遲隊列?方法介紹的詳細內容。更多資訊請關注PHP中文網其他相關文章!