队列在生活中随处可见,医院缴费需要排队、做核酸需要排队、汽车等红绿灯需要排队等等。
队列是一个按照先来到就排在前面,后来到排在后面的数据结构,并且出队的时候也是按照先来到先出队。使用数组和链表进行实现。通常用于协调任务的执行和数据的交换。
LinkedBlockingQueue 是一个可选有界阻塞队列,有界指的是队列存在一个最大容量;阻塞指的是如果队列已经满了,想要往队列继续添加元素的话,那么这个操作将会被暂停,直到队列中有空位才会继续完成添加操作。如果队列已经为空,想要从队列中获取元素,那么这个操作将会被暂停,直接队列中存在元素才会继续完成获取操作。
LinkedBlockingQueue 内部使用链表作为元素的存储结构。内部使用了两个锁,分别使用于存操作和取操作。
执行存取操作时,都必须先获取锁,才可以执行存取操作,保证 LinkedBlockingQueue 是线程安全。
LinkedBlockingQueue 通过两个 Condition 条件队列,一个 notFull 条件,一个 notEmpty 条件。在对队列进行插入元素操作时,判断当前队列已经满,则通过 notFull 条件将线程阻塞,直到其他线程通知该线程队列可以继续插入元素。在对队列进行移除元素操作时,判断当前队列已经空,则通过 notEmpty 条件阻塞线程,直到其他线程通过该线程可以继续获取元素。
这样保证线程的存取操作不会出现错误。避免队列在满时,丢弃插入的元素;也避免在队列空时取到一个 null 值。
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
无参构造函数中,默认使用 Integer.MAX_VALUE
作为队列的最大容量。
有参构造函数中,可以自己指定队列的最大容量,并且创建了头节点和尾节点。那么 LinkedBlockingQueue 使用的是有头单向链表。
private final int capacity; /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(); transient Node<E> head; private transient Node<E> last; // 取锁 private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); // 存锁 private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition();
并且在对象初始化时,创建了两个锁,分别使用于存操作和取操作。创建了两个条件队列,分别用于队列空和满的情况。
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final int c; final Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
1.获取锁
2.判断当前队列是否已经满了
如果队列已经满了,调用 notFull 条件队列的 await() 方法,将该线程阻塞,暂停该线程的插入操作。避免内部溢出的问题。
如果没有满,则直接调用入队函数 enqueue 插入到队列末尾。
3.检查此时队列是否已满
如果未满,则调用 notFull 条件队列的 signal() 方法,唤醒被阻塞在 notFull 条件队列的线程。
4.解锁
检查插入元素前的队列元素数量是否等于0
等于 0,则调用 notEmpty 条件队列的 signal() 方法,通知其队列现在不为空,可以唤醒阻塞线程获取元素。
为什么需要调用 notFull 条件队列的 signal() 方法? 因为队列取操作和存操作所使用的锁是不一样的,那么就说明,一个线程执行存入操作时,其他线程是可以执行取出操作的。我们来看下面这个例子:
队列总容量为 5,当前元素数量为5。线程 A 获取了存锁,想要插入了元素。但是因为队列容量已满,释放锁,并且加入到条件队列中,等待被唤醒。
线程 B 获取了存锁,想要插入了元素。但是因为队列容量已满,释放锁,并且加入到条件队列中,等待被唤醒。
线程 C 获取了取锁,取出了元素 1。并且通过 notFull 的 signal 方法唤醒条件队列中被阻塞的线程 A。线程 A 被唤醒后加入到同步队列中,但是此时还没有竞争到锁。
线程 D 获取了取锁,取出了元素 2。但是还没有执行到唤醒阻塞线程的代码。
线程 A 竞争到锁,开始执行插入元素操作。将元素插入之后,检查到队列元素数量为 4,小于队列的总容量,因此会执行 notFull 的 signal 方法唤醒条件队列中被阻塞的线程 B。线程 B 被唤醒后加入到同步队列中,开始竞争锁。
线程 B 竞争到锁,开始执行插入元素操作。将元素插入之后,检查到队列元素数量等于 5,不进行唤醒操作。
这样做的目的是尽快的唤醒阻塞线程,可以更快的完成插入元素操作。因为线程存和取的操作相互之间并不是互斥的,而是独立运行的,提高吞吐量。
public E take() throws InterruptedException { final E x; final int c; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
1.获得取锁
2.判断当前队列是否为空
如果队列没有元素,调用 notEmpty 条件队列的 await() 方法,将该线程阻塞,暂停该线程的获取操作。避免获取元素出错。
如果不为空,则直接调用出队函数 dequeue 移除队列第一个元素,并返回给客户端。
3.检查此时队列是否为空
如果不为空,则调用 notEmpty 条件队列的 signal() 方法,唤醒被阻塞在 notEmpty 条件队列的线程。
4.释放锁
5.检查获取元素前的队列元素数量是否等于最大容量
等于最大容量,因为此时已经取出一个元素,因此队列处于未满的状态,可以唤醒阻塞在 notFull 条件的线程,让线程继续插入元素。
步骤 3 的目的是尽快的唤醒阻塞线程,可以更快的完成取元素操作。提高吞吐量。可以尝试自己画出流程图。
private void enqueue(Node<E> node) { last = last.next = node; }
入队函数极其简单,只要将最后一个元素的 next 指针指向当前元素即完成了插入操作。
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
我们前面说 LinkedBlockingQueue 使用的是有头链表。头节点只是作为一个标志,实际上并不是一个真正的元素。当获取元素时,将头节点的下一个节点作为头节点,将原来的头节点取消引用,被垃圾回收即可。
LinkedBlockingQueue 和 ArrayBlockingQueue 一样适用于多个线程之间需要共享数据、协调任务执行的场景。因此可以总结出以下几个应用场景:
线程池:线程池是一个常见的并发编程模型,它通过线程池中的线程执行任务。并且可以重复使用这些线程。在线程池中,可以使用 LinkedBlockingQueue 来存储需要执行的任务,以此控制任务数量和执行顺序。当线程池中的线程执行完任务之后,可以从 LinkedBlockingQueue 中取出下一个任务执行。
生产者-消费者:在生产者-消费者模型中,生产者负责生产数据,消费者负责对数据进行处理。在这种模式下,LinkedBlockingQueue 可以作为生产者与消费者之间的数据通道,保证线程安全和数据正确。
Nacos: Nacos 是一个动态服务发现、配置和服务管理平台,它使用 LinkedBlockingQueue 来实现内部的任务队列。
Tomcat:从 Tomcat 7 开始,请求队列默认使用了 LinkedBlockingQueue 实现。
Hystrix: 一个流行的容错框架,其默认使用 LinkedBlockingQueue 作为请求队列。
以上是怎么掌握Java LinkedBlockingQueue的详细内容。更多信息请关注PHP中文网其他相关文章!