PriorityBlockingQueue is a thread-safe, unbounded blocking queue in Java that is backed by a binary heap. It lets multiple threads add, remove, and retrieve elements safely, and it always hands out elements in priority order.
The PriorityBlockingQueue class implements the BlockingQueue interface and extends the AbstractQueue class, which in turn implements the Queue interface. It is an unbounded queue: the value passed to the constructor is only the initial capacity of the backing array (11 by default), and the array grows automatically as elements are added, up to a limit close to Integer.MAX_VALUE. Elements are ordered by priority because the queue keeps them in a heap internally.
PriorityBlockingQueue internally stores its elements in an Object array named queue and records the number of elements in an int variable named size. The following are the definitions in the PriorityBlockingQueue class:
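As a quick illustration of the ordering behaviour described above, here is a minimal sketch. The class and method names (PriorityOrderDemo, drainInPriorityOrder) are made up for this example; only PriorityBlockingQueue and its offer() and poll() methods come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityOrderDemo {
    // Hypothetical helper: inserts the values, then removes them all
    // and returns them in the order the queue handed them out.
    static List<Integer> drainInPriorityOrder(int... values) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        for (int v : values) {
            queue.offer(v); // insertion order does not matter
        }
        List<Integer> out = new ArrayList<>();
        Integer head;
        while ((head = queue.poll()) != null) { // poll() returns null when empty
            out.add(head);
        }
        return out;
    }

    public static void main(String[] args) {
        // Smallest element first, because Integer's natural ordering is used.
        System.out.println(drainInPriorityOrder(5, 1, 4, 2, 3)); // [1, 2, 3, 4, 5]
    }
}
```

Note that only removal through the head (poll(), take(), peek()) follows priority order.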
```java
private transient Object[] queue;
private transient int size;
```
PriorityBlockingQueue orders elements by priority because it maintains a binary heap internally. The heap is a min-heap (small root heap) with respect to the chosen ordering, so the head is always the smallest element; supplying a reversed comparator makes it behave like a max-heap (large root heap). In the constructor, we can choose between the elements' natural ordering and a custom Comparator. If no comparator is specified, PriorityBlockingQueue uses the elements' own compareTo() method, so the elements must implement Comparable.
```java
private final Comparator<? super E> comparator;
```
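To make the effect of the comparator concrete, the following sketch compares the head of the queue under natural ordering and under a reversed comparator. ComparatorDemo and headOf are hypothetical names introduced for illustration; passing null as the comparator selects natural ordering.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

class ComparatorDemo {
    // Hypothetical helper: builds a queue with the given comparator
    // (null means natural ordering) and returns its head element.
    static Integer headOf(Comparator<Integer> cmp, Integer... values) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(11, cmp);
        for (Integer v : values) {
            queue.offer(v);
        }
        return queue.peek(); // head of the heap, or null if the queue is empty
    }

    public static void main(String[] args) {
        // Natural ordering: the smallest value is at the head.
        System.out.println(headOf(null, 3, 1, 2)); // 1
        // Reversed ordering: the largest value is at the head.
        System.out.println(headOf(Comparator.reverseOrder(), 3, 1, 2)); // 3
    }
}
```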
PriorityBlockingQueue provides several constructors. The parameterless constructor creates a queue with the default initial capacity of 11, and other constructors accept an initial capacity, optionally together with a custom comparator. The following are two of the constructors of the PriorityBlockingQueue class:
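```java
// DEFAULT_INITIAL_CAPACITY is 11 in the JDK.
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

// Abridged: the JDK constructor also creates the lock and the notEmpty condition.
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.queue = new Object[initialCapacity];
    this.comparator = comparator; // null means natural ordering
}
```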
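The sketch below illustrates two consequences of these constructors: the initial capacity is not an upper bound, and a value below 1 is rejected. InitialCapacityDemo and its methods are hypothetical names used only for this example.

```java
import java.util.concurrent.PriorityBlockingQueue;

class InitialCapacityDemo {
    // Hypothetical helper: adds `count` elements to a queue created with
    // the given initial capacity and reports how many it holds afterwards.
    static int sizeAfterAdding(int initialCapacity, int count) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(initialCapacity);
        for (int i = 0; i < count; i++) {
            queue.offer(i); // succeeds even after the initial array is full
        }
        return queue.size();
    }

    // Hypothetical helper: true if the constructor rejects the capacity.
    static boolean rejectsInitialCapacity(int initialCapacity) {
        try {
            new PriorityBlockingQueue<Integer>(initialCapacity);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterAdding(1, 1000));   // 1000
        System.out.println(rejectsInitialCapacity(0));  // true
        // remainingCapacity() always reports Integer.MAX_VALUE for this class.
        System.out.println(new PriorityBlockingQueue<Integer>().remainingCapacity());
    }
}
```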
Elements are added to PriorityBlockingQueue with the offer() method. Because the queue is unbounded, offer() never blocks and always returns true. It first checks whether the backing array has room; if not, it grows the array via tryGrow(). In OpenJDK the array roughly doubles while it is small (fewer than 64 slots) and grows by half after that. The new element is then placed at the end of the heap and sifted up to its proper position by the siftUp() method to maintain the heap property.
```java
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException(); // null elements are not allowed
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    // Grow the backing array until there is room for one more element.
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (n == 0) {
            array[0] = e;
        } else {
            siftUp(n, e, array, cmp); // insert at the end and sift up
        }
        size = n + 1;
        notEmpty.signal(); // wake one thread blocked in take()
    } finally {
        lock.unlock();
    }
    return true;
}
```
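The growth step can be sketched in isolation. The formula below follows the policy used by tryGrow() in OpenJDK as far as I know it; the class and method names (GrowthPolicySketch, nextCapacity) are invented for this example, and the overflow handling of the real implementation is deliberately left out.

```java
class GrowthPolicySketch {
    // Hypothetical helper: length of the new array when an array of
    // length oldCap is full. Small arrays roughly double (+2),
    // larger ones grow by 50%. Overflow checks are omitted.
    static int nextCapacity(int oldCap) {
        return oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));
    }

    public static void main(String[] args) {
        int cap = 11; // default initial capacity
        for (int i = 0; i < 5; i++) {
            System.out.print(cap + " ");
            cap = nextCapacity(cap);
        }
        System.out.println(); // prints: 11 24 50 102 153
    }
}
```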
Elements are retrieved from PriorityBlockingQueue with the take() method. It first checks whether the queue is empty; if so, the current thread blocks until an element is added to the queue. It then removes the head element, takes the last element of the array, and sifts it down from the root with the siftDown() method to restore the heap property.
```java
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        // Wait until a producer signals that the queue is non-empty.
        while (size == 0)
            notEmpty.await();
        result = extract();
    } finally {
        lock.unlock();
    }
    return result;
}

private E extract() {
    final Object[] array = queue;
    final E result = (E) array[0];   // head of the heap
    final int n = --size;
    final E x = (E) array[n];        // last element
    array[n] = null;
    if (n != 0)
        siftDown(0, x, array, comparator); // re-insert it from the root
    return result;
}
```
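The blocking behaviour of take() can be observed with a small two-thread sketch. BlockingTakeDemo and takeFromAnotherThread are hypothetical names; the 100 ms pause is only there to make it likely that the consumer is already waiting, and the result does not depend on it.

```java
import java.util.concurrent.PriorityBlockingQueue;

class BlockingTakeDemo {
    // Hypothetical helper: a consumer thread calls take() on an empty
    // queue, the calling thread adds an element later, and the value
    // the consumer received is returned.
    static String takeFromAnotherThread() {
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // blocks while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        try {
            Thread.sleep(100);   // illustrative pause; not required for correctness
            queue.offer("job");  // signals notEmpty and wakes the consumer
            consumer.join();     // join() makes the consumer's write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(takeFromAnotherThread()); // job
    }
}
```

If blocking indefinitely is not acceptable, the timed poll(long, TimeUnit) method of BlockingQueue returns null once the timeout expires.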
PriorityBlockingQueue uses a heap to maintain the priority of elements. Here we take the min-heap (small root heap) as an example. In a min-heap, the value of every parent node is less than or equal to the values of its left and right child nodes. The heap in PriorityBlockingQueue is stored in an array: the children of the node at index k are at indices 2k + 1 and 2k + 2, and its parent is at index (k - 1) / 2. When an element is added, it is placed at the end of the array and sifted up to its proper position by the siftUp() method. When an element is removed, the head element is returned, and the last element of the array is sifted down from the root by the siftDown() method. Both operations preserve the heap property. The following is the code implementation of the siftUp() and siftDown() methods:
```java
private static <T> void siftUp(int k, T x, Object[] array,
                               Comparator<? super T> cmp) {
    if (cmp != null)
        siftUpUsingComparator(k, x, array, cmp);
    else
        siftUpComparable(k, x, array);
}

@SuppressWarnings("unchecked")
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                              Comparator<? super T> cmp) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (cmp.compare(x, (T) e) >= 0)
            break;            // x is not smaller than its parent: done
        array[k] = e;         // move the parent down one level
        k = parent;
    }
    array[k] = x;
}

@SuppressWarnings("unchecked")
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

// The siftDown methods read the size field, so they are instance methods
// here. The JDK version is static and receives the element count as a parameter.
private <T> void siftDown(int k, T x, Object[] array,
                          Comparator<? super T> cmp) {
    if (cmp != null)
        siftDownUsingComparator(k, x, array, cmp);
    else
        siftDownComparable(k, x, array);
}

@SuppressWarnings("unchecked")
private <T> void siftDownUsingComparator(int k, T x, Object[] array,
                                         Comparator<? super T> cmp) {
    int half = size >>> 1;    // nodes at index >= half are leaves
    while (k < half) {
        int child = (k << 1) + 1;      // left child
        Object c = array[child];
        int right = child + 1;
        if (right < size && cmp.compare((T) c, (T) array[right]) > 0)
            c = array[child = right];  // pick the smaller child
        if (cmp.compare(x, (T) c) <= 0)
            break;
        array[k] = c;         // move the smaller child up one level
        k = child;
    }
    array[k] = x;
}

@SuppressWarnings("unchecked")
private <T> void siftDownComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    int half = size >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        Object c = array[child];
        int right = child + 1;
        if (right < size &&
            ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
            c = array[child = right];
        if (key.compareTo((T) c) <= 0)
            break;
        array[k] = c;
        k = child;
    }
    array[k] = key;
}
```
The siftUp() and siftDown() methods delegate to siftUpUsingComparator() and siftDownUsingComparator() when a Comparator was supplied; these use the Comparator to sift elements up and down the heap. When PriorityBlockingQueue has no Comparator, siftUpComparable() and siftDownComparable() use the elements' own Comparable implementation instead.
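To see the two sift operations work outside the JDK class, here is a minimal, single-threaded min-heap over int values. It is a simplified sketch rather than the JDK code: IntMinHeap is a made-up class, it has no locking, and it compares primitives directly instead of using Comparable or Comparator.

```java
import java.util.Arrays;
import java.util.NoSuchElementException;

class IntMinHeap {
    private int[] heap = new int[4];
    private int size;

    // Insert x at the end of the array and sift it up.
    void add(int x) {
        if (size == heap.length) {
            heap = Arrays.copyOf(heap, size * 2); // simple doubling for the sketch
        }
        int k = size++;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            if (x >= heap[parent]) break;  // heap property already holds
            heap[k] = heap[parent];        // move the parent down
            k = parent;
        }
        heap[k] = x;
    }

    // Remove the root, then sift the former last element down from the root.
    int poll() {
        if (size == 0) throw new NoSuchElementException();
        int result = heap[0];
        int n = --size;
        int x = heap[n];
        int k = 0;
        int half = n >>> 1;                // indices >= half are leaves
        while (k < half) {
            int child = 2 * k + 1;         // left child
            int right = child + 1;
            if (right < n && heap[right] < heap[child]) child = right;
            if (x <= heap[child]) break;
            heap[k] = heap[child];         // move the smaller child up
            k = child;
        }
        heap[k] = x;
        return result;
    }

    int size() {
        return size;
    }

    public static void main(String[] args) {
        IntMinHeap h = new IntMinHeap();
        for (int v : new int[] {5, 3, 8, 1}) h.add(v);
        while (h.size() > 0) System.out.print(h.poll() + " "); // 1 3 5 8
    }
}
```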