지연 대기열은 이름에서 알 수 있듯이 지연 기능이 있는 메시지 대기열입니다. 그렇다면 어떤 상황에서 이러한 대기열이 필요합니까?
1. 배경
먼저 다음 비즈니스 시나리오를 살펴보겠습니다.
1.1 솔루션
가장 간단한 방법은 정기적으로 측정기를 스캔하는 것입니다. 예를 들어, 주문 결제 만료 요구 사항이 상대적으로 높은 경우 만료된 주문을 확인하고 적극적으로 주문을 마감하기 위해 측정기를 2초마다 스캔합니다. 단점은 간단하다는 점, 단점은 1분마다 테이블을 전역으로 스캔하므로 리소스가 낭비된다는 점. 테이블 데이터의 주문량이 곧 만료될 경우 주문이 지연될 수 있습니다. 폐쇄.
RabbitMq 또는 기타 MQ 수정을 사용하여 지연 대기열을 구현하세요. 장점은 오픈 소스이며 이미 만들어진 안정적인 구현 솔루션이라는 것입니다. 단점은 다음과 같습니다. MQ는 팀 기술 스택이 이미 가지고 있는 경우입니다. MQ라면 괜찮습니다. 지연 대기열을 위한 MQ 배포 비용은 약간 높습니다
Redis의 zset 및 list 기능을 사용하면 redis를 사용하여 지연 대기열을 구현할 수 있습니다RedisDelayQueue
2. 설계 목표
3. 설계 계획
설계에는 주로 다음 사항이 포함됩니다.
3.1 디자인 다이어그램
여전히 좋아하는 지연 대기열을 기반으로 코드를 디자인, 최적화 및 구현합니다. Youzan Design
3.2 데이터 구조
ZING:DELAY_QUEUE:JOB_POOL
은 모든 지연 대기열 정보를 저장하는 Hash_Table 구조입니다. KV 구조: K=prefix+projectName 필드 = topic+jobId V=CONENT;V클라이언트에서 전달한 데이터는 소비 시 반환됩니다.ZING:DELAY_QUEUE:JOB_POOL
是一个Hash_Table结构,里面存储了所有延迟队列的信息。KV结构:K=prefix+projectName field = topic+jobId V=CONENT;V由客户端传入的数据,消费的时候回传ZING:DELAY_QUEUE:BUCKET
延迟队列的有序集合ZSET,存放K=ID和需要的执行时间戳,根据时间戳排序ZING:DELAY_QUEUE:QUEUE
LIST结构,每个Topic一个LIST,list存放的都是当前需要被消费的JOB
Redis는 지연 대기열을 어떻게 구현합니까? 방법 소개
3.3 任务的生命周期
ZING:DELAY_QUEUE:JOB_POOL
中插入一条数据,记录了业务方消费方。ZING:DELAY_QUEUE:BUCKET
也会插入一条记录,记录执行的时间戳ZING:DELAY_QUEUE:BUCKET
中查找哪些执行时间戳的RunTimeMillis比现在的时间小,将这些记录全部删除;同时会解析出每个任务的Topic是什么,然后将这些任务PUSH到TOPIC对应的列表ZING:DELAY_QUEUE:QUEUE
中ZING:DELAY_QUEUE:JOB_POOL
ZING:DELAY_QUEUE:BUCKET
지연 대기열 순서 ZSET 설정, K 저장 =ID 및 필요한 실행 타임스탬프(타임스탬프ZING:DELAY_QUEUE:QUEUE
LIST 구조에 따라 정렬됨), 각 주제에는 LIST가 있으며 목록에는 현재 소비해야 하는 항목이 저장됩니다. JOB 사진은 참고용일 뿐이며, 기본적으로 전체 프로세스의 실행 과정을 설명할 수 있으며, 사진은 기사 끝 부분의 참조 블로그에서 가져왔습니다. Article
ZING:DELAY_QUEUE:JOB_POOL
에 데이터 조각이 삽입되어 비즈니스 측면과 소비자를 기록합니다. 옆. ZING:DELAY_QUEUE:BUCKET
은 실행 타임스탬프를 기록하기 위해 레코드도 삽입합니다🎜🎜처리 스레드는 ZING:DELAY_QUEUE:BUCKET
으로 이동하여 RunTimeMillis가 있는 실행 타임스탬프를 찾습니다. ratio 이제 시간이 부족하므로 이러한 레코드를 모두 동시에 삭제하면 각 작업의 주제가 무엇인지 분석한 다음 이러한 작업을 TOPIC ZING:DELAY_QUEUE:QUEUE
에 해당하는 목록에 푸시합니다. >🎜🎜각 TOPIC의 LIST에는 LIST에서 일괄적으로 소비할 데이터를 얻기 위한 청취 스레드가 있으며, 획득한 모든 데이터는 이 TOPIC의 소비 스레드 풀로 전달됩니다.🎜🎜소비 스레드 풀 실행은 다음으로 이동합니다. ZING:DELAY_QUEUE:JOB_POOL
데이터 구조를 찾아 콜백 구조에 반환하고 콜백 메서드를 실행합니다. 🎜🎜🎜🎜🎜3.4 디자인 포인트🎜🎜🎜3.4.1 기본 개념
3.4.2 메시지 구조
각 JOB에는 다음 속성이 포함되어야 합니다
3.5 디자인 세부사항
3.5.1 빠르게 소비하는 방법ZING:DELAY_QUEUE:QUEUE
ZING:DELAY_QUEUE:QUEUE
最简单的实现方式就是使用定时器进行秒级扫描,为了保证消息执行的时效性,可以设置每1S请求Redis一次,判断队列中是否有待消费的JOB。但是这样会存在一个问题,如果queue中一直没有可消费的JOB,那频繁的扫描就失去了意义,也浪费了资源,幸好LIST中有一个BLPOP阻塞原语
,如果list中有数据就会立马返回,如果没有数据就会一直阻塞在那里,直到有数据返回,可以设置阻塞的超时时间,超时会返回NULL;具体的实现方式及策略会在代码中进行具体的实现介绍
3.5.2 避免定时导致的消息重复搬运及消费
4. 核心代码实现
4.1 技术说明
技术栈:SpringBoot,Redisson,Redis,分布式锁,定时器
注意:本项目没有实现设计方案中的多Queue消费,只开启了一个QUEUE,这个待以后优化
4.2 核心实体
4.2.1 Job新增对象
/** * 消息结构 * * @author 睁眼看世界 * @date 2020年1月15日 */ @Data public class Job implements Serializable { private static final long serialVersionUID = 1L; /** * Job的唯一标识。用来检索和删除指定的Job信息 */ @NotBlank private String jobId; /** * Job类型。可以理解成具体的业务名称 */ @NotBlank private String topic; /** * Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间) */ private Long delay; /** * Job的内容,供消费者做具体的业务处理,以json格式存储 */ @NotBlank private String body; /** * 失败重试次数 */ private int retry = 0; /** * 通知URL */ @NotBlank private String url; }
4.2.2 Job删除对象
/** * 消息结构 * * @author 睁眼看世界 * @date 2020年1月15日 */ @Data public class JobDie implements Serializable { private static final long serialVersionUID = 1L; /** * Job的唯一标识。用来检索和删除指定的Job信息 */ @NotBlank private String jobId; /** * Job类型。可以理解成具体的业务名称 */ @NotBlank private String topic; }
4.3 搬运线程
/** * 搬运线程 * * @author 睁眼看世界 * @date 2020年1月17日 */ @Slf4j @Component public class CarryJobScheduled { @Autowired private RedissonClient redissonClient; /** * 启动定时开启搬运JOB信息 */ @Scheduled(cron = "*/1 * * * * *") public void carryJobToQueue() { System.out.println("carryJobToQueue --->"); RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK); try { boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL); } RScoredSortedSet<object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE); long now = System.currentTimeMillis(); Collection<object> jobCollection = bucketSet.valueRange(0, false, now, true); List<string> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList()); RList<string> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE); readyQueue.addAll(jobList); bucketSet.removeAllAsync(jobList); } catch (InterruptedException e) { log.error("carryJobToQueue error", e); } finally { if (lock != null) { lock.unlock(); } } } }</string></string></object></object>
4.4 消费线程
@Slf4j @Component public class ReadyQueueContext { @Autowired private RedissonClient redissonClient; @Autowired private ConsumerService consumerService; /** * TOPIC消费线程 */ @PostConstruct public void startTopicConsumer() { TaskManager.doTask(this::runTopicThreads, "开启TOPIC消费线程"); } /** * 开启TOPIC消费线程 * 将所有可能出现的异常全部catch住,确保While(true)能够不中断 */ @SuppressWarnings("InfiniteLoopStatement") private void runTopicThreads() { while (true) { RLock lock = null; try { lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK); } catch (Exception e) { log.error("runTopicThreads getLock error", e); } try { if (lock == null) { continue; } // 分布式锁时间比Blpop阻塞时间多1S,避免出现释放锁的时候,锁已经超时释放,unlock报错 boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { continue; } // 1. 获取ReadyQueue中待消费的数据 RBlockingQueue<string> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE); String topicId = queue.poll(60, TimeUnit.SECONDS); if (StringUtils.isEmpty(topicId)) { continue; } // 2. 获取job元信息内容 RMap<string> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY); Job job = jobPoolMap.get(topicId); // 3. 消费 FutureTask<boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getUrl(), job.getBody()), job.getTopic() + "-->消费JobId-->" + job.getJobId()); if (taskResult.get()) { // 3.1 消费成功,删除JobPool和DelayBucket的job信息 jobPoolMap.remove(topicId); } else { int retrySum = job.getRetry() + 1; // 3.2 消费失败,则根据策略重新加入Bucket // 如果重试次数大于5,则将jobPool中的数据删除,持久化到DB if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) { jobPoolMap.remove(topicId); continue; } job.setRetry(retrySum); long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000; log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime)); RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE); delayBucket.add(nextTime, topicId); // 3.3 更新元信息失败次数 jobPoolMap.put(topicId, job); } } catch (Exception e) { log.error("runTopicThreads error", e); } finally { if (lock != null) { try { lock.unlock(); } catch (Exception e) { log.error("runTopicThreads unlock error", e); } } } } } }</object></boolean></string></string>
4.5 添加及删除JOB
/** * 提供给外部服务的操作接口 * * @author why * @date 2020年1月15日 */ @Slf4j @Service public class RedisDelayQueueServiceImpl implements RedisDelayQueueService { @Autowired private RedissonClient redissonClient; /** * 添加job元信息 * * @param job 元信息 */ @Override public void addJob(Job job) { RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId()); try { boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL); } String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId()); // 1. 将job添加到 JobPool中 RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY); if (jobPool.get(topicId) != null) { throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST); } jobPool.put(topicId, job); // 2. 将job添加到 DelayBucket中 RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE); delayBucket.add(job.getDelay(), topicId); } catch (InterruptedException e) { log.error("addJob error", e); } finally { if (lock != null) { lock.unlock(); } } } /** * 删除job信息 * * @param job 元信息 */ @Override public void deleteJob(JobDie jobDie) { RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId()); try { boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL); } String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId()); RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY); jobPool.remove(topicId); RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE); delayBucket.remove(topicId); } catch (InterruptedException e) { log.error("addJob error", e); } finally { if (lock != null) { lock.unlock(); } } } }</object></string></object></string>
5. 待优化的内容
6. 源码
更多详细源码请在下面地址中获取
RedisDelayQueue实现
zing-delay-queue(https://gitee.com/whyCodeData/zing-project/tree/master/zing-delay-queue)RedissonStarter
redisson-spring-boot-starter(https://gitee.com/whyCodeData/zing-project/tree/master/zing-starter/redisson-spring-boot-starter)项目应用
구현하는 가장 간단한 방법은 타이머를 사용하여 스캔하는 것입니다. 두 번째 수준에서는 메시지 실행의 적시성을 보장하기 위해 1S마다 Redis에 요청을 설정하여 대기열에서 사용할 JOB이 있는지 확인할 수 있습니다. 하지만 문제가 발생합니다. 대기열에 소비 가능한 JOB이 없으면 잦은 검색은 의미가 없으며 다행히 LIST에 BLPOP 차단 프리미티브
가 있습니다. 데이터가 있으면 즉시 반환되고, 데이터가 없으면 데이터가 반환될 때까지 차단되며, 차단 시간 초과를 설정할 수 있으며, 특정 구현 방법 및 전략은 NULL을 반환합니다. 3.5.2 타이밍으로 인한 메시지의 반복 처리 및 소비 방지
RedisDelayQueue 구현
zing-delay-queue (https://gitee.com/whyCodeData/)에서 확인하세요. zing -project/tree/master/zing-delay-queue)🎜🎜RedissonStarter
redisson-spring-boot-starter(https://gitee.com/whyCodeData/zing-project/tree/master/) zing -starter/redisson-spring-boot-starter)🎜🎜프로젝트 애플리케이션
zing-pay (https://gitee.com/whyCodeData/zing-pay)🎜🎜🎜🎜🎜7. 🎜 🎜🎜🎜https://tech.youzan.com/queuing_delay/🎜🎜https://blog.csdn.net/u010634066/article/details/98864764🎜🎜🎜더 많은 Redis 지식을 보려면 다음 항목에 주의하세요. 🎜redis 입문 튜토리얼🎜 칼럼. 🎜위 내용은 Redis는 지연 대기열을 어떻게 구현합니까? 방법 소개의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!