DelayQueue is an unbounded BlockingQueue implementation that holds objects implementing the Delayed interface. An object can be taken from the queue only after its delay has expired.
BlockingQueue is a thread-safe queue data structure provided by Java. When the queue is empty, a thread trying to obtain an element is either blocked or receives an exception, depending on the method it calls: take() blocks, while remove() throws an exception.
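The difference between the retrieval methods on an empty queue can be seen in a small experiment. The following is only a sketch: the class name EmptyQueueBehavior and the helper describeEmptyQueue are made up for illustration, and LinkedBlockingQueue is used simply as a convenient BlockingQueue implementation.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class EmptyQueueBehavior {

    /** Describes what each retrieval method does on an empty queue (hypothetical helper). */
    static String describeEmptyQueue() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        StringBuilder result = new StringBuilder();

        // poll() never blocks: it returns null when nothing is available.
        result.append("poll=").append(queue.poll());

        // remove() never blocks either, but signals emptiness with an exception.
        try {
            queue.remove();
            result.append(", remove=returned");
        } catch (NoSuchElementException e) {
            result.append(", remove=NoSuchElementException");
        }

        // poll(timeout) blocks for at most the given time, then returns null.
        try {
            result.append(", timedPoll=").append(queue.poll(50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            result.append(", timedPoll=interrupted");
        }

        // take() is left out on purpose: on an empty queue it would block
        // until another thread adds an element.
        return result.toString();
    }

    public static void main(String[] args) {
        // prints: poll=null, remove=NoSuchElementException, timedPoll=null
        System.out.println(describeEmptyQueue());
    }
}
```

For a consumer that should simply wait for work, take() is usually the natural choice; the timed poll is useful when the thread also needs to check a shutdown flag periodically.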
The "unbounded" queue here means that there is no upper limit on the number of elements in the queue, and the capacity of the queue will expand as the number of elements increases.
DelayQueue implements the BlockingQueue interface, so it is both unbounded and blocking. In addition, its own core characteristics are:
The delayed task object placed in the queue can only be retrieved after the delay time is reached.
DelayQueue does not accept null elements.
DelayQueue only accepts objects that implement the java.util.concurrent.Delayed interface.
After understanding the characteristics of DelayQueue, we can use it to implement delayed tasks. The first step is to implement the java.util.concurrent.Delayed interface.
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * Delayed order task
 */
public class OrderDelayObject implements Delayed {
    private String name;
    private long delayTime; // absolute expiry time in milliseconds
    // In real business code, pass an order information object here; this demo just uses a string
    private String order;

    public OrderDelayObject(String name, long delayTime, String order) {
        this.name = name;
        // Delay time plus the current time
        this.delayTime = System.currentTimeMillis() + delayTime;
        this.order = order;
    }

    // Getter used by the consumer to read the order information
    public String getOrder() {
        return order;
    }

    // Get the remaining countdown time of the delayed task
    @Override
    public long getDelay(TimeUnit unit) {
        long diff = delayTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    // The delay queue sorts its elements by expiry time (Delayed extends Comparable)
    @Override
    public int compareTo(Delayed obj) {
        return Long.compare(this.delayTime, ((OrderDelayObject) obj).delayTime);
    }

    @Override
    public String toString() {
        Date date = new Date(delayTime);
        SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return "\nOrderDelayObject:{"
                + "name=" + name
                + ", time=" + sd.format(date)
                + ", order=" + order
                + "}";
    }
}
The order field in the class above stands for the order information object. In real business development, pass the actual order information so that the order cancellation logic can be implemented (for example, an order is automatically canceled if it is not paid within 30 minutes).
The Delayed interface inherits from the Comparable interface, so the compareTo method needs to be implemented to sort delayed tasks in the queue according to "delay time".
The getDelay method is defined by the Delayed interface. It returns the remaining delay of the task in the requested time unit; the task counts as expired once this value is zero or negative.
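The characteristics listed earlier, together with the roles of getDelay and compareTo, can be checked with a short experiment. This is a minimal sketch rather than the article's own code: the names DelayedContractDemo and Job are hypothetical, and the delays are shortened to milliseconds so that it finishes quickly. The third characteristic needs no runtime check, because DelayQueue is declared with a type parameter bounded by Delayed, so something like a DelayQueue of String does not compile.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class DelayedContractDemo {

    /** Hypothetical minimal element; it stores an absolute expiry time like OrderDelayObject. */
    static final class Job implements Delayed {
        final String name;
        final long expireAtMillis;

        Job(String name, long delayMillis) {
            this.name = name;
            this.expireAtMillis = System.currentTimeMillis() + delayMillis;
        }

        // Remaining delay; zero or negative means the element has expired.
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        // Earlier expiry sorts first, so the head of the queue is the next job due.
        @Override
        public int compareTo(Delayed other) {
            if (other instanceof Job) {
                return Long.compare(expireAtMillis, ((Job) other).expireAtMillis);
            }
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /** True if the queue refuses a null element with a NullPointerException. */
    static boolean rejectsNull() {
        DelayQueue<Job> queue = new DelayQueue<>();
        try {
            queue.add(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    /** True if an unexpired element is counted by size() but not returned by poll(). */
    static boolean hidesUnexpired() {
        DelayQueue<Job> queue = new DelayQueue<>();
        queue.add(new Job("later", 60_000)); // expires in one minute
        return queue.size() == 1 && queue.poll() == null;
    }

    /** Adds jobs out of order and returns their names in the order take() releases them. */
    static List<String> takeOrder() {
        DelayQueue<Job> queue = new DelayQueue<>();
        queue.add(new Job("slow", 150));
        queue.add(new Job("fast", 50));
        queue.add(new Job("medium", 100));
        List<String> names = new ArrayList<>();
        try {
            while (!queue.isEmpty()) {
                names.add(queue.take().name); // blocks until the head job expires
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(rejectsNull());    // true
        System.out.println(hidesUnexpired()); // true
        System.out.println(takeOrder());      // [fast, medium, slow]
    }
}
```

Insertion order does not matter here: compareTo decides which element sits at the head, and getDelay decides when that head may leave the queue.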
First, we need a container that holds the delayed task queue for the lifetime of the application. In a Spring environment, we can register it as a bean:
@Bean("orderDelayQueue")
public DelayQueue<OrderDelayObject> orderDelayQueue() {
    return new DelayQueue<>();
}
When the user places an order, put a delayed task for that order into the delay queue.
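@Resource
private DelayQueue<OrderDelayObject> orderDelayQueue;

// When an order is placed, put the delayed order object into orderDelayQueue
orderDelayQueue.add(
    new OrderDelayObject(
        "Delayed order cancellation task",
        30 * 60 * 1000, // 30-minute delay
        "Order information for the delayed task"
    )
);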
Start a thread in the system that continuously takes tasks from the queue and processes them. The take method of DelayQueue retrieves a delayed task object from the queue. If the queue is empty, or the delay of the task at the head of the queue has not yet elapsed, the calling thread blocks.
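@Component
public class DelayObjectConsumer implements InitializingBean {
    @Resource
    private DelayQueue<OrderDelayObject> orderDelayQueue;

    @Override
    public void afterPropertiesSet() {
        // Run the loop on its own thread; looping here directly would block Spring startup
        Thread consumer = new Thread(this::consume, "order-delay-consumer");
        consumer.setDaemon(true);
        consumer.start();
    }

    private void consume() {
        try {
            while (true) {
                OrderDelayObject task = orderDelayQueue.take();
                System.out.println(task.toString());
                System.out.println(task.getOrder());
                // Use the order information to look up the payment status of the order
                // If the user has not paid, close the order in the database
                // If order volume is high, consider asynchronous processing or a thread pool here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop consuming when interrupted
        }
    }
}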
Note that the while-true loop here processes delayed tasks sequentially. When order volume is high, consider asynchronous processing for the order closing operation. I previously wrote an observable and easily configurable open source thread pool project for SpringBoot, which may be helpful to you.
In my test, a delayed task placed in orderDelayQueue was retrieved and processed correctly half an hour later, which shows that the implementation works.
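The asynchronous processing suggested for high order volumes could look roughly like the sketch below: one thread keeps calling take(), and the actual order closing work is handed to a worker pool. Everything here is an assumption made for illustration, including the names PooledDelayConsumer, OrderTask, and closeOrders, the pool size of 4, and the list that stands in for the real database update. It uses a plain JDK ExecutorService, not the thread pool project mentioned above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PooledDelayConsumer {

    /** Hypothetical stand-in for OrderDelayObject, reduced to what this demo needs. */
    static final class OrderTask implements Delayed {
        final String order;
        final long expireAtMillis;

        OrderTask(String order, long delayMillis) {
            this.order = order;
            this.expireAtMillis = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /**
     * Queues the orders with the given delay, consumes them on one thread,
     * and "closes" each order on a worker pool. Returns the closed orders.
     */
    static List<String> closeOrders(List<String> orders, long delayMillis) {
        DelayQueue<OrderTask> queue = new DelayQueue<>();
        for (String order : orders) {
            queue.add(new OrderTask(order, delayMillis));
        }

        List<String> closed = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(orders.size());
        ExecutorService workers = Executors.newFixedThreadPool(4); // pool size is arbitrary

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    OrderTask task = queue.take(); // blocks until a task expires
                    workers.submit(() -> {         // hand off so the next take() is not held up
                        closed.add(task.order);    // placeholder for the real close-order logic
                        done.countDown();
                    });
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // leave the loop on shutdown
            }
        }, "delay-consumer");
        consumer.setDaemon(true);
        consumer.start();

        try {
            done.await(5, TimeUnit.SECONDS); // wait until every order has been handled
            consumer.interrupt();
            consumer.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            workers.shutdown();
        }
        return closed;
    }

    public static void main(String[] args) {
        // The three orders are printed in whatever order the workers finished them.
        System.out.println(closeOrders(Arrays.asList("A-1", "A-2", "A-3"), 100));
    }
}
```

With this split, a slow database call for one order no longer delays the retrieval of the next expired task; the trade-off is that orders may be closed out of order.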
Using DelayQueue to implement delayed tasks is simple and convenient. The queue itself is implemented entirely in standard JDK code, and there is no need to introduce third-party dependencies (such as a Redis-based or message-queue-based implementation), so it is very lightweight.
Its disadvantage is that all operations are based on application memory. The application is a single point of failure: if it crashes or restarts, the delayed task data may be lost. In addition, because DelayQueue is unbounded, a very high order volume means more objects in the queue, which creates a risk of OOM (out of memory). Therefore, using DelayQueue to implement delayed tasks is only suitable for situations where the task volume is small.
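If the unbounded growth is the main concern, one possible mitigation is to put a cap in front of the queue. The sketch below is an assumption for illustration, not a JDK feature: CappedDelayQueue, tryAdd, and the demo classes are hypothetical names, and a Semaphore is used to count free slots. It does nothing about the data loss problem, which still requires persistent storage.

```java
import java.util.Objects;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper, not a JDK class: a DelayQueue with a hard cap on pending elements. */
final class CappedDelayQueue<E extends Delayed> {
    private final DelayQueue<E> queue = new DelayQueue<>();
    private final Semaphore permits; // one permit per free slot

    CappedDelayQueue(int capacity) {
        this.permits = new Semaphore(capacity);
    }

    /** Adds the element if there is room; returns false when the cap is reached. */
    boolean tryAdd(E element) {
        Objects.requireNonNull(element);
        if (!permits.tryAcquire()) {
            return false; // the caller decides what to do, e.g. reject or store elsewhere
        }
        queue.add(element);
        return true;
    }

    /** Blocks until an element expires, then frees its slot. */
    E take() throws InterruptedException {
        E element = queue.take();
        permits.release();
        return element;
    }
}

final class CappedDelayQueueDemo {

    /** Hypothetical element that is already expired when it is created. */
    static final class Expired implements Delayed {
        @Override
        public long getDelay(TimeUnit unit) {
            return 0;
        }

        @Override
        public int compareTo(Delayed other) {
            return 0;
        }
    }

    static String run() {
        CappedDelayQueue<Expired> queue = new CappedDelayQueue<>(2);
        boolean first = queue.tryAdd(new Expired());
        boolean second = queue.tryAdd(new Expired());
        boolean third = queue.tryAdd(new Expired()); // cap of 2 reached
        try {
            queue.take(); // frees one slot
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean fourth = queue.tryAdd(new Expired());
        return first + "," + second + "," + third + "," + fourth;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints: true,true,false,true
    }
}
```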