>  기사  >  데이터 베이스  >  Redis는 지연 대기열을 어떻게 구현합니까? 방법 소개

Redis는 지연 대기열을 어떻게 구현합니까? 방법 소개

青灯夜游
青灯夜游앞으로
2020-07-08 16:01:583060검색

Redis는 지연 대기열을 어떻게 구현합니까? 방법 소개

지연 대기열은 이름에서 알 수 있듯이 지연 기능이 있는 메시지 대기열입니다. 그렇다면 어떤 상황에서 이러한 대기열이 필요합니까?

1. 배경

먼저 다음 비즈니스 시나리오를 살펴보겠습니다.

  • 주문이 미결제 상태일 때 적시에 주문을 종료하는 방법
  • 주문 여부를 정기적으로 확인하는 방법 환불 상태가 환불되었습니다. 결제가 성공했습니다
  • 주문이 다운스트림 시스템에서 오랫동안 상태 알림을 받지 못한 경우, 주문 상태를 동기화하는 단계별 전략을 구현하는 방법
  • 시스템에서 알림을 받을 때 최종 결제 성공 상태의 업스트림 시스템에서 알림 실패를 반환하는 경우 진행 방법은 무엇입니까? 비동기 알림은 15초 3분 10분 30분 30분 1시간 2시간 6시간 15시간

1.1 솔루션

  • 가장 간단한 방법은 정기적으로 측정기를 스캔하는 것입니다. 예를 들어, 주문 결제 만료 요구 사항이 상대적으로 높은 경우 만료된 주문을 확인하고 적극적으로 주문을 마감하기 위해 측정기를 2초마다 스캔합니다. 단점은 간단하다는 점, 단점은 1분마다 테이블을 전역으로 스캔하므로 리소스가 낭비된다는 점. 테이블 데이터의 주문량이 곧 만료될 경우 주문이 지연될 수 있습니다. 폐쇄.

  • RabbitMq 또는 기타 MQ 수정을 사용하여 지연 대기열을 구현하세요. 장점은 오픈 소스이며 이미 만들어진 안정적인 구현 솔루션이라는 것입니다. 단점은 다음과 같습니다. MQ는 팀 기술 스택이 이미 가지고 있는 경우입니다. MQ라면 괜찮습니다. 지연 대기열을 위한 MQ 배포 비용은 약간 높습니다

  • Redis의 zset 및 list 기능을 사용하면 redis를 사용하여 지연 대기열을 구현할 수 있습니다RedisDelayQueue

2. 설계 목표

  • 실시간: 일정 기간 동안 2차 오류 허용
  • 고가용성: 독립 실행형 지원, 클러스터 지원
  • 메시지 삭제 지원: 비즈니스 지정된 메시지는 언제든지 삭제됩니다
  • 메시지 신뢰성: 최소 한 번 이상 소비 보장
  • 메시지 지속성: Redis 자체의 지속성 특성에 따라 Redis 데이터가 손실되면 지연된 메시지가 손실된다는 의미이지만, 기본 백업 및 클러스터 보장으로 사용됩니다. MangoDB에 메시지를 유지하기 위한 후속 최적화를 고려할 수 있습니다

3. 설계 계획

설계에는 주로 다음 사항이 포함됩니다.

  • 전체 Redis를 메시지 풀로 사용하고 메시지를 KV 형식으로 저장합니다.
  • ZSET는 우선순위 대기열 역할을 하며 점수에 따라 우선순위를 유지합니다.
  • LIST 구조를 사용하여 선입선출 방식으로 소비합니다.
  • ZSET 및 LIST는 메시지 주소(메시지 풀의 각 KEY에 해당)를 저장합니다.
  • 라우팅 객체를 사용자 정의하여 ZSET 및 LIST 이름을 저장하고 메시지를 ZSET에서 올바른 LIST로 지점 간 방식으로 라우팅합니다.
  • 타이머를 사용하여 라우팅을 유지합니다
  • TTL 규칙에 따라 메시지 지연을 구현합니다

3.1 디자인 다이어그램

여전히 좋아하는 지연 대기열을 기반으로 코드를 디자인, 최적화 및 구현합니다. Youzan Design
Redis는 지연 대기열을 어떻게 구현합니까? 방법 소개

3.2 데이터 구조

  • ZING:DELAY_QUEUE:JOB_POOL은 모든 지연 대기열 정보를 저장하는 Hash_Table 구조입니다. KV 구조: K=prefix+projectName 필드 = topic+jobId V=CONENT;V클라이언트에서 전달한 데이터는 소비 시 반환됩니다.ZING:DELAY_QUEUE:JOB_POOL 是一个Hash_Table结构,里面存储了所有延迟队列的信息。KV结构:K=prefix+projectName  field = topic+jobId  V=CONENT;V由客户端传入的数据,消费的时候回传
  • ZING:DELAY_QUEUE:BUCKET 延迟队列的有序集合ZSET,存放K=ID和需要的执行时间戳,根据时间戳排序
  • ZING:DELAY_QUEUE:QUEUE LIST结构,每个Topic一个LIST,list存放的都是当前需要被消费的JOB

Redis는 지연 대기열을 어떻게 구현합니까? 방법 소개
Redis는 지연 대기열을 어떻게 구현합니까? 방법 소개

3.3 任务的生命周期

  1. 新增一个JOB,会在ZING:DELAY_QUEUE:JOB_POOL中插入一条数据,记录了业务方消费方。ZING:DELAY_QUEUE:BUCKET也会插入一条记录,记录执行的时间戳
  2. 搬运线程会去ZING:DELAY_QUEUE:BUCKET中查找哪些执行时间戳的RunTimeMillis比现在的时间小,将这些记录全部删除;同时会解析出每个任务的Topic是什么,然后将这些任务PUSH到TOPIC对应的列表ZING:DELAY_QUEUE:QUEUE
  3. 每个TOPIC的LIST都会有一个监听线程去批量获取LIST中的待消费数据,获取到的数据全部扔给这个TOPIC的消费线程池
  4. 消费线程池执行会去ZING:DELAY_QUEUE:JOB_POOL
  5. ZING:DELAY_QUEUE:BUCKET 지연 대기열 순서 ZSET 설정, K 저장 =ID 및 필요한 실행 타임스탬프(타임스탬프

ZING:DELAY_QUEUE:QUEUE LIST 구조에 따라 정렬됨), 각 주제에는 LIST가 있으며 목록에는 현재 소비해야 하는 항목이 저장됩니다. JOB 사진은 참고용이며 기본적으로 전체 과정을 설명할 수 있습니다. 실행 프로세스, 사진은 기사 마지막 부분의 참조 블로그에서 가져왔습니다.사진은 참고용일 뿐이며, 기본적으로 전체 프로세스의 실행 과정을 설명할 수 있으며, 사진은 기사 끝 부분의 참조 블로그에서 가져왔습니다. Article

🎜🎜🎜3.3 작업 수명 주기🎜🎜🎜🎜새 JOB을 추가하면 ZING:DELAY_QUEUE:JOB_POOL에 데이터 조각이 삽입되어 비즈니스 측면과 소비자를 기록합니다. 옆. ZING:DELAY_QUEUE:BUCKET은 실행 타임스탬프를 기록하기 위해 레코드도 삽입합니다🎜🎜처리 스레드는 ZING:DELAY_QUEUE:BUCKET으로 이동하여 RunTimeMillis가 있는 실행 타임스탬프를 찾습니다. ratio 이제 시간이 부족하므로 이러한 레코드를 모두 동시에 삭제하면 각 작업의 주제가 무엇인지 분석한 다음 이러한 작업을 TOPIC ZING:DELAY_QUEUE:QUEUE에 해당하는 목록에 푸시합니다. >🎜🎜각 TOPIC의 LIST에는 LIST에서 일괄적으로 소비할 데이터를 얻기 위한 청취 스레드가 있으며, 획득한 모든 데이터는 이 TOPIC의 소비 스레드 풀로 전달됩니다.🎜🎜소비 스레드 풀 실행은 다음으로 이동합니다. ZING:DELAY_QUEUE:JOB_POOL데이터 구조를 찾아 콜백 구조에 반환하고 콜백 메서드를 실행합니다. 🎜🎜🎜🎜🎜3.4 디자인 포인트🎜🎜🎜

3.4.1 기본 개념

  • JOB: 비동기 처리가 필요한 작업은 지연 대기열의 기본 단위입니다.
  • 주제: 동일한 유형의 작업 집합 집합(큐)입니다. 소비자가 구독하려면

3.4.2 메시지 구조

각 JOB에는 다음 속성이 포함되어야 합니다

  • jobId: 작업의 고유 식별자. 지정된 직무 정보
  • 주제: 직무 유형을 검색하고 삭제하는 데 사용됩니다. 구체적인 업체명으로 이해하시면 됩니다
  • delay: 작업을 지연시켜야 하는 시간. 단위: 초. (서버에서 이를 절대 시간으로 변환합니다.)
  • body: 소비자가 특정 비즈니스 처리를 수행하기 위한 Job의 내용, json 형식으로 저장됨
  • retry: 실패한 재시도 횟수
  • url: 알림 URL

3.5 디자인 세부사항

3.5.1 빠르게 소비하는 방법ZING:DELAY_QUEUE:QUEUEZING:DELAY_QUEUE:QUEUE

最简单的实现方式就是使用定时器进行秒级扫描,为了保证消息执行的时效性,可以设置每1S请求Redis一次,判断队列中是否有待消费的JOB。但是这样会存在一个问题,如果queue中一直没有可消费的JOB,那频繁的扫描就失去了意义,也浪费了资源,幸好LIST中有一个BLPOP阻塞原语,如果list中有数据就会立马返回,如果没有数据就会一直阻塞在那里,直到有数据返回,可以设置阻塞的超时时间,超时会返回NULL;具体的实现方式及策略会在代码中进行具体的实现介绍

3.5.2 避免定时导致的消息重复搬运及消费

  • 使用Redis的分布式锁来控制消息的搬运,从而避免消息被重复搬运导致的问题
  • 使用分布式锁来保证定时器的执行频率

4. 核心代码实现

4.1 技术说明

技术栈:SpringBoot,Redisson,Redis,分布式锁,定时器

注意:本项目没有实现设计方案中的多Queue消费,只开启了一个QUEUE,这个待以后优化

4.2 核心实体

4.2.1 Job新增对象

/**
 * 消息结构
 *
 * @author 睁眼看世界
 * @date 2020年1月15日
 */
@Data
public class Job implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Job的唯一标识。用来检索和删除指定的Job信息
     */
    @NotBlank
    private String jobId;


    /**
     * Job类型。可以理解成具体的业务名称
     */
    @NotBlank
    private String topic;

    /**
     * Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)
     */
    private Long delay;

    /**
     * Job的内容,供消费者做具体的业务处理,以json格式存储
     */
    @NotBlank
    private String body;

    /**
     * 失败重试次数
     */
    private int retry = 0;

    /**
     * 通知URL
     */
    @NotBlank
    private String url;
}

4.2.2 Job删除对象

/**
 * 消息结构
 *
 * @author 睁眼看世界
 * @date 2020年1月15日
 */
@Data
public class JobDie implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Job的唯一标识。用来检索和删除指定的Job信息
     */
    @NotBlank
    private String jobId;


    /**
     * Job类型。可以理解成具体的业务名称
     */
    @NotBlank
    private String topic;
}

4.3 搬运线程

/**
 * 搬运线程
 *
 * @author 睁眼看世界
 * @date 2020年1月17日
 */
@Slf4j
@Component
public class CarryJobScheduled {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 启动定时开启搬运JOB信息
     */
    @Scheduled(cron = "*/1 * * * * *")
    public void carryJobToQueue() {
        System.out.println("carryJobToQueue --->");
        RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            RScoredSortedSet<object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE);
            long now = System.currentTimeMillis();
            Collection<object> jobCollection = bucketSet.valueRange(0, false, now, true);
            List<string> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
            RList<string> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE);
            readyQueue.addAll(jobList);
            bucketSet.removeAllAsync(jobList);
        } catch (InterruptedException e) {
            log.error("carryJobToQueue error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }
}</string></string></object></object>

4.4 消费线程

@Slf4j
@Component
public class ReadyQueueContext {

    @Autowired
    private RedissonClient redissonClient;

    @Autowired
    private ConsumerService consumerService;

    /**
     * TOPIC消费线程
     */
    @PostConstruct
    public void startTopicConsumer() {
        TaskManager.doTask(this::runTopicThreads, "开启TOPIC消费线程");
    }

    /**
     * 开启TOPIC消费线程
     * 将所有可能出现的异常全部catch住,确保While(true)能够不中断
     */
    @SuppressWarnings("InfiniteLoopStatement")
    private void runTopicThreads() {
        while (true) {
            RLock lock = null;
            try {
                lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK);
            } catch (Exception e) {
                log.error("runTopicThreads getLock error", e);
            }
            try {
                if (lock == null) {
                    continue;
                }
                // 分布式锁时间比Blpop阻塞时间多1S,避免出现释放锁的时候,锁已经超时释放,unlock报错
                boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
                if (!lockFlag) {
                    continue;
                }

                // 1. 获取ReadyQueue中待消费的数据
                RBlockingQueue<string> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE);
                String topicId = queue.poll(60, TimeUnit.SECONDS);
                if (StringUtils.isEmpty(topicId)) {
                    continue;
                }

                // 2. 获取job元信息内容
                RMap<string> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY);
                Job job = jobPoolMap.get(topicId);

                // 3. 消费
                FutureTask<boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getUrl(), job.getBody()), job.getTopic() + "-->消费JobId-->" + job.getJobId());
                if (taskResult.get()) {
                    // 3.1 消费成功,删除JobPool和DelayBucket的job信息
                    jobPoolMap.remove(topicId);
                } else {
                    int retrySum = job.getRetry() + 1;
                    // 3.2 消费失败,则根据策略重新加入Bucket

                    // 如果重试次数大于5,则将jobPool中的数据删除,持久化到DB
                    if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
                        jobPoolMap.remove(topicId);
                        continue;
                    }
                    job.setRetry(retrySum);
                    long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
                    log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
                    RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
                    delayBucket.add(nextTime, topicId);
                    // 3.3 更新元信息失败次数
                    jobPoolMap.put(topicId, job);
                }
            } catch (Exception e) {
                log.error("runTopicThreads error", e);
            } finally {
                if (lock != null) {
                    try {
                        lock.unlock();
                    } catch (Exception e) {
                        log.error("runTopicThreads unlock error", e);
                    }
                }
            }
        }
    }
}</object></boolean></string></string>

4.5 添加及删除JOB

/**
 * 提供给外部服务的操作接口
 *
 * @author why
 * @date 2020年1月15日
 */
@Slf4j
@Service
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {

    @Autowired
    private RedissonClient redissonClient;


    /**
     * 添加job元信息
     *
     * @param job 元信息
     */
    @Override
    public void addJob(Job job) {

        RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId());
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());

            // 1. 将job添加到 JobPool中
            RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            if (jobPool.get(topicId) != null) {
                throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST);
            }

            jobPool.put(topicId, job);

            // 2. 将job添加到 DelayBucket中
            RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.add(job.getDelay(), topicId);
        } catch (InterruptedException e) {
            log.error("addJob error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }


    /**
     * 删除job信息
     *
     * @param job 元信息
     */
    @Override
    public void deleteJob(JobDie jobDie) {

        RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId());
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());

            RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            jobPool.remove(topicId);

            RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.remove(topicId);
        } catch (InterruptedException e) {
            log.error("addJob error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }
}</object></string></object></string>

5. 待优化的内容

  1. 目前只有一个Queue队列存放消息,当需要消费的消息大量堆积后,会影响消息通知的时效。改进的办法是,开启多个Queue,进行消息路由,再开启多个消费线程进行消费,提供吞吐量
  2. 消息没有进行持久化,存在风险,后续会将消息持久化到MangoDB中

6. 源码

更多详细源码请在下面地址中获取

  • RedisDelayQueue实现 zing-delay-queue(https://gitee.com/whyCodeData/zing-project/tree/master/zing-delay-queue)
  • RedissonStarter redisson-spring-boot-starter(https://gitee.com/whyCodeData/zing-project/tree/master/zing-starter/redisson-spring-boot-starter)
  • 项目应用

구현하는 가장 간단한 방법은 타이머를 사용하여 스캔하는 것입니다. 두 번째 수준에서는 메시지 실행의 적시성을 보장하기 위해 1S마다 Redis에 요청을 설정하여 대기열에서 사용할 JOB이 있는지 확인할 수 있습니다. 하지만 문제가 발생합니다. 대기열에 소비 가능한 JOB이 없으면 잦은 검색은 의미가 없으며 다행히 LIST에 BLPOP 차단 프리미티브가 있습니다. 데이터가 있으면 즉시 반환되고, 데이터가 없으면 데이터가 반환될 때까지 차단되며, 차단 시간 초과를 설정할 수 있으며, 특정 구현 방법 및 전략은 NULL을 반환합니다. 3.5.2 타이밍으로 인한 메시지의 반복 처리 및 소비 방지

  • Redis의 분산 잠금을 사용하여 메시지 처리를 제어함으로써 메시지 반복 처리로 인해 발생하는 문제를 방지합니다
타이머 실행 빈도를 보장하려면 분산 잠금을 사용하세요

🎜4. 핵심 코드 구현🎜🎜🎜🎜🎜🎜4.1 기술 설명🎜🎜🎜🎜기술 스택: SpringBoot, Redisson, Redis, 분산 잠금, 타이머🎜🎜🎜참고🎜: 이 프로젝트는 설계 계획에서 다중 Queue 소비를 구현하지 않았으며 하나의 QUEUE만 열었습니다. 이는 향후 최적화될 예정입니다🎜🎜🎜🎜4.2 핵심 엔터티🎜🎜🎜🎜🎜🎜4.2.1 작업 새 객체🎜🎜🎜 rrreee🎜🎜🎜4.2.2 작업 삭제 개체🎜🎜🎜rrreee🎜🎜🎜4.3 처리 스레드🎜🎜🎜rrreee🎜🎜🎜4.4 소비 스레드🎜🎜🎜rrreee🎜🎜🎜4.5 추가 및 작업 삭제🎜🎜🎜rrreee 🎜🎜🎜5 . 최적화할 콘텐츠🎜 🎜🎜
    🎜현재 메시지를 저장하는 대기열은 하나뿐입니다. 소비해야 할 메시지가 많이 쌓이면 메시지 알림의 적시성이 영향을 받습니다. 개선 방법은 여러 대기열을 열고 메시지 라우팅을 수행한 다음 처리량을 제공하기 위해 여러 소비자 스레드를 여는 것입니다. 이는 메시지가 향후 MangoDB에 유지될 위험이 있습니다. >🎜🎜🎜6. 소스코드🎜🎜🎜🎜자세한 소스코드는 다음 주소🎜🎜🎜RedisDelayQueue 구현 zing-delay-queue (https://gitee.com/whyCodeData/)에서 확인하세요. zing -project/tree/master/zing-delay-queue)🎜🎜RedissonStarter redisson-spring-boot-starter(https://gitee.com/whyCodeData/zing-project/tree/master/) zing -starter/redisson-spring-boot-starter)🎜🎜프로젝트 애플리케이션 zing-pay (https://gitee.com/whyCodeData/zing-pay)🎜🎜🎜🎜🎜7. 🎜 🎜🎜🎜https://tech.youzan.com/queuing_delay/🎜🎜https://blog.csdn.net/u010634066/article/details/98864764🎜🎜🎜더 많은 Redis 지식을 보려면 다음 항목에 주의하세요. 🎜redis 입문 튜토리얼🎜 칼럼. 🎜

위 내용은 Redis는 지연 대기열을 어떻게 구현합니까? 방법 소개의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 csdn.net에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제