
Detailed explanation of the use of PriorityBlockingQueue in Java concurrent programming

王林
2023-05-08 08:43:07

    PriorityBlockingQueue is a thread-safe, unbounded blocking queue in Java that is backed by a binary heap. Multiple threads can safely add, remove, and retrieve elements, and elements always leave the queue in priority order.

    1. Overview of PriorityBlockingQueue

    The PriorityBlockingQueue class implements the BlockingQueue interface and extends the AbstractQueue class, which in turn implements the Queue interface. It is an unbounded queue: the constructor accepts an initial capacity (11 by default), not a maximum, and the backing array grows as elements are added, so insertion fails only when memory runs out. Because there is no upper bound, remainingCapacity() always returns Integer.MAX_VALUE. PriorityBlockingQueue can order elements by priority because it stores them internally in a binary heap.
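
    As a minimal sketch of the behavior described above, the following example adds integers in arbitrary order and removes them in priority order. The class name PriorityOrderDemo and its helper method are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class PriorityOrderDemo {
    // Adds the values in the given order, then removes them one by one.
    static List<Integer> drainInPriorityOrder(int... values) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        for (int v : values) {
            queue.offer(v); // never blocks: the queue is unbounded
        }
        List<Integer> result = new ArrayList<>();
        Integer head;
        while ((head = queue.poll()) != null) { // poll() returns null when empty
            result.add(head);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(drainInPriorityOrder(5, 1, 4, 2, 3)); // [1, 2, 3, 4, 5]
    }
}
```

    Note that only removal from the head is ordered; iterating over the queue does not visit elements in priority order.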

    2. PriorityBlockingQueue source code analysis

    1. Container

    PriorityBlockingQueue stores its elements in an Object array named queue and records the number of elements in an int field named size. The fields are declared in the PriorityBlockingQueue class as follows:

    private transient Object[] queue;
    private transient int size;

    2. Comparator

    PriorityBlockingQueue orders elements by priority because it maintains a binary heap internally. The head of the queue is always the least element under the ordering in use, so the heap is a min-heap with respect to that ordering; supplying a reversed comparator gives max-heap behavior. The constructor lets us choose between the elements' natural ordering (Comparable) and a custom Comparator. If no comparator is specified, the field below is null and the natural ordering is used.

    private final Comparator<? super E> comparator;
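
    To illustrate the choice of ordering, the sketch below passes a reversed comparator so that the element with the largest priority value comes out first. The Task type, its fields, and the class name ComparatorDemo are assumptions made for this example only.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class ComparatorDemo {
    // Hypothetical task type used only for this illustration.
    static final class Task {
        final String name;
        final int priority;

        Task(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    static String highestPriorityName() {
        // Reversing the comparator makes the largest priority value the head.
        Comparator<Task> byPriorityDesc =
                Comparator.comparingInt((Task t) -> t.priority).reversed();
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(11, byPriorityDesc);
        queue.offer(new Task("low", 1));
        queue.offer(new Task("high", 10));
        queue.offer(new Task("medium", 5));
        return queue.poll().name;
    }

    public static void main(String[] args) {
        System.out.println(highestPriorityName()); // high
    }
}
```

    Task does not implement Comparable, so a queue of Task created without a comparator would throw a ClassCastException when elements are compared.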

    3. Constructor

    PriorityBlockingQueue provides several constructors. The no-argument constructor creates a queue with the default initial capacity (DEFAULT_INITIAL_CAPACITY, which is 11) and natural ordering; other constructors accept an initial capacity, optionally together with a custom comparator. The following are two of the constructors of the PriorityBlockingQueue class, in simplified form:

    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
    public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.queue = new Object[initialCapacity];
        this.comparator = comparator;
    }
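
    The constructors can be exercised as in the following sketch, which checks that an initial capacity below 1 is rejected and that the one-argument and collection constructors of the real class produce usable queues. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class ConstructorDemo {
    // An initial capacity below 1 is rejected with IllegalArgumentException.
    static boolean rejectsZeroCapacity() {
        try {
            new PriorityBlockingQueue<String>(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    // The collection constructor copies the elements and builds the heap.
    static int headOfCollectionQueue() {
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(Arrays.asList(3, 1, 2));
        return queue.peek();
    }

    public static void main(String[] args) {
        System.out.println(rejectsZeroCapacity());   // true
        System.out.println(headOfCollectionQueue()); // 1
    }
}
```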

    4. Add elements

    Elements are added with the offer() method; add() and put() delegate to it. Because the queue is unbounded, offer() never blocks and always returns true. It first checks whether the backing array is full and, if so, calls tryGrow() to expand it. In OpenJDK the array roughly doubles while it is small (fewer than 64 slots) and grows by half after that. offer() then places the new element at the end of the array and sifts it up to its proper position with siftUp() to preserve the heap property. The listing below is a simplified version of the JDK code.

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        // Grow the backing array while it is full.
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (n == 0) {
                array[0] = e;               // empty heap: the new element is the root
            } else {
                siftUp(n, e, array, cmp);   // insert at index n and sift up
            }
            size = n + 1;
            notEmpty.signal();              // wake one thread waiting in take()
        } finally {
            lock.unlock();
        }
        return true;
    }
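
    The growth behavior can be sketched as follows. The nextCapacity() helper mirrors the rule used by tryGrow() in OpenJDK as far as I know it (an assumption that may not hold for every JDK version or vendor), and the second method shows that a queue created with an initial capacity of 1 still accepts any number of elements. All names here are hypothetical.

```java
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class GrowthDemo {
    // Assumed growth rule, modeled on OpenJDK's tryGrow():
    // small arrays grow by oldCap + 2, larger ones by 50%.
    static int nextCapacity(int oldCap) {
        return oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));
    }

    // The initial capacity is only a starting size, not a bound.
    static int fillBeyondInitialCapacity(int initialCapacity, int count) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(initialCapacity);
        for (int i = 0; i < count; i++) {
            if (!queue.offer(i)) {
                throw new AssertionError("offer() unexpectedly returned false");
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(nextCapacity(11));                   // 24
        System.out.println(nextCapacity(64));                   // 96
        System.out.println(fillBeyondInitialCapacity(1, 1000)); // 1000
    }
}
```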

    5. Get elements

    Elements are retrieved with the take() method. It first checks whether the queue is empty; if so, the calling thread blocks until an element is added. It then removes the head of the queue (the root of the heap), takes the last element of the array, and sifts it down from the root with siftDown() to preserve the heap property. The listing below is a simplified version of the JDK code.

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            // Re-check in a loop to guard against spurious wakeups.
            while (size == 0)
                notEmpty.await();
            result = extract();
        } finally {
            lock.unlock();
        }
        return result;
    }

    @SuppressWarnings("unchecked")
    private E extract() {
        final Object[] array = queue;
        final E result = (E) array[0];   // head of the queue (root of the heap)
        final int n = --size;
        final E x = (E) array[n];        // last element, to be re-inserted
        array[n] = null;                 // clear the vacated slot
        if (n != 0)
            siftDown(0, x, array, comparator);
        return result;
    }
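
    The blocking behavior of take() can be observed with a small producer and consumer sketch such as the one below. The class name, the element value, and the sleep durations are arbitrary choices for illustration; poll(timeout, unit) is shown as the non-indefinite alternative to take().

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class BlockingTakeDemo {
    // The consumer calls take() on an empty queue and waits for the producer.
    static String takeFromProducer() {
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100); // give the consumer time to block
                queue.put("job");                 // put() never blocks on this queue
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String result = queue.take(); // blocks until the producer adds an element
            producer.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    // poll() with a timeout gives up and returns null instead of waiting forever.
    static boolean pollTimesOutOnEmptyQueue() {
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>();
        try {
            return queue.poll(50, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(takeFromProducer());
        System.out.println(pollTimesOutOnEmptyQueue());
    }
}
```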

    6. Maintain heap properties

    PriorityBlockingQueue uses a binary heap to maintain the priority of elements. With the default ordering this is a min-heap: the value of each parent node is less than or equal to the values of its left and right children, so the smallest element sits at the root. The heap is stored in an array, where the children of index k are at indices 2k + 1 and 2k + 2 and its parent is at index (k - 1) / 2. When an element is added, it is placed at the end of the array and sifted up to its proper position with siftUp(). When an element is removed, the head is returned and the last element is sifted down from the root with siftDown(). The following is the implementation of the siftUp() and siftDown() methods:

    private static <T>
    void siftUp(int k, T x, Object[] array, Comparator<? super T> cmp) {
        if (cmp != null)
            siftUpUsingComparator(k, x, array, cmp);
        else
            siftUpComparable(k, x, array);
    }

    @SuppressWarnings("unchecked")
    private static <T>
    void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (cmp.compare(x, (T) e) >= 0)
                break;              // x is not smaller than its parent: done
            array[k] = e;           // move the parent down one level
            k = parent;
        }
        array[k] = x;
    }

    @SuppressWarnings("unchecked")
    private static <T>
    void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }

    // The sift-down helpers are instance methods here because they read the
    // size field. The OpenJDK versions are static and receive the element
    // count as an extra parameter instead.
    private <T>
    void siftDown(int k, T x, Object[] array, Comparator<? super T> cmp) {
        if (cmp != null)
            siftDownUsingComparator(k, x, array, cmp);
        else
            siftDownComparable(k, x, array);
    }

    @SuppressWarnings("unchecked")
    private <T>
    void siftDownUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) {
        int half = size >>> 1;              // nodes at index >= half are leaves
        while (k < half) {
            int child = (k << 1) + 1;       // start with the left child
            Object c = array[child];
            int right = child + 1;
            if (right < size && cmp.compare((T) c, (T) array[right]) > 0)
                c = array[child = right];   // the right child is smaller
            if (cmp.compare(x, (T) c) <= 0)
                break;
            array[k] = c;                   // move the smaller child up one level
            k = child;
        }
        array[k] = x;
    }

    @SuppressWarnings("unchecked")
    private <T>
    void siftDownComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        int half = size >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            Object c = array[child];
            int right = child + 1;
            if (right < size && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }

    The siftUp() and siftDown() methods are thin dispatchers. When a Comparator was supplied, they delegate to siftUpUsingComparator() and siftDownUsingComparator(), which compare elements with that Comparator. When PriorityBlockingQueue has no Comparator, they delegate to siftUpComparable() and siftDownComparable(), which rely on the elements' own Comparable implementation. Either way, an element moves along a single path between the root and a leaf, so each operation takes O(log n) time.
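
    To make the sift operations easier to follow in isolation, here is a minimal, single-threaded min-heap over int values that uses the same index arithmetic. It is a sketch for illustration only: IntMinHeap and its methods are hypothetical names, there is no locking, and the doubling growth policy is a simplification rather than the policy of PriorityBlockingQueue.

```java
import java.util.Arrays;

// Hypothetical teaching class: an array-backed min-heap without any locking.
class IntMinHeap {
    private int[] heap = new int[4];
    private int size;

    void add(int x) {
        if (size == heap.length) {
            heap = Arrays.copyOf(heap, size * 2); // simplified growth policy
        }
        int k = size++;
        // Sift up: move parents down until x is no smaller than its parent.
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            if (x >= heap[parent]) break;
            heap[k] = heap[parent];
            k = parent;
        }
        heap[k] = x;
    }

    int removeMin() {
        if (size == 0) throw new IllegalStateException("heap is empty");
        int result = heap[0];
        int n = --size;
        int x = heap[n];       // last element, re-inserted from the root
        int k = 0;
        int half = n >>> 1;    // nodes at index >= half are leaves
        // Sift down: move the smaller child up until x fits.
        while (k < half) {
            int child = (k << 1) + 1;
            int right = child + 1;
            if (right < n && heap[right] < heap[child]) child = right;
            if (x <= heap[child]) break;
            heap[k] = heap[child];
            k = child;
        }
        heap[k] = x;
        return result;
    }

    // Heap sort built from the two operations above.
    static int[] sorted(int... values) {
        IntMinHeap h = new IntMinHeap();
        for (int v : values) h.add(v);
        int[] out = new int[values.length];
        for (int i = 0; i < out.length; i++) out[i] = h.removeMin();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sorted(5, 1, 4, 2, 3))); // [1, 2, 3, 4, 5]
    }
}
```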


    Statement:
    This article is reproduced from yisu.com. In case of infringement, please contact admin@php.cn to have it removed.