Usage
Dependency configuration
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.12.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.homeey</groupId>
    <artifactId>redis-delay-queue</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>redis-delay-queue</name>
    <description>redis-delay-queue</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Needed by the demo code (@Slf4j, @RequiredArgsConstructor); version managed by the Spring Boot parent -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.19.3</version>
        </dependency>
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-data-23</artifactId>
            <version>3.19.3</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Note: the Redisson version must be compatible with the Spring Boot version. Here redisson-spring-data-23 is added to match Spring Boot 2.3.x.
Configuration file
Spring Boot can integrate Redisson in three ways:
The first: use the general Spring Redis configuration and let Redisson auto-configure itself (the simplest)
The second: use a separate Redisson configuration file
The third: configure Redisson under the spring.redis.redisson configuration key
For details, see the Spring Boot and Redisson integration configuration. The demo uses the first way:
spring:
  redis:
    database: 0
    host: localhost
    port: 6379
    timeout: 10000
    lettuce:
      pool:
        max-active: 8
        max-wait: -1
        min-idle: 0
        max-idle: 8
Demo code
package com.homeey.redisdelayqueue.delay;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Tomorrow's you will be lucky because of today's effort
 *
 * @author jt4mrg@qq.com
 * 23:11 2023-02-19 2023
 **/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayQueue {

    private final RDelayedQueue<String> delayedQueue;
    private final RBlockingQueue<String> blockingQueue;

    @PostConstruct
    public void init() {
        // A single worker thread consumes the tasks whose delay has elapsed.
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        executorService.submit(() -> {
            while (true) {
                try {
                    // Blocks until a task has been moved into the target queue.
                    String task = blockingQueue.take();
                    log.info("rev delay task:{}", task);
                } catch (Exception e) {
                    log.error("occur error", e);
                }
            }
        });
    }

    // Producer side: submit a task that becomes visible to consumers after the given delay.
    public void offerTask(String task, long seconds) {
        log.info("add delay task:{},delay time:{}s", task, seconds);
        delayedQueue.offer(task, seconds, TimeUnit.SECONDS);
    }

    @Configuration
    static class RedissonDelayQueueConfigure {

        // Target queue: holds the tasks that are ready to be consumed.
        @Bean
        public RBlockingQueue<String> blockingQueue(RedissonClient redissonClient) {
            return redissonClient.getBlockingQueue("TOKEN-RENEWAL");
        }

        // Delayed queue that transfers expired tasks into the target queue above.
        @Bean
        public RDelayedQueue<String> delayedQueue(RBlockingQueue<String> blockingQueue,
                                                  RedissonClient redissonClient) {
            return redissonClient.getDelayedQueue(blockingQueue);
        }
    }
}
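The consumer loop in the demo catches every exception, including InterruptedException, so the worker thread cannot be stopped cleanly. Below is a minimal sketch of a stoppable variant. DelayTaskWorker is a hypothetical helper, not part of Redisson. It is written against java.util.concurrent.BlockingQueue, which RBlockingQueue extends, so the blockingQueue bean from the demo could be passed in, and a plain LinkedBlockingQueue can stand in for it in tests.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Hypothetical helper (not a Redisson class): drains a BlockingQueue until interrupted. */
final class DelayTaskWorker implements Runnable {

    private final BlockingQueue<String> queue;
    private final Consumer<String> handler;

    DelayTaskWorker(BlockingQueue<String> queue, Consumer<String> handler) {
        this.queue = queue;
        this.handler = handler;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Blocks until a task is available.
                String task = queue.take();
                handler.accept(task);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the loop condition ends the worker.
                Thread.currentThread().interrupt();
            } catch (RuntimeException e) {
                // A failing handler should not kill the worker; report and continue.
                System.err.println("task handling failed: " + e);
            }
        }
    }
}
```

To stop the worker, interrupt its thread, for example by calling shutdownNow() on the executor that runs it.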
Principle analysis
The RedissonDelayedQueue implementation involves four roles, where xxx stands for the queue name:
redisson_delay_queue_timeout:xxx: sorted set data type. It stores all delayed tasks, sorted by their expiration timestamp (the timestamp at submission plus the delay time), so the first element is the earliest task due in the entire delayed queue. This concept is very important.
redisson_delay_queue:xxx: list data type. Its purpose is not obvious from the code analysed here, but every task is also written to it on submission, and the queue transfer deletes the element again when it moves the task.
xxx: list data type, called the target queue. The tasks stored in this queue have all reached their delay time and can be fetched by consumers, so the take method of RBlockingQueue in the demo above obtains its tasks from this target queue.
redisson_delay_queue_channel:xxx: a channel used to notify the clients to start a delayed task.
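As a small illustration of how the four names relate to a single queue name, here is a hedged sketch. DelayQueueKeys and its field names are hypothetical. The exact format is an assumption: it wraps the queue name in braces (a Redis Cluster hash tag, which keeps all keys of one queue in the same slot so a single Lua script can touch them), and it should be checked against the Redisson version in use.

```java
/** Hypothetical helper: derives the key names of one delayed queue (format is an assumption). */
final class DelayQueueKeys {

    final String targetQueue; // list read by consumers
    final String timeoutSet;  // sorted set ordered by expiration timestamp
    final String orderList;   // prefixed list written on submission
    final String channel;     // pub/sub channel for timer notifications

    DelayQueueKeys(String queueName) {
        this.targetQueue = queueName;
        this.timeoutSet = prefixName("redisson_delay_queue_timeout", queueName);
        this.orderList = prefixName("redisson_delay_queue", queueName);
        this.channel = prefixName("redisson_delay_queue_channel", queueName);
    }

    /** Assumed convention: reuse an existing hash tag, otherwise wrap the name in braces. */
    static String prefixName(String prefix, String name) {
        if (name.contains("{")) {
            return prefix + ":" + name;
        }
        return prefix + ":{" + name + "}";
    }
}
```

Under these assumptions, new DelayQueueKeys("TOKEN-RENEWAL").timeoutSet yields redisson_delay_queue_timeout:{TOKEN-RENEWAL}.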
When a RedissonDelayedQueue is created, it registers the queue transfer service and sets up the four role keys that implement the delay queue. The core of that code is the definition of the queue transfer task:
QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {

    @Override
    protected RFuture<Long> pushTaskAsync() {
        return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " // fetch the expired values from the zset
              + "if #expiredValues > 0 then " // if there are any
                  + "for i, v in ipairs(expiredValues) do "
                      + "local randomId, value = struct.unpack('dLc0', v);" // unpack the message that was packed when the task was submitted
                      + "redis.call('rpush', KEYS[1], value);" // append the value to the tail of the unprefixed (target) list
                      + "redis.call('lrem', KEYS[3], 1, v);" // remove the first matching element from the prefixed list
                  + "end; "
                  + "redis.call('zrem', KEYS[2], unpack(expiredValues));" // remove the expired elements just read from the zset
              + "end; "
                // get startTime from scheduler queue head task
              + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); " // fetch the zset element with the lowest score
              + "if v[1] ~= nil then "
                 + "return v[2]; " // return its score, i.e. the expiration time
              + "end "
              + "return nil;",
              Arrays.asList(getRawName(), timeoutSetName, queueName),
              System.currentTimeMillis(), 100);
    }

    @Override
    protected RTopic getTopic() {
        return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName);
    }
};
Producer
RedissonDelayedQueue#offerAsync
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
        "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" // pack the message body: random id, value length, value
      + "redis.call('zadd', KEYS[2], ARGV[1], value);" // add the message to the zset with its timeout as the score
      + "redis.call('rpush', KEYS[3], value);" // append the message to the prefixed list
      // if the new object became the queue head, publish its startTime
      // to all scheduler workers
      + "local v = redis.call('zrange', KEYS[2], 0, 0); " // fetch the first element of the zset
      + "if v[1] == value then " // if the earliest-expiring element is the message just submitted
         + "redis.call('publish', KEYS[4], ARGV[1]); " // publish its timeout on the channel
      + "end;",
      Arrays.asList(getRawName(), timeoutSetName, queueName, channelName),
      timeout, randomId, encode(e));
Consumer
The consumer side is the simplest part: it just reads from the unprefixed target list with BLPOP.
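To tie the producer script, the transfer script and the consumer together, here is a minimal in-memory sketch of the same flow in plain Java. It does not talk to Redis. DelayQueueModel and its method names are hypothetical, the clock is passed in explicitly to keep the behaviour deterministic, and ties between equal timestamps are simplified compared with a real sorted set.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical in-memory model of the delayed queue flow; no Redis involved. */
final class DelayQueueModel {

    // Stand-in for the timeout sorted set: expiration timestamp -> tasks due then.
    private final TreeMap<Long, List<String>> timeoutSet = new TreeMap<>();
    // Stand-in for the prefixed list: tasks in submission order.
    private final List<String> orderList = new ArrayList<>();
    // Stand-in for the unprefixed target list read by consumers.
    private final Deque<String> targetQueue = new ArrayDeque<>();

    /** Models the producer script. Returns true when a publish on the channel would happen. */
    boolean offer(String task, long delayMillis, long nowMillis) {
        long expireAt = nowMillis + delayMillis;
        Long earliestBefore = timeoutSet.isEmpty() ? null : timeoutSet.firstKey();
        timeoutSet.computeIfAbsent(expireAt, k -> new ArrayList<>()).add(task); // zadd
        orderList.add(task);                                                    // rpush KEYS[3]
        // Publish only when the new task became the earliest one (ties simplified).
        return earliestBefore == null || expireAt < earliestBefore;
    }

    /** Models the transfer script. Returns the next expiration timestamp, or null if none is pending. */
    Long transferExpired(long nowMillis, int limit) {
        int moved = 0;
        while (!timeoutSet.isEmpty() && timeoutSet.firstKey() <= nowMillis && moved < limit) {
            List<String> due = timeoutSet.firstEntry().getValue();
            String task = due.remove(0);
            if (due.isEmpty()) {
                timeoutSet.pollFirstEntry();  // zrem
            }
            targetQueue.addLast(task);        // rpush KEYS[1]
            orderList.remove(task);           // lrem KEYS[3] 1 v
            moved++;
        }
        return timeoutSet.isEmpty() ? null : timeoutSet.firstKey();
    }

    /** Models the consumer; a non-blocking stand-in for BLPOP. */
    String poll() {
        return targetQueue.pollFirst();
    }
}
```

In this model, the value returned by transferExpired plays the role of the score returned by the Lua script: presumably the transfer task uses it to decide when to run next, and the publish signalled by offer lets it reschedule earlier when a task with a shorter delay arrives.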