1. What is a blocking queue?

A blocking queue (BlockingQueue) is a queue that supports two additional operations: when the queue is empty, a thread that tries to take an element waits until the queue becomes non-empty; when the queue is full, a thread that tries to store an element waits until space becomes available. Blocking queues are commonly used in producer-consumer scenarios, where the producer is the thread that adds elements to the queue and the consumer is the thread that takes elements from it. The blocking queue is the container in which producers store elements and from which consumers take them.
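
The producer-consumer arrangement described above can be sketched as follows. This is a minimal illustration rather than a recommended design: the class name ProducerConsumerSketch and the helper run are hypothetical, and the lambdas assume Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerSketch {
    // Hypothetical helper: one producer puts 0..n-1 into a bounded queue and
    // one consumer takes n elements. Returns what the consumer received.
    static List<Integer> run(int n, int capacity) throws InterruptedException {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        final List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    queue.put(i); // waits while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    consumed.add(queue.take()); // waits while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join, the consumer's writes are visible here
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5, 2)); // prints [0, 1, 2, 3, 4]
    }
}
```

With a single producer and a single consumer the elements arrive in insertion order, even though the queue holds at most two of them at a time.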

When an operation cannot be completed immediately, a blocking queue offers four ways of handling it:

Throw an exception: When the blocking queue is full, inserting an element throws IllegalStateException("Queue full"). When the queue is empty, removing an element throws NoSuchElementException.
Return a special value: The insertion method returns true if the element was inserted and false otherwise. The removal method takes an element from the queue, or returns null if there is none.
Block indefinitely: When the blocking queue is full and a producer thread puts an element, the queue blocks the producer thread until space becomes available or the thread is interrupted. When the queue is empty and a consumer thread takes an element, the queue blocks the consumer thread until an element becomes available.
Time out: When the blocking queue is full, the queue blocks the producer thread for up to a specified time. Once that time has elapsed, the producer thread gives up and returns.
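
The four behaviours map onto the BlockingQueue methods add/remove, offer/poll, put/take, and the timed offer/poll. The sketch below exercises each pair against a queue of capacity 1. The class name FourBehavioursSketch and the helper demo are hypothetical, and the 50 ms timeouts are arbitrary values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class FourBehavioursSketch {
    // Hypothetical helper: runs each style of operation and records the outcome.
    static List<String> demo() throws InterruptedException {
        List<String> log = new ArrayList<>();
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);

        // 1. Throw an exception: add / remove
        q.add("a");
        try {
            q.add("b"); // queue is full
        } catch (IllegalStateException e) {
            log.add("add -> IllegalStateException: " + e.getMessage());
        }
        q.remove();
        try {
            q.remove(); // queue is empty
        } catch (NoSuchElementException e) {
            log.add("remove -> NoSuchElementException");
        }

        // 2. Return a special value: offer / poll
        q.offer("a");
        log.add("offer on full queue -> " + q.offer("b"));
        q.poll();
        log.add("poll on empty queue -> " + q.poll());

        // 3. Block: put / take (neither call has to wait in this sequence)
        q.put("a");
        log.add("take -> " + q.take());

        // 4. Time out: offer / poll with a timeout
        q.put("a");
        log.add("timed offer on full queue -> " + q.offer("b", 50, TimeUnit.MILLISECONDS));
        q.take();
        log.add("timed poll on empty queue -> " + q.poll(50, TimeUnit.MILLISECONDS));
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```
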
2. Blocking queue in Java

JDK 7 provides seven blocking queues:

  1. ArrayBlockingQueue: A bounded blocking queue backed by an array.
  2. LinkedBlockingQueue: An optionally bounded blocking queue backed by a linked list.
  3. PriorityBlockingQueue: An unbounded blocking queue that supports priority ordering.
  4. DelayQueue: An unbounded blocking queue implemented using a priority queue.
  5. SynchronousQueue: A blocking queue that does not store elements.
  6. LinkedTransferQueue: An unbounded blocking queue backed by a linked list.
  7. LinkedBlockingDeque: A double-ended blocking queue backed by a linked list.

ArrayBlockingQueue is a bounded blocking queue backed by an array. It orders elements first-in, first-out (FIFO). By default, it does not guarantee that threads access the queue fairly. Fair access means that blocked producer and consumer threads are served in the order in which they blocked once the queue becomes available: the producer thread that blocked first inserts its element first, and the consumer thread that blocked first takes an element first. Fairness usually reduces throughput. A fair blocking queue can be created as follows:

ArrayBlockingQueue<Object> fairQueue = new ArrayBlockingQueue<>(1000, true);

Fairness is implemented with a reentrant lock, as the constructor shows:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);   // a fair lock serves waiting threads in FIFO order
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

LinkedBlockingQueue is an optionally bounded blocking queue backed by a linked list. Its default and maximum capacity is Integer.MAX_VALUE. It orders elements first-in, first-out.
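
The difference between the default capacity and an explicit one can be checked with a small sketch. The class name LinkedCapacitySketch and both helper methods are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;

class LinkedCapacitySketch {
    // Capacity of a queue created with the no-argument constructor.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // Fills a queue of the given capacity, then tries one more offer.
    static boolean offerBeyondCapacity(int capacity) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer(i);
        }
        return queue.offer(capacity); // false: the queue is already full
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE); // prints true
        System.out.println(offerBeyondCapacity(2));                 // prints false
    }
}
```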

PriorityBlockingQueue is an unbounded blocking queue that supports priorities. By default, elements are ordered by their natural ordering, in ascending order. A Comparator can also be supplied to define the ordering.
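
A short sketch of both orderings follows. The class name PrioritySketch and its helpers are hypothetical, the initial capacity of 11 is an arbitrary choice, and Comparator.reverseOrder() assumes Java 8 or later.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PrioritySketch {
    // Removes elements one by one, in the order the queue hands them out.
    static List<Integer> drain(PriorityBlockingQueue<Integer> queue) {
        List<Integer> out = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) {
            out.add(next);
        }
        return out;
    }

    // Natural ordering: smallest element first.
    static List<Integer> naturalOrder() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.add(3);
        queue.add(1);
        queue.add(2);
        return drain(queue);
    }

    // Custom ordering supplied through a Comparator: largest element first.
    static List<Integer> reverseOrder() {
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(11, Comparator.<Integer>reverseOrder());
        queue.add(3);
        queue.add(1);
        queue.add(2);
        return drain(queue);
    }

    public static void main(String[] args) {
        System.out.println(naturalOrder()); // prints [1, 2, 3]
        System.out.println(reverseOrder()); // prints [3, 2, 1]
    }
}
```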

DelayQueue is an unbounded blocking queue that supports delayed retrieval of elements. It is implemented on top of PriorityQueue. Elements in the queue must implement the Delayed interface, and each element specifies, when it is created, how long it must wait before it can be taken from the queue. An element can be taken only once its delay has expired. DelayQueue is useful in scenarios such as the following:

Cache system design: Use a DelayQueue to hold the validity period of cached elements and have a thread poll the DelayQueue in a loop. Once an element can be taken from the DelayQueue, its cache validity period has expired.
Scheduled tasks: Use a DelayQueue to hold the day's tasks together with their execution times. Once a task can be taken from the DelayQueue, it is executed. For example, TimerQueue is implemented using DelayQueue.

Elements in the queue, which implement Delayed, must also implement compareTo to define their order, for example so that the element with the longest delay goes to the end of the queue. The implementation in ScheduledFutureTask is as follows:

public int compareTo(Delayed other) {
    if (other == this) // compare zero ONLY if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long d = (getDelay(TimeUnit.NANOSECONDS) -
              other.getDelay(TimeUnit.NANOSECONDS));
    return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}

3. How to implement the Delayed interface

We can refer to the ScheduledFutureTask class in ScheduledThreadPoolExecutor, which implements the Delayed interface. First, when the object is created, the time field records when the object becomes available. The code is as follows:

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}

Then use getDelay to query how long the current element needs to be delayed. The code is as follows:

public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}

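DelayQueue itself makes the consumer wait: when a consumer takes from the queue and the head element's delay has not yet expired, the current thread is blocked. The relevant excerpt from DelayQueue.take() is:

long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
    return q.poll();       // delay expired: hand out the head element
else if (leader != null)
    available.await();     // another thread is already waiting as leader
// ... (remainder of the method omitted)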
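
Putting the pieces together, a custom element only needs getDelay and compareTo to work with DelayQueue. The sketch below is an illustration under stated assumptions and not the JDK's own implementation: the class DelayedSketch, the element type DelayedItem, and the delays of 100 ms and 300 ms are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedSketch {
    // Hypothetical element type: becomes available once its delay has elapsed.
    static final class DelayedItem implements Delayed {
        final String name;
        final long readyAtNanos; // System.nanoTime() value at which the item is ready

        DelayedItem(String name, long delay, TimeUnit unit) {
            this.name = name;
            this.readyAtNanos = System.nanoTime() + unit.toNanos(delay);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay; zero or negative means the item can be taken.
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Shorter remaining delay sorts first, so it reaches the head of the queue.
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Inserts a slow item before a fast one and records the order of retrieval.
    static List<String> takeOrder() throws InterruptedException {
        DelayQueue<DelayedItem> queue = new DelayQueue<>();
        queue.put(new DelayedItem("slow", 300, TimeUnit.MILLISECONDS));
        queue.put(new DelayedItem("fast", 100, TimeUnit.MILLISECONDS));

        List<String> order = new ArrayList<>();
        order.add(queue.take().name); // blocks until the first delay expires
        order.add(queue.take().name);
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(takeOrder()); // prints [fast, slow]
    }
}
```

The item with the shorter delay is returned first even though it was inserted second, because the queue orders elements by compareTo and releases the head only once its delay has expired.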

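SynchronousQueue is a blocking queue that does not store elements. Each put must wait for a corresponding take, otherwise no further element can be added. SynchronousQueue can be thought of as a passer that hands data processed by a producer thread directly to a consumer thread. Because the queue itself stores no elements, it is well suited to hand-off scenarios, for example passing data used in one thread to another thread. The throughput of SynchronousQueue is higher than that of LinkedBlockingQueue and ArrayBlockingQueue.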
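
A minimal hand-off between two threads might look like this. The class name HandOffSketch and its helpers are hypothetical, and the lambda assumes Java 8 or later.

```java
import java.util.concurrent.SynchronousQueue;

class HandOffSketch {
    // Without a waiting consumer, a non-blocking offer cannot succeed,
    // because the queue has no capacity to hold the element.
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("x");
    }

    // A producer thread puts one message; the calling thread takes it.
    static String handOff(String message) throws InterruptedException {
        final SynchronousQueue<String> channel = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                channel.put(message); // blocks until a consumer takes the element
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        String received = channel.take(); // blocks until the producer arrives
        producer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(offerWithoutConsumer()); // prints false
        System.out.println(handOff("hello"));       // prints hello
    }
}
```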

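LinkedTransferQueue adds transfer and tryTransfer methods for handing an element directly to a consumer. When no consumer is waiting, the key lines of its transfer logic are:

Node pred = tryAppend(s, haveData);                    // try to append node s at the tail
return awaitMatch(s, pred, e, (how == TIMED), nanos);  // spin, then block, until s is matched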

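The time-limited tryTransfer(E e, long timeout, TimeUnit unit) method tries to hand the producer's element directly to a consumer. If no consumer takes the element, it waits for up to the specified time before returning. It returns false if the element has still not been consumed when the timeout elapses, and true if the element is consumed within the timeout.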
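
Both outcomes of the timed tryTransfer can be observed with a small sketch. The class name TryTransferSketch and its helpers are hypothetical, and the timeouts (50 ms and 5 s) are arbitrary values chosen for illustration.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

class TryTransferSketch {
    // No consumer ever arrives, so the call gives up after the timeout.
    static boolean transferWithoutConsumer() throws InterruptedException {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        boolean transferred = queue.tryTransfer("x", 50, TimeUnit.MILLISECONDS);
        // After a failed transfer the element is not left in the queue.
        return transferred || !queue.isEmpty();
    }

    // A consumer thread takes the element well within the timeout.
    static boolean transferWithConsumer() throws InterruptedException {
        final LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        boolean transferred = queue.tryTransfer("x", 5, TimeUnit.SECONDS);
        consumer.join();
        return transferred;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transferWithoutConsumer()); // prints false
        System.out.println(transferWithConsumer());    // prints true
    }
}
```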

ArrayBlockingQueue implements blocking with one lock and two conditions, as its fields show:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -817911632652898426L;
    /** The queued items */
    private final E[] items;
    /** items index for next take, poll or remove */
    private int takeIndex;
    /** items index for next put, offer, or add. */
    private int putIndex;
    /** Number of items in the queue */
    private int count;
    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */
    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;
    // ... (constructors and methods omitted)
}

public ArrayBlockingQueue(int capacity) { ... }
public ArrayBlockingQueue(int capacity, boolean fair) { ... }
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) { ... }

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    final E[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        try {
            while (count == items.length)
                notFull.await();   // queue is full: wait for space
        } catch (InterruptedException ie) {
            notFull.signal(); // propagate to non-interrupted thread
            throw ie;
        }
        insert(e);
    } finally {
        lock.unlock();
    }
}

private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal();   // wake one waiting consumer
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        try {
            while (count == 0)
                notEmpty.await();   // queue is empty: wait for an element
        } catch (InterruptedException ie) {
            notEmpty.signal(); // propagate to non-interrupted thread
            throw ie;
        }
        E x = extract();
        return x;
    } finally {
        lock.unlock();
    }
}

private E extract() {
    final E[] items = this.items;
    E x = items[takeIndex];
    items[takeIndex] = null;
    takeIndex = inc(takeIndex);
    --count;
    notFull.signal();   // wake one waiting producer
    return x;
}
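
For readers who want to run the two-condition algorithm in isolation, it can be condensed into a small standalone class. This is a simplified sketch and not the JDK source: the class name BoundedBufferSketch and the helper roundTrip are hypothetical, and features such as fairness, timed operations, and iteration are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBufferSketch<E> {
    private final Object[] items;
    private int takeIndex; // index of the next element to take
    private int putIndex;  // index of the next free slot
    private int count;     // number of stored elements
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    BoundedBufferSketch(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await(); // releases the lock while waiting for space
            }
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length; // wrap around the array
            count++;
            notEmpty.signal(); // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await(); // releases the lock while waiting for an element
            }
            E x = (E) items[takeIndex];
            items[takeIndex] = null; // drop the reference so it can be collected
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal(); // wake one waiting producer
            return x;
        } finally {
            lock.unlock();
        }
    }

    // Hypothetical helper: interleaves puts and takes so the indices wrap around.
    static List<Integer> roundTrip() throws InterruptedException {
        BoundedBufferSketch<Integer> buffer = new BoundedBufferSketch<>(2);
        List<Integer> out = new ArrayList<>();
        buffer.put(1);
        buffer.put(2);
        out.add(buffer.take());
        buffer.put(3); // reuses the slot freed by the first take
        out.add(buffer.take());
        out.add(buffer.take());
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(roundTrip()); // prints [1, 2, 3]
    }
}
```

Waiting inside a while loop rather than an if guards against spurious wakeups and against another thread changing count between the signal and the wakeup.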
