ホームページ  >  記事  >  データベース  >  Redis は遅延キューをどのように実装しますか?手法の紹介

Redis は遅延キューをどのように実装しますか?手法の紹介

青灯夜游
青灯夜游転載
2020-07-08 16:01:583067ブラウズ

Redis は遅延キューをどのように実装しますか?手法の紹介

遅延キューは、その名のとおり、遅延機能を備えたメッセージキューです。では、どのような状況でそのようなキューが必要になるのでしょうか?

1. 背景

まず、次のビジネス シナリオを見てみましょう:

  • 注文が完了したとき未払い 注文が返金ステータスにあるときに注文をタイムリーにクローズする方法
  • 返金ステータスにある注文が正常に返金されたかどうかを定期的に確認する方法
  • 注文が返金されない場合下流システムからステータス通知を長期間にわたって受信する方法 注文ステータスの段階的同期を実現する戦略
  • システムが上流システムに支払い成功の最終ステータスを通知すると、上流システムは通知失敗を返します。非同期通知を実行し、分割した頻度で送信する方法: 15 秒 3 分 10 分 30 分 30 分 1 時間 2 時間 6 時間 15 時間

1.1 解決策

  • ##最も簡単な方法は、メーターを定期的にスキャンすることです。たとえば、注文の支払い有効期限要件が比較的高い場合、メーターは 2 秒ごとにスキャンされ、期限切れの注文を確認し、注文を積極的にクローズします。 利点はシンプルであることです欠点はテーブルを毎分グローバルにスキャンするため、リソースが無駄になることです有効期限が切れそうなテーブルデータの注文量が多い場合、注文完了に遅れが生じます。

  • RabbitMq またはその他の MQ 変更を使用して遅延キューを実装します。利点は、オープン ソースであり、既製の安定した実装ソリューションであることです。欠点は次のとおりです。MQ はメッセージ ミドルウェアです。チームのテクノロジー スタックが本質的に MQ を持っている場合は問題ありません。そうでない場合は、キューを遅延させるために MQ のセットをデプロイするのは少し費用がかかります。

  • Redis の zset を使用する機能をリストする場合は、Redis を使用して実装できます遅延キューRedisDelayQueue

2. 設計目標

  • リアルタイム パフォーマンス: 一定期間、第 2 レベルのエラーが許可されます。
  • 高可用性: スタンドアロンをサポート、クラスターをサポートします。
  • メッセージ削除をサポートします。 : 企業はいつでも指定されたメッセージを削除します
  • メッセージの信頼性: 少なくとも 1 回は消費されることが保証されます
  • メッセージの永続性: Redis 自体の永続特性に基づいて、Redis データが失われた場合、これは遅延メッセージが失われることを意味しますが、プライマリ バックアップとクラスタの保証は提供できます。これは、メッセージを MangoDB に永続化するための後続の最適化のために考慮できます。

3. 設計計画

設計には主に次の内容が含まれます。ポイント:
  • Redis 全体をメッセージ プールとして扱い、メッセージを KV 形式で保存します
  • ZSET を優先キューとして使用し、スコアに従って優先度を維持します
  • LIST 構造を使用して先出し消費を促進します。
  • ZSET および LIST はメッセージ アドレスを格納します (メッセージ プール内の各 KEY に対応します)
  • ルーティング オブジェクトをカスタマイズし、ZSET および LIST 名を格納し、 ZSET ルートから正しいリストにメッセージを送信します。
  • タイマーを使用してルーティングを維持します。
  • TTL ルールに従ってメッセージ遅延を実装します。

3.1 設計図


これは依然としてYouzanの遅延キュー設計、最適化、コード実装に基づいています。 Youzan DesignRedis は遅延キューをどのように実装しますか?手法の紹介

##3.2 データ構造

  • ZING:DELAY_QUEUE:JOB_POOL です。すべての遅延キュー情報を格納する Hash_Table 構造。 KV 構造: K=prefix projectName field = topic jobId V=CONENT;Vクライアントから渡されたデータは、消費時に返されます
  • ZING:DELAY_QUEUE:BUCKET 遅延キューがあります シーケンスset ZSET は、タイムスタンプに従ってソートされた K=ID と必要な実行タイムスタンプを格納します。
  • ZING:DELAY_QUEUE:QUEUE LIST 構造。各トピックには LIST があり、リストには JOB# が格納されます。

Redis は遅延キューをどのように実装しますか?手法の紹介## 現在使用する必要がある画像は参考用です。基本的にプロセス全体の実行を説明できます。画像は記事の最後にある参考ブログから引用しています。

3.3 タスクのライフ サイクル

    新しい JOB が追加されると、データの一部が
  1. ZING に挿入されます。 :DELAY_QUEUE:JOB_POOL とビジネス側と消費者側を記録しました。 ZING:DELAY_QUEUE:BUCKET は、実行タイムスタンプを記録するレコードも挿入します。
  2. 処理スレッドは
  3. ZING:DELAY_QUEUE:BUCKET に移動して、どの実行タイムスタンプが RunTimeMillis であるかを検索します。現在時刻より小さい場合は、これらのレコードをすべて削除します。同時に、各タスクのトピックが何であるかを解析し、これらのタスクを TOPICZING:DELAY_QUEUE:QUEUE に対応するリストにプッシュします。
  4. 各 TOPIC LIST には、LIST 内で消費されるデータをバッチ取得するためのリスニング スレッドがあり、取得されたすべてのデータは、この TOPIC の消費スレッド プールにスローされます。
  5. 消費スレッド プールの実行が始まります
  6. ZING:DELAY_QUEUE:JOB_POOLデータ構造を見つけて、それをコールバック構造に返し、コールバック メソッドを実行します。

3.4 設計ポイント

3.4.1 基本概念

  • JOB: 非同期処理を必要とするタスクは遅延キューの基本単位です
  • トピック: 同じタイプのジョブのコレクション (キュー)。コンシューマがサブスクライブするには

#3.4.2 メッセージ構造

#各 JOB には次の属性が含まれている必要があります

    jobId: ジョブの一意の識別子。指定したジョブ情報を取得および削除するために使用されます。
  • トピック: ジョブの種類。これは、特定のビジネス名として理解できます。
  • lay: ジョブを遅らせる必要がある時間。単位: 秒。 (サーバーが絶対時間に変換します)
  • body: コンシューマーが特定の業務処理を実行するためのジョブの内容。json 形式で保存されます。
  • retry: 失敗した再試行の数
  • url: 通知 URL

3.5 設計の詳細

3.5.1 方法ZING:DELAY_QUEUE:QUEUE

最も簡単な実装方法は、タイマーを使用して第 2 レベルのスキャンを実行することです。メッセージ実行の適時性を確保するには、1 秒ごとに Redis をリクエストして、キュー内に消費されるジョブがあるかどうかを判断するように設定できます。しかし、問題が発生します。キューに消費可能な JOB がない場合、頻繁なスキャンは無意味であり、リソースの無駄になります。幸いなことに、LIST には

BLPOP ブロッキング プリミティブ があります。リストの場合、データがあればすぐに返します データがなければデータが返されるまでそこでブロックされます ブロックタイムアウトを設定でき、タイムアウト後に NULL が返されます 具体的な実装方法と戦略は、

3.5.2 タイミングによるメッセージの繰り返し転送と消費を避ける

##Redis の分散ロックを使用するメッセージの転送を制御します。メッセージの繰り返し転送による問題を回避するには、
  • 分散ロックを使用してタイマーの実行頻度を確保します。

4. コア コードの実装

4.1 技術的な説明#技術スタック: SpringBoot、Redisson、Redis、分散ロック、 timer

Note

: このプロジェクトは、設計計画では複数のキューの消費を認識しておらず、1 つのキューのみを開きます。これは将来最適化される予定です。

4.2 コア エンティティ

4.2.1 ジョブへの新しいオブジェクトの追加##

/**
 * 消息结构
 *
 * @author 睁眼看世界
 * @date 2020年1月15日
 */
@Data
public class Job implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Job的唯一标识。用来检索和删除指定的Job信息
     */
    @NotBlank
    private String jobId;


    /**
     * Job类型。可以理解成具体的业务名称
     */
    @NotBlank
    private String topic;

    /**
     * Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)
     */
    private Long delay;

    /**
     * Job的内容,供消费者做具体的业务处理,以json格式存储
     */
    @NotBlank
    private String body;

    /**
     * 失败重试次数
     */
    private int retry = 0;

    /**
     * 通知URL
     */
    @NotBlank
    private String url;
}
#4.2.2 ジョブからのオブジェクトの削除

/**
 * 消息结构
 *
 * @author 睁眼看世界
 * @date 2020年1月15日
 */
@Data
public class JobDie implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Job的唯一标识。用来检索和删除指定的Job信息
     */
    @NotBlank
    private String jobId;


    /**
     * Job类型。可以理解成具体的业务名称
     */
    @NotBlank
    private String topic;
}
4.3 トランスポート スレッド

/**
 * 搬运线程
 *
 * @author 睁眼看世界
 * @date 2020年1月17日
 */
@Slf4j
@Component
public class CarryJobScheduled {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 启动定时开启搬运JOB信息
     */
    @Scheduled(cron = "*/1 * * * * *")
    public void carryJobToQueue() {
        System.out.println("carryJobToQueue --->");
        RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            RScoredSortedSet<object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE);
            long now = System.currentTimeMillis();
            Collection<object> jobCollection = bucketSet.valueRange(0, false, now, true);
            List<string> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
            RList<string> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE);
            readyQueue.addAll(jobList);
            bucketSet.removeAllAsync(jobList);
        } catch (InterruptedException e) {
            log.error("carryJobToQueue error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }
}</string></string></object></object>
4.4 コンシ​​ューマ スレッド

@Slf4j
@Component
public class ReadyQueueContext {

    @Autowired
    private RedissonClient redissonClient;

    @Autowired
    private ConsumerService consumerService;

    /**
     * TOPIC消费线程
     */
    @PostConstruct
    public void startTopicConsumer() {
        TaskManager.doTask(this::runTopicThreads, "开启TOPIC消费线程");
    }

    /**
     * 开启TOPIC消费线程
     * 将所有可能出现的异常全部catch住,确保While(true)能够不中断
     */
    @SuppressWarnings("InfiniteLoopStatement")
    private void runTopicThreads() {
        while (true) {
            RLock lock = null;
            try {
                lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK);
            } catch (Exception e) {
                log.error("runTopicThreads getLock error", e);
            }
            try {
                if (lock == null) {
                    continue;
                }
                // 分布式锁时间比Blpop阻塞时间多1S,避免出现释放锁的时候,锁已经超时释放,unlock报错
                boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
                if (!lockFlag) {
                    continue;
                }

                // 1. 获取ReadyQueue中待消费的数据
                RBlockingQueue<string> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE);
                String topicId = queue.poll(60, TimeUnit.SECONDS);
                if (StringUtils.isEmpty(topicId)) {
                    continue;
                }

                // 2. 获取job元信息内容
                RMap<string> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY);
                Job job = jobPoolMap.get(topicId);

                // 3. 消费
                FutureTask<boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getUrl(), job.getBody()), job.getTopic() + "-->消费JobId-->" + job.getJobId());
                if (taskResult.get()) {
                    // 3.1 消费成功,删除JobPool和DelayBucket的job信息
                    jobPoolMap.remove(topicId);
                } else {
                    int retrySum = job.getRetry() + 1;
                    // 3.2 消费失败,则根据策略重新加入Bucket

                    // 如果重试次数大于5,则将jobPool中的数据删除,持久化到DB
                    if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
                        jobPoolMap.remove(topicId);
                        continue;
                    }
                    job.setRetry(retrySum);
                    long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
                    log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
                    RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
                    delayBucket.add(nextTime, topicId);
                    // 3.3 更新元信息失败次数
                    jobPoolMap.put(topicId, job);
                }
            } catch (Exception e) {
                log.error("runTopicThreads error", e);
            } finally {
                if (lock != null) {
                    try {
                        lock.unlock();
                    } catch (Exception e) {
                        log.error("runTopicThreads unlock error", e);
                    }
                }
            }
        }
    }
}</object></boolean></string></string>
4.5 JOBの追加と削除

/**
 * 提供给外部服务的操作接口
 *
 * @author why
 * @date 2020年1月15日
 */
@Slf4j
@Service
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {

    @Autowired
    private RedissonClient redissonClient;


    /**
     * 添加job元信息
     *
     * @param job 元信息
     */
    @Override
    public void addJob(Job job) {

        RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId());
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());

            // 1. 将job添加到 JobPool中
            RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            if (jobPool.get(topicId) != null) {
                throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST);
            }

            jobPool.put(topicId, job);

            // 2. 将job添加到 DelayBucket中
            RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.add(job.getDelay(), topicId);
        } catch (InterruptedException e) {
            log.error("addJob error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }


    /**
     * 删除job信息
     *
     * @param job 元信息
     */
    @Override
    public void deleteJob(JobDie jobDie) {

        RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId());
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());

            RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            jobPool.remove(topicId);

            RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.remove(topicId);
        } catch (InterruptedException e) {
            log.error("addJob error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }
}</object></string></object></string>
5. コンテンツの追加最適化する必要があります

現在、メッセージを保存する Queue キューは 1 つだけです。消費する必要があるメッセージが大量に蓄積すると、メッセージ通知の適時性が影響を受けます。改善方法は、複数のキューを開き、メッセージ ルーティングを実行し、スループットを提供するために複数のコンシューマ スレッドを開いて消費することです。

メッセージは永続化されないため、危険です。メッセージは MangoDB に永続化されます。
  1. 6. ソースコード

詳細なソースコードは以下のアドレスから入手してください

RedisDelayQueue の実装

zing-delay-queue(https://gitee.com/whyCodeData/zing-project/tree/master/zing-delay-queue)
  • RedissonStarter redisson-spring-boot-starter(https://gitee.com/whyCodeData/zing-project/tree/master/zing-starter/redisson-spring-boot-starter)
  • プロジェクト申請 zing-pay(https://gitee.com/whyCodeData/zing-pay)
  • ##7. 参考文献

##https://tech.youzan.com/queuing_delay/https://blog.csdn.net/u010634066/article/details/98864764

    詳細redis に関する知識については、
  • redis 入門チュートリアル
  • 列を参照してください。

以上がRedis は遅延キューをどのように実装しますか?手法の紹介の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はcsdn.netで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。