Retrieve and remove the head of the queue, if necessary, wait the specified wait time for the element to be available, or return null if the timeout occurs |
|
ArrayBlockingQueue 源码简解
实现:同步等待队列(CLH)+ 条件等待队列满足条件的元素在CLH队列中等待锁,不满足条件的队列挪到条件等待队列,满足条件后再从 tail 插入 CLH 队列
线程获取锁的条件: 在 CLH 队列里等待的 Node 节点,并且 Node 节点的前驱节点是 Singal。条件等待队列里的线程是无法获取锁的。
/**
* 构造方法
* 还有两个构造函数,一个无fair参数,一个可传入集合,创建时插入队列
* @param capacity 固定容量
* @param fair 默认是false:访问顺序未指定; true:按照FIFO顺序处理
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair); // 根据fair创建对应的锁
// 条件对象,配合容器能满足业务
notEmpty = lock.newCondition(); // 出队条件对象
notFull = lock.newCondition(); // 入队条件对象
}
/**
* 入队方法
* 在队列的尾部插入指定的元素,如果队列已满,则等待空间可用
*/
public void put(E e) throws InterruptedException {
checkNotNull(e); // 检查put对象是否为空,空抛出异常
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 若未被中断尝试获取锁,详见下文
try {
// 队列中元素的数量 等于 排队元素的长度
while (count == items.length)
notFull.await(); // 见下文
enqueue(e); // 元素入队
} finally {
lock.unlock();
}
}
/**
* 出队方法
* 获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 见下文
try {
while (count == 0)
notEmpty.await(); // 见下文
return dequeue(); // 元素出队
} finally {
lock.unlock();
}
}
令当前线程等待,直到收到信号或被中断详:与此 Condition 关联的锁被自动释放,进入等待,并且处于休眠状态,直到发生以下四种情况之一:
在这些情况下,在此方法返回之前,当前线程必须重新获得与此条件相关联的锁。当线程返回时,保证它持有这个锁。
如果当前线程有以下两种情况之一:
生产者消费者模式
BlockingQueue 可以在线程之间共享而无需任何显式同步,在生产者消费者之间,只需要将阻塞队列以参数的形式进行传递即可。它内部的机制会自动保证线程的安全性。
生产者:实现了 Runnable 接口,每个生产者生产100种商品和1个中断标记后完成线程任务
@Slf4j
@Slf4j
public class Producer implements Runnable{
// 作为参数的阻塞队列
private BlockingQueue<Integer> blockingQueue;
private final int stopTag;
/**
* 构造方法
* @param blockingQueue
* @param stopTag
*/
public Producer(BlockingQueue<Integer> blockingQueue,int stopTag) {
this.blockingQueue = blockingQueue;
this.stopTag = stopTag;
}
@Override
public void run() {
try {
generateNumbers();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void generateNumbers() throws InterruptedException {
// 每个生产者都随机生产10种商品
for (int i = 0; i < 10; i++) {
int product = ThreadLocalRandom.current().nextInt(1000,1100);
log.info("生产者{}号,生产了商品,编号为{}",Thread.currentThread().getId(),product);
blockingQueue.put(product);
}
// 生产终止标记
blockingQueue.put(stopTag);
log.info("生产者{}号,生产了第终止标记编号{}",Thread.currentThread().getId(),Thread.currentThread().getId());
}
}
消费者:消费者拿到终止消费标记终止消费,否则消费商品,拿到终止标记后完成线程任务
@Slf4j
public class Consumer implements Runnable{
// 作为参数的阻塞队列
private BlockingQueue<Integer> queue;
private final int stopTage;
public Consumer(BlockingQueue<Integer> queue, int stopTage) {
this.queue = queue;
this.stopTage = stopTage;
}
@Override
public void run() {
try {
while (true) {
Integer product = queue.take();
if (product.equals(stopTage)) {
log.info("{}号消费者,停止消费,因为拿到了停止消费标记",Thread.currentThread().getId());
return;
}
log.info("{}号消费者,拿到的商品编号:{}",Thread.currentThread().getId(),product);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
客户端类: 创建与计算机 CPU 核数相同的线程数,与 16个生产者
public class ProductConsumerTest {
public static void main(String[] args) {
// 阻塞队列容量
int blockingQueueSize = 10;
// 生产者数量
int producerSize = 16;
// 消费者数量 = 计算机线程核数 8
int consumerSize = Runtime.getRuntime().availableProcessors();
// 终止消费标记
int stopTag = Integer.MAX_VALUE;
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(blockingQueueSize);
// 创建16个生产者线程
for (int i = 0; i < producerSize; i++) {
new Thread(new Producer(blockingQueue, stopTag)).start();
}
// 创建8个消费者线程
for (int j = 0; j < consumerSize; j++) {
new Thread(new Consumer(blockingQueue, stopTag)).start();
}
}
}
延迟队列 DelayQueue
定义: Java 延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用 poll() 方法会返回 null 值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于 0 来判断。延时队列不能存放空元素。
/**
* 电影票类,实现了Delayed接口,重写 compareTo 和 getDelay方法
*/
public class MovieTicket implements Delayed {
//延迟时间
private final long delay;
//到期时间
private final long expire;
//数据
private final String msg;
//创建时间
private final long now;
public long getDelay() {
return delay;
}
public long getExpire() {
return expire;
}
public String getMsg() {
return msg;
}
public long getNow() {
return now;
}
/**
* @param msg 消息
* @param delay 延期时间
*/
public MovieTicket(String msg , long delay) {
this.delay = delay;
this.msg = msg;
expire = System.currentTimeMillis() + delay; //到期时间 = 当前时间+延迟时间
now = System.currentTimeMillis();
}
/**
* @param msg
*/
public MovieTicket(String msg){
this(msg,1000);
}
public MovieTicket(){
this(null,1000);
}
/**
* 获得延迟时间 用过期时间-当前时间,时间单位毫秒
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire
- System.currentTimeMillis() , TimeUnit.MILLISECONDS);
}
/**
* 用于延迟队列内部比较排序 当前时间的延迟时间 - 比较对象的延迟时间
* 越早过期的时间在队列中越靠前
* @param delayed
* @return
*/
@Override
public int compareTo(Delayed delayed) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS)
- delayed.getDelay(TimeUnit.MILLISECONDS));
}
}
测试类:
public static void main(String[] args) {
DelayQueue<MovieTicket> delayQueue = new DelayQueue<MovieTicket>();
MovieTicket ticket = new MovieTicket("电影票1",10000);
delayQueue.put(ticket);
MovieTicket ticket1 = new MovieTicket("电影票2",5000);
delayQueue.put(ticket1);
MovieTicket ticket2 = new MovieTicket("电影票3",8000);
delayQueue.put(ticket2);
log.info("message:--->入队完毕");
while( delayQueue.size() > 0 ){
try {
ticket = delayQueue.take();
log.info("电影票出队:{}",ticket.getMsg());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
从运行结果可以看出队列是延迟出队,间隔和我们所设置的时间相同