How Spring Boot integrates Redisson to implement a delay queue
Typical application scenarios for a delay queue:
1. An order is placed but not paid within 30 minutes: the payment times out and the order is cancelled automatically.
2. An order is received but not reviewed within 7 days of receipt: the review times out and the system records a positive review by default.
3. An order is placed but the merchant does not accept it within 5 minutes: the order is cancelled.
4. A delivery times out: an SMS reminder is pushed.
……
For scenarios with a long delay and loose real-time requirements, we can use task scheduling to poll at regular intervals, for example with xxl-job.
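As a rough illustration of the polling approach, the sketch below uses the JDK's `ScheduledExecutorService` as a stand-in for a scheduler such as xxl-job. The `OrderTimeoutPoller` class, its `Order` model and the field names are hypothetical and only serve to show the idea: each pass scans for unpaid orders whose deadline has passed.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical polling job: periodically scans for unpaid orders past their deadline. */
class OrderTimeoutPoller {

    /** Hypothetical order model, not part of the article's code. */
    static final class Order {
        final String id;
        final Instant createdAt;
        volatile boolean paid;
        volatile boolean cancelled;

        Order(String id, Instant createdAt) {
            this.id = id;
            this.createdAt = createdAt;
        }
    }

    private final List<Order> orders = new CopyOnWriteArrayList<>();
    private final Duration paymentTimeout;

    OrderTimeoutPoller(Duration paymentTimeout) {
        this.paymentTimeout = paymentTimeout;
    }

    void add(Order order) {
        orders.add(order);
    }

    /** One polling pass: cancels every unpaid order whose deadline has passed and returns their ids. */
    List<String> cancelExpired(Instant now) {
        List<String> cancelledIds = new ArrayList<>();
        for (Order order : orders) {
            boolean expired = !order.createdAt.plus(paymentTimeout).isAfter(now);
            if (!order.paid && !order.cancelled && expired) {
                order.cancelled = true;
                cancelledIds.add(order.id);
            }
        }
        return cancelledIds;
    }

    /** Runs cancelExpired at a fixed interval; a real deployment would leave this to the scheduler. */
    ScheduledExecutorService start(long period, TimeUnit unit) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> cancelExpired(Instant.now()), period, period, unit);
        return scheduler;
    }
}
```

The trade-off is visible in `start`: an order is cancelled up to one polling period after its deadline, which is why polling suits cases where real-time precision matters little.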
Today we take a relatively simple and lightweight approach: a Redis-based delay queue. There are of course better solutions, and you can choose the one that best fits your company's technology stack and business system, for example a delay queue built on message middleware such as Kafka or RabbitMQ.
We will not discuss the implementation principle here. Let's go straight to the code and implement the Redis-based delay queue.
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.10.5</version>
</dependency>
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password: 123456
    database: 12
    timeout: 3000
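import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by LPB on 2020/04/20.
 */
@Configuration
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private int port;

    @Value("${spring.redis.database}")
    private int database;

    @Value("${spring.redis.password}")
    private String password;

    @Bean
    public RedissonClient redissonClient() {
        // Single-node Redis; the address must use the redis:// scheme.
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://" + host + ":" + port)
                .setDatabase(database)
                .setPassword(password);
        return Redisson.create(config);
    }
}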
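import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Redis delay queue utility.
 * Created by LPB on 2021/04/20.
 */
@Slf4j
@Component
public class RedisDelayQueueUtil {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Adds a value to a delay queue.
     *
     * @param value     queue value
     * @param delay     delay
     * @param timeUnit  unit of the delay
     * @param queueCode queue key
     */
    public <T> void addDelayQueue(T value, long delay, TimeUnit timeUnit, String queueCode) {
        try {
            RBlockingDeque<T> blockingDeque = redissonClient.getBlockingDeque(queueCode);
            RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
            // The value is moved into blockingDeque once the delay has elapsed.
            delayedQueue.offer(value, delay, timeUnit);
            log.info("(Added to delay queue) queue key: {}, value: {}, delay: {}s",
                    queueCode, value, timeUnit.toSeconds(delay));
        } catch (Exception e) {
            log.error("(Failed to add to delay queue) {}", e.getMessage(), e);
            // Keep the original exception as the cause.
            throw new RuntimeException("(Failed to add to delay queue)", e);
        }
    }

    /**
     * Takes the next due value from a delay queue, blocking until one is available.
     *
     * @param queueCode queue key
     * @return the next due value
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public <T> T getDelayQueue(String queueCode) throws InterruptedException {
        RBlockingDeque<T> blockingDeque = redissonClient.getBlockingDeque(queueCode);
        return blockingDeque.take();
    }
}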
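To make the offer/take semantics of this utility easier to picture, here is a minimal in-memory sketch built on the JDK's `java.util.concurrent.DelayQueue`. It is not how Redisson works internally and offers no persistence or sharing between processes. The `InMemoryDelayQueue` class is a hypothetical stand-in that only shows the contract: a value offered with a delay stays invisible to consumers until the delay has elapsed.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for the delay queue utility; hypothetical, not Redisson code. */
class InMemoryDelayQueue<T> {

    /** Wraps a value together with the nanoTime instant at which it becomes available. */
    private static final class DelayedItem<T> implements Delayed {
        final T value;
        final long dueAtNanos;

        DelayedItem(T value, long delay, TimeUnit unit) {
            this.value = value;
            this.dueAtNanos = System.nanoTime() + unit.toNanos(delay);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final DelayQueue<DelayedItem<T>> queue = new DelayQueue<>();

    /** Mirrors addDelayQueue: the value is hidden from consumers until the delay has elapsed. */
    void offer(T value, long delay, TimeUnit unit) {
        queue.offer(new DelayedItem<>(value, delay, unit));
    }

    /** Mirrors getDelayQueue: blocks until the earliest value is due, then returns it. */
    T take() throws InterruptedException {
        return queue.take().value;
    }

    /** Non-blocking variant: returns null if no value is due yet. */
    T poll() {
        DelayedItem<T> item = queue.poll();
        return item == null ? null : item.value;
    }
}
```

Values come out in order of their due time rather than insertion order, which matches what a consumer of the Redis-backed queue observes when messages carry different delays.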
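import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * Delay queue business enum.
 * Created by LPB on 2021/04/20.
 */
@Getter
@AllArgsConstructor
public enum RedisDelayQueueEnum {

    ORDER_PAYMENT_TIMEOUT("ORDER_PAYMENT_TIMEOUT",
            "Order payment timed out, cancel the order automatically", "orderPaymentTimeout"),
    ORDER_TIMEOUT_NOT_EVALUATED("ORDER_TIMEOUT_NOT_EVALUATED",
            "Order review timed out, default to a positive review", "orderTimeoutNotEvaluated");

    /**
     * Redis key of the delay queue
     */
    private final String code;

    /**
     * Description
     */
    private final String name;

    /**
     * Bean that implements the business logic for this delay queue;
     * it can be obtained from the Spring context
     */
    private final String beanId;
}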
/**
 * Delay queue executor.
 * Created by LPB on 2021/04/20.
 */
public interface RedisDelayQueueHandle<T> {

    void execute(T t);
}
OrderPaymentTimeout: the delay queue handler class for order payment timeouts
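import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * Handler for order payment timeouts.
 * Created by LPB on 2021/04/20.
 */
@Component
@Slf4j
public class OrderPaymentTimeout implements RedisDelayQueueHandle<Map<String, String>> {

    @Override
    public void execute(Map<String, String> map) {
        log.info("(Received order payment timeout delay message) {}", map);
        // TODO Order payment timed out: cancel the order automatically...
    }
}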
OrderTimeoutNotEvaluated: the delay queue handler class for orders that time out without being reviewed
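import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * Handler for orders that time out without being reviewed.
 * Created by LPB on 2021/04/20.
 */
@Component
@Slf4j
public class OrderTimeoutNotEvaluated implements RedisDelayQueueHandle<Map<String, String>> {

    @Override
    public void execute(Map<String, String> map) {
        log.info("(Received order review timeout delay message) {}", map);
        // TODO Order review timed out: record a default positive review...
    }
}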
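The enum's `beanId` is what ties each queue to its handler: at runtime the handler is looked up by that id and given the dequeued message. The Spring-free sketch below shows this lookup-then-execute step with a plain `Map` standing in for the application context. `HandlerRegistrySketch` and its nested interface are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Spring-free sketch of "look up the handler by bean id, then execute"; names are hypothetical. */
class HandlerRegistrySketch {

    /** Same shape as the article's RedisDelayQueueHandle interface. */
    interface DelayQueueHandle<T> {
        void execute(T t);
    }

    /** Plays the role of the Spring context: bean id -> handler instance. */
    private final Map<String, DelayQueueHandle<Map<String, String>>> handlers = new HashMap<>();

    void register(String beanId, DelayQueueHandle<Map<String, String>> handler) {
        handlers.put(beanId, handler);
    }

    /** Dispatches a dequeued message to the handler registered under beanId. */
    void dispatch(String beanId, Map<String, String> message) {
        DelayQueueHandle<Map<String, String>> handler = handlers.get(beanId);
        if (handler == null) {
            throw new IllegalArgumentException("No handler registered for bean id: " + beanId);
        }
        handler.execute(message);
    }
}
```

With this design, supporting a new delayed business case means adding one enum constant and one handler bean, with no change to the consumer loop.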
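import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

/**
 * Starts the delay queue consumers.
 * Created by LPB on 2021/04/20.
 */
@Slf4j
@Component
public class RedisDelayQueueRunner implements CommandLineRunner {

    @Autowired
    private RedisDelayQueueUtil redisDelayQueueUtil;

    @Override
    public void run(String... args) {
        // take() blocks, so each queue gets its own thread; a single loop over
        // all queues would wait on an empty queue while others have messages.
        for (RedisDelayQueueEnum queueEnum : RedisDelayQueueEnum.values()) {
            new Thread(() -> consume(queueEnum), queueEnum.getBeanId()).start();
        }
        log.info("(Redis delay queue started)");
    }

    private void consume(RedisDelayQueueEnum queueEnum) {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());
                if (value != null) {
                    // SpringUtil is the bean lookup helper this article relies on; its import is not shown.
                    RedisDelayQueueHandle<Object> handle = SpringUtil.getBean(queueEnum.getBeanId());
                    handle.execute(value);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the loop ends.
                Thread.currentThread().interrupt();
                log.error("(Redis delay queue interrupted) {}", e.getMessage());
            } catch (Exception e) {
                // A failing handler must not stop the consumer.
                log.error("(Redis delay queue handler failed) {}", e.getMessage(), e);
            }
        }
    }
}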
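Because `take()` blocks until an element arrives, a consumer that watches several queues is often given one thread per queue, so that an empty queue cannot hold up the others. The sketch below shows that pattern with the JDK's `BlockingQueue` standing in for Redisson's blocking deque. `PerQueueConsumers` and `startConsumer` are hypothetical names, and the error handling is deliberately minimal.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Sketch: one blocking consumer thread per queue; names are hypothetical. */
class PerQueueConsumers {

    /**
     * Starts a daemon thread that blocks on the given queue and passes every element
     * to the handler until the thread is interrupted.
     */
    static <T> Thread startConsumer(String name, BlockingQueue<T> queue, Consumer<T> handler) {
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    handler.accept(queue.take());
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the worker.
                    Thread.currentThread().interrupt();
                } catch (RuntimeException e) {
                    // A failing handler must not kill the consumer loop.
                    System.err.println("Handler failed: " + e.getMessage());
                }
            }
        }, "delay-queue-" + name);
        worker.setDaemon(true);
        worker.start();
        return worker;
    }
}
```

Calling `startConsumer` once per enum constant gives each business queue an independent consumer, and interrupting the returned thread stops it cleanly.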
With the steps above, the core code of the Redis delay queue is complete. Now let's write a test endpoint and call it with Postman.
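import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Delay queue test.
 * Created by LPB on 2020/04/20.
 */
@RestController
public class RedisDelayQueueController {

    @Autowired
    private RedisDelayQueueUtil redisDelayQueueUtil;

    @PostMapping("/addQueue")
    public void addQueue() {
        Map<String, String> map1 = new HashMap<>();
        map1.put("orderId", "100");
        map1.put("remark", "Order payment timed out, cancel the order automatically");

        Map<String, String> map2 = new HashMap<>();
        map2.put("orderId", "200");
        map2.put("remark", "Order review timed out, default to a positive review");

        // Order payment timeout queue. For the test, the delay is 10 seconds.
        redisDelayQueueUtil.addDelayQueue(map1, 10, TimeUnit.SECONDS,
                RedisDelayQueueEnum.ORDER_PAYMENT_TIMEOUT.getCode());

        // Order review timeout queue. For the test, the delay is 20 seconds.
        redisDelayQueueUtil.addDelayQueue(map2, 20, TimeUnit.SECONDS,
                RedisDelayQueueEnum.ORDER_TIMEOUT_NOT_EVALUATED.getCode());
    }
}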
In a Redis client you can see that both delay queues have been added successfully.
The IDEA console log shows that the delay queue messages have been consumed successfully.