遅延キューは、その名のとおり、遅延機能を備えたメッセージキューです。では、どのような状況でそのようなキューが必要になるのでしょうか?
1. 背景
まず、次のビジネス シナリオを見てみましょう:
1.1 解決策
##最も簡単な方法は、メーターを定期的にスキャンすることです。たとえば、注文の支払い有効期限要件が比較的高い場合、メーターは 2 秒ごとにスキャンされ、期限切れの注文を確認し、注文を積極的にクローズします。 利点はシンプルであることです、欠点はテーブルを毎分グローバルにスキャンするため、リソースが無駄になることです有効期限が切れそうなテーブルデータの注文量が多い場合、注文完了に遅れが生じます。
RabbitMq またはその他の MQ 変更を使用して遅延キューを実装します。利点は、オープン ソースであり、既製の安定した実装ソリューションであることです。欠点は次のとおりです。MQ はメッセージ ミドルウェアです。チームのテクノロジー スタックが本質的に MQ を持っている場合は問題ありません。そうでない場合は、キューを遅延させるために MQ のセットをデプロイするのは少し費用がかかります。
Redis の zset を使用する機能をリストする場合は、Redis を使用して実装できます遅延キューRedisDelayQueue
2. 設計目標
3. 設計計画
設計には主に次の内容が含まれます。ポイント:3.1 設計図
これは依然としてYouzanの遅延キュー設計、最適化、コード実装に基づいています。 Youzan Design
##3.2 データ構造
ZING:DELAY_QUEUE:JOB_POOL
です。すべての遅延キュー情報を格納する Hash_Table 構造。 KV 構造: K=prefix projectName field = topic jobId V=CONENT;Vクライアントから渡されたデータは、消費時に返されます ZING:DELAY_QUEUE:BUCKET
遅延キューがあります シーケンスset ZSET は、タイムスタンプに従ってソートされた K=ID と必要な実行タイムスタンプを格納します。 ZING:DELAY_QUEUE:QUEUE
LIST 構造。各トピックには LIST があり、リストには JOB# が格納されます。 ## 現在使用する必要がある画像は参考用です。基本的にプロセス全体の実行を説明できます。画像は記事の最後にある参考ブログから引用しています。
3.3 タスクのライフ サイクル
とビジネス側と消費者側を記録しました。
ZING:DELAY_QUEUE:BUCKET は、実行タイムスタンプを記録するレコードも挿入します。
に移動して、どの実行タイムスタンプが RunTimeMillis であるかを検索します。現在時刻より小さい場合は、これらのレコードをすべて削除します。同時に、各タスクのトピックが何であるかを解析し、これらのタスクを TOPIC
ZING:DELAY_QUEUE:QUEUE に対応するリストにプッシュします。
データ構造を見つけて、それをコールバック構造に返し、コールバック メソッドを実行します。
3.4 設計ポイント
3.4.1 基本概念
#3.4.2 メッセージ構造
#各 JOB には次の属性が含まれている必要があります3.5 設計の詳細
3.5.1 方法ZING:DELAY_QUEUE:QUEUE
BLPOP ブロッキング プリミティブ があります。リストの場合、データがあればすぐに返します データがなければデータが返されるまでそこでブロックされます ブロックタイムアウトを設定でき、タイムアウト後に NULL が返されます 具体的な実装方法と戦略は、
3.5.2 タイミングによるメッセージの繰り返し転送と消費を避ける
##Redis の分散ロックを使用するメッセージの転送を制御します。メッセージの繰り返し転送による問題を回避するには、4. コア コードの実装
4.1 技術的な説明#技術スタック: SpringBoot、Redisson、Redis、分散ロック、 timer
Note
: このプロジェクトは、設計計画では複数のキューの消費を認識しておらず、1 つのキューのみを開きます。これは将来最適化される予定です。
4.2 コア エンティティ
4.2.1 ジョブへの新しいオブジェクトの追加##/**
* 消息结构
*
* @author 睁眼看世界
* @date 2020年1月15日
*/
@Data
public class Job implements Serializable {
private static final long serialVersionUID = 1L;
/**
* Job的唯一标识。用来检索和删除指定的Job信息
*/
@NotBlank
private String jobId;
/**
* Job类型。可以理解成具体的业务名称
*/
@NotBlank
private String topic;
/**
* Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)
*/
private Long delay;
/**
* Job的内容,供消费者做具体的业务处理,以json格式存储
*/
@NotBlank
private String body;
/**
* 失败重试次数
*/
private int retry = 0;
/**
* 通知URL
*/
@NotBlank
private String url;
}
#4.2.2 ジョブからのオブジェクトの削除
/**
* 消息结构
*
* @author 睁眼看世界
* @date 2020年1月15日
*/
@Data
public class JobDie implements Serializable {
private static final long serialVersionUID = 1L;
/**
* Job的唯一标识。用来检索和删除指定的Job信息
*/
@NotBlank
private String jobId;
/**
* Job类型。可以理解成具体的业务名称
*/
@NotBlank
private String topic;
}
4.3 トランスポート スレッド
/**
* 搬运线程
*
* @author 睁眼看世界
* @date 2020年1月17日
*/
@Slf4j
@Component
public class CarryJobScheduled {
@Autowired
private RedissonClient redissonClient;
/**
* 启动定时开启搬运JOB信息
*/
@Scheduled(cron = "*/1 * * * * *")
public void carryJobToQueue() {
System.out.println("carryJobToQueue --->");
RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
try {
boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
}
RScoredSortedSet<object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE);
long now = System.currentTimeMillis();
Collection<object> jobCollection = bucketSet.valueRange(0, false, now, true);
List<string> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
RList<string> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE);
readyQueue.addAll(jobList);
bucketSet.removeAllAsync(jobList);
} catch (InterruptedException e) {
log.error("carryJobToQueue error", e);
} finally {
if (lock != null) {
lock.unlock();
}
}
}
}</string></string></object></object>
4.4 コンシューマ スレッド
@Slf4j
@Component
public class ReadyQueueContext {
@Autowired
private RedissonClient redissonClient;
@Autowired
private ConsumerService consumerService;
/**
* TOPIC消费线程
*/
@PostConstruct
public void startTopicConsumer() {
TaskManager.doTask(this::runTopicThreads, "开启TOPIC消费线程");
}
/**
* 开启TOPIC消费线程
* 将所有可能出现的异常全部catch住,确保While(true)能够不中断
*/
@SuppressWarnings("InfiniteLoopStatement")
private void runTopicThreads() {
while (true) {
RLock lock = null;
try {
lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK);
} catch (Exception e) {
log.error("runTopicThreads getLock error", e);
}
try {
if (lock == null) {
continue;
}
// 分布式锁时间比Blpop阻塞时间多1S,避免出现释放锁的时候,锁已经超时释放,unlock报错
boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
continue;
}
// 1. 获取ReadyQueue中待消费的数据
RBlockingQueue<string> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE);
String topicId = queue.poll(60, TimeUnit.SECONDS);
if (StringUtils.isEmpty(topicId)) {
continue;
}
// 2. 获取job元信息内容
RMap<string> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY);
Job job = jobPoolMap.get(topicId);
// 3. 消费
FutureTask<boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getUrl(), job.getBody()), job.getTopic() + "-->消费JobId-->" + job.getJobId());
if (taskResult.get()) {
// 3.1 消费成功,删除JobPool和DelayBucket的job信息
jobPoolMap.remove(topicId);
} else {
int retrySum = job.getRetry() + 1;
// 3.2 消费失败,则根据策略重新加入Bucket
// 如果重试次数大于5,则将jobPool中的数据删除,持久化到DB
if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
jobPoolMap.remove(topicId);
continue;
}
job.setRetry(retrySum);
long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.add(nextTime, topicId);
// 3.3 更新元信息失败次数
jobPoolMap.put(topicId, job);
}
} catch (Exception e) {
log.error("runTopicThreads error", e);
} finally {
if (lock != null) {
try {
lock.unlock();
} catch (Exception e) {
log.error("runTopicThreads unlock error", e);
}
}
}
}
}
}</object></boolean></string></string>
4.5 JOBの追加と削除
/**
* 提供给外部服务的操作接口
*
* @author why
* @date 2020年1月15日
*/
@Slf4j
@Service
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {
@Autowired
private RedissonClient redissonClient;
/**
* 添加job元信息
*
* @param job 元信息
*/
@Override
public void addJob(Job job) {
RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId());
try {
boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
}
String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());
// 1. 将job添加到 JobPool中
RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
if (jobPool.get(topicId) != null) {
throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST);
}
jobPool.put(topicId, job);
// 2. 将job添加到 DelayBucket中
RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.add(job.getDelay(), topicId);
} catch (InterruptedException e) {
log.error("addJob error", e);
} finally {
if (lock != null) {
lock.unlock();
}
}
}
/**
* 删除job信息
*
* @param job 元信息
*/
@Override
public void deleteJob(JobDie jobDie) {
RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId());
try {
boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
}
String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());
RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
jobPool.remove(topicId);
RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.remove(topicId);
} catch (InterruptedException e) {
log.error("addJob error", e);
} finally {
if (lock != null) {
lock.unlock();
}
}
}
}</object></string></object></string>
5. コンテンツの追加最適化する必要があります
現在、メッセージを保存する Queue キューは 1 つだけです。消費する必要があるメッセージが大量に蓄積すると、メッセージ通知の適時性が影響を受けます。改善方法は、複数のキューを開き、メッセージ ルーティングを実行し、スループットを提供するために複数のコンシューマ スレッドを開いて消費することです。
メッセージは永続化されないため、危険です。メッセージは MangoDB に永続化されます。詳細なソースコードは以下のアドレスから入手してください
RedisDelayQueue の実装
zing-delay-queue(https://gitee.com/whyCodeData/zing-project/tree/master/zing-delay-queue)RedissonStarter
redisson-spring-boot-starter(https://gitee.com/whyCodeData/zing-project/tree/master/zing-starter/redisson-spring-boot-starter)プロジェクト申請
zing-pay(https://gitee.com/whyCodeData/zing-pay)
##7. 参考文献##https://tech.youzan.com/queuing_delay/https://blog.csdn.net/u010634066/article/details/98864764
以上がRedis は遅延キューをどのように実装しますか?手法の紹介の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。