Verzögerungswarteschlange ist, wie der Name schon sagt, eine Nachrichtenwarteschlange mit Verzögerungsfunktion. Unter welchen Umständen benötige ich eine solche Warteschlange?
1. Hintergrund
Sehen wir uns zunächst das folgende Geschäftsszenario an:
1.1 Lösung
Am einfachsten ist es, den Zähler regelmäßig zu scannen . Wenn beispielsweise die Anforderungen an den Zahlungsablauf bei Bestellungen relativ hoch sind, wird der Zähler alle 2 Sekunden gescannt, um abgelaufene Bestellungen zu überprüfen und die Bestellungen aktiv abzuschließen. Der Vorteil ist, dass es einfach ist, Der Nachteil ist, dass die Tabelle jede Minute global gescannt wird, was Ressourcen verschwendetWenn das Auftragsvolumen der Tabellendaten bald abläuft, ist es groß , wird es zu einer Verzögerung beim Auftragsabschluss kommen.
Verwenden Sie RabbitMq oder andere MQ-Modifikationen, um Verzögerungswarteschlangen zu implementieren. Die Vorteile sind, dass es Open Source und eine vorgefertigte und stabile Implementierungslösung ist . Wenn der Team-Technologie-Stack von Natur aus über MQ verfügt, ist das in Ordnung. Wenn nicht, ist es etwas teuer, einen Satz MQ bereitzustellen, um die Warteschlange zu verzögern Listenfunktionen können wir mit Redis implementieren. Eine Verzögerungswarteschlange
2. Designziele
Echtzeitleistung: Fehler der zweiten Ebene sind für einen bestimmten Zeitraum zulässigHohe Verfügbarkeit: Unterstützt einzelne Maschinen, unterstützt Cluster
Das Design umfasst hauptsächlich die folgenden Punkte : Verwenden Sie das gesamte Redis als Nachrichtenpool und speichern Sie Nachrichten im KV-Format.
Verwenden Sie ZSET als Prioritätswarteschlange und behalten Sie die Priorität gemäß Score bei.
Es basiert immer noch auf Youzans Verzögerungswarteschlangendesign, Optimierung und Codeimplementierung. Youzan Design
3.2 Datenstruktur
ist eine Hash_Table-Struktur, die Informationen über speichert alle Verzögerungswarteschlangen. KV-Struktur: K=Präfix+Projektname-Feld = Thema+Job-ID V=CONENT;VDie vom Client übergebenen Daten werden bei Verbrauch zurückgegeben
ZING:DELAY_QUEUE:JOB_POOL
ZING:DELAY_QUEUE:BUCKET
ZING:DELAY_QUEUE:QUEUE
Das Bild dient nur als Referenz und kann grundsätzlich die Ausführung des gesamten Prozesses beschreiben. Das Bild stammt aus dem Referenzblog am Ende des Artikels3.3 Aufgabenleben Zyklus
Wenn Sie einen neuen JOB hinzufügen, wird ein Datenelement in eingefügt, um die Geschäftsseite und die Verbraucherseite aufzuzeichnen. fügt außerdem einen Datensatz ein, um den Ausführungszeitstempel aufzuzeichnen.
ZING:DELAY_QUEUE:JOB_POOL
ZING:DELAY_QUEUE:BUCKET
geworfen. Die Ausführung des Consumer-Thread-Pools geht zu ZING:DELAY_QUEUE:BUCKET
ZING:DELAY_QUEUE:QUEUE
ZING:DELAY_QUEUE:JOB_POOL
3.4.1 Grundkonzepte
3.4.2 Nachrichtenstruktur
Jeder JOB muss die folgenden Attribute enthalten
3.5 Designdetails
3.5.1 Wie schnell verbrauchen ZING:DELAY_QUEUE:QUEUE
Die einfachste Möglichkeit, dies zu implementieren, ist die Verwendung eines Timers für das Scannen der zweiten Ebene, um die Aktualität der Nachrichtenausführung sicherzustellen Sie können alle 1S eine Anfrage für Redis stellen und feststellen, ob sich JOBs in der Warteschlange befinden. Wenn sich jedoch keine Verbrauchsmaterialien in der Warteschlange befinden, ist häufiges Scannen sinnlos und eine Verschwendung von Ressourcen. Wenn die Liste Daten enthält, ist dies jedoch der Fall Wenn keine Daten vorhanden sind, werden sie dort blockiert, bis die Daten zurückgegeben werden. Nach Ablauf des Zeitlimits werden die spezifischen Implementierungsmethoden und -strategien zurückgegeben >BLPOP阻塞原语
3.5.2 Vermeiden Sie die wiederholte Nachrichtenverarbeitung und den zeitbedingten Verbrauch
Verwenden Sie die verteilte Sperre von Redis, um die Nachrichtenverarbeitung zu steuern und so zu vermeiden Wiederholte Verarbeitung von Nachrichten.Kerncode-Implementierung
4.1 Technische BeschreibungTechnologie-Stack: SpringBoot, Redisson, Redis, verteilte Sperre, Timer
Hinweis: Dieses Projekt hat den mehrfachen Warteschlangenverbrauch im Entwurfsplan nicht erkannt und nur eine Warteschlange geöffnet. Dies wird in Zukunft optimiert
4.2 Kernentität
4.2.1 Job fügt Objekte hinzu/**
* 消息结构
*
* @author 睁眼看世界
* @date 2020年1月15日
*/
@Data
public class Job implements Serializable {
private static final long serialVersionUID = 1L;
/**
* Job的唯一标识。用来检索和删除指定的Job信息
*/
@NotBlank
private String jobId;
/**
* Job类型。可以理解成具体的业务名称
*/
@NotBlank
private String topic;
/**
* Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)
*/
private Long delay;
/**
* Job的内容,供消费者做具体的业务处理,以json格式存储
*/
@NotBlank
private String body;
/**
* 失败重试次数
*/
private int retry = 0;
/**
* 通知URL
*/
@NotBlank
private String url;
}
4.2.2 Job löscht Objekte /**
* 消息结构
*
* @author 睁眼看世界
* @date 2020年1月15日
*/
@Data
public class JobDie implements Serializable {
private static final long serialVersionUID = 1L;
/**
* Job的唯一标识。用来检索和删除指定的Job信息
*/
@NotBlank
private String jobId;
/**
* Job类型。可以理解成具体的业务名称
*/
@NotBlank
private String topic;
}
4.3 Handling-Thread/**
* 搬运线程
*
* @author 睁眼看世界
* @date 2020年1月17日
*/
@Slf4j
@Component
public class CarryJobScheduled {
@Autowired
private RedissonClient redissonClient;
/**
* 启动定时开启搬运JOB信息
*/
@Scheduled(cron = "*/1 * * * * *")
public void carryJobToQueue() {
System.out.println("carryJobToQueue --->");
RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
try {
boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
}
RScoredSortedSet<object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE);
long now = System.currentTimeMillis();
Collection<object> jobCollection = bucketSet.valueRange(0, false, now, true);
List<string> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
RList<string> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE);
readyQueue.addAll(jobList);
bucketSet.removeAllAsync(jobList);
} catch (InterruptedException e) {
log.error("carryJobToQueue error", e);
} finally {
if (lock != null) {
lock.unlock();
}
}
}
}</string></string></object></object>
4.4 Verbrauchs-Thread@Slf4j
@Component
public class ReadyQueueContext {
@Autowired
private RedissonClient redissonClient;
@Autowired
private ConsumerService consumerService;
/**
* TOPIC消费线程
*/
@PostConstruct
public void startTopicConsumer() {
TaskManager.doTask(this::runTopicThreads, "开启TOPIC消费线程");
}
/**
* 开启TOPIC消费线程
* 将所有可能出现的异常全部catch住,确保While(true)能够不中断
*/
@SuppressWarnings("InfiniteLoopStatement")
private void runTopicThreads() {
while (true) {
RLock lock = null;
try {
lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK);
} catch (Exception e) {
log.error("runTopicThreads getLock error", e);
}
try {
if (lock == null) {
continue;
}
// 分布式锁时间比Blpop阻塞时间多1S,避免出现释放锁的时候,锁已经超时释放,unlock报错
boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
continue;
}
// 1. 获取ReadyQueue中待消费的数据
RBlockingQueue<string> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE);
String topicId = queue.poll(60, TimeUnit.SECONDS);
if (StringUtils.isEmpty(topicId)) {
continue;
}
// 2. 获取job元信息内容
RMap<string> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY);
Job job = jobPoolMap.get(topicId);
// 3. 消费
FutureTask<boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getUrl(), job.getBody()), job.getTopic() + "-->消费JobId-->" + job.getJobId());
if (taskResult.get()) {
// 3.1 消费成功,删除JobPool和DelayBucket的job信息
jobPoolMap.remove(topicId);
} else {
int retrySum = job.getRetry() + 1;
// 3.2 消费失败,则根据策略重新加入Bucket
// 如果重试次数大于5,则将jobPool中的数据删除,持久化到DB
if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
jobPoolMap.remove(topicId);
continue;
}
job.setRetry(retrySum);
long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.add(nextTime, topicId);
// 3.3 更新元信息失败次数
jobPoolMap.put(topicId, job);
}
} catch (Exception e) {
log.error("runTopicThreads error", e);
} finally {
if (lock != null) {
try {
lock.unlock();
} catch (Exception e) {
log.error("runTopicThreads unlock error", e);
}
}
}
}
}
}</object></boolean></string></string>
4.5 JOB hinzufügen und löschen/**
* 提供给外部服务的操作接口
*
* @author why
* @date 2020年1月15日
*/
@Slf4j
@Service
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {
@Autowired
private RedissonClient redissonClient;
/**
* 添加job元信息
*
* @param job 元信息
*/
@Override
public void addJob(Job job) {
RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId());
try {
boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
}
String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());
// 1. 将job添加到 JobPool中
RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
if (jobPool.get(topicId) != null) {
throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST);
}
jobPool.put(topicId, job);
// 2. 将job添加到 DelayBucket中
RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.add(job.getDelay(), topicId);
} catch (InterruptedException e) {
log.error("addJob error", e);
} finally {
if (lock != null) {
lock.unlock();
}
}
}
/**
* 删除job信息
*
* @param job 元信息
*/
@Override
public void deleteJob(JobDie jobDie) {
RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId());
try {
boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
}
String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());
RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
jobPool.remove(topicId);
RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.remove(topicId);
} catch (InterruptedException e) {
log.error("addJob error", e);
} finally {
if (lock != null) {
lock.unlock();
}
}
}
}</object></string></object></string>
5. Zu optimierender Inhalt
Derzeit gibt es nur eine Warteschlange. In der Warteschlange werden Nachrichten gespeichert. Wenn sich eine große Anzahl von Nachrichten ansammelt, die verarbeitet werden müssen, wird die Aktualität der Nachrichtenbenachrichtigungen beeinträchtigt. Die Verbesserungsmethode besteht darin, mehrere Warteschlangen zu öffnen, eine Nachrichtenweiterleitung durchzuführen und dann mehrere Verbraucherthreads für den Verbrauch zu öffnen, um Durchsatz bereitzustellen.QuellcodeDetailliertere Quellcodes erhalten Sie unter der untenstehenden Adresse
RedisDelayQueue实现
RedissonStarter
项目应用
7. Referenz
https://tech.youzan.com /queuing_delay /Das obige ist der detaillierte Inhalt vonWie implementiert Redis die Verzögerungswarteschlange? Methodeneinführung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!