LinkedBlockingQueue is also implemented with a singly linked list. It holds two Node references, head and last, which point to the first and last nodes of the list, and an atomic variable count, initialized to 0, which records the number of elements in the queue. It also has two ReentrantLock instances, which make dequeuing and enqueuing atomic respectively. takeLock ensures that only one thread at a time can take an element from the head of the queue while other threads wait. putLock ensures that only one thread at a time can add an element to the tail of the queue while other threads wait. In addition, notEmpty and notFull are condition variables, each with an internal condition queue that holds the threads blocked while dequeuing and enqueuing respectively. This is in fact the producer-consumer model. The following code creates the counter, the two exclusive locks, and their condition variables.
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
When a thread calls take, poll, or a similar operation on a LinkedBlockingQueue instance, it must first acquire takeLock, which ensures that only one thread at a time can operate on the head node of the linked list. In addition, because the condition queue inside the condition variable notEmpty is maintained with the lock state management mechanism of takeLock, the calling thread must hold takeLock before calling the await and signal methods of notEmpty; otherwise an IllegalMonitorStateException is thrown. notEmpty maintains a condition queue internally. When a thread that holds takeLock calls notEmpty.await(), it is blocked and placed in the condition queue inside notEmpty, where it waits until another thread calls notEmpty.signal().
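The ownership rule described above can be checked with a small experiment. The following is only a sketch: the class name ConditionOwnershipDemo and its methods are made up for illustration, and the lock and condition are local stand-ins named after the queue's private fields, not the fields themselves.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; not part of LinkedBlockingQueue.
public class ConditionOwnershipDemo {

    // Returns true if calling signal() without holding the lock
    // throws IllegalMonitorStateException.
    static boolean signalWithoutLockThrows() {
        ReentrantLock takeLock = new ReentrantLock();
        Condition notEmpty = takeLock.newCondition();
        try {
            notEmpty.signal(); // the current thread does not own takeLock
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Returns true if calling signal() while holding the lock
    // completes normally.
    static boolean signalWithLockSucceeds() {
        ReentrantLock takeLock = new ReentrantLock();
        Condition notEmpty = takeLock.newCondition();
        takeLock.lock();
        try {
            notEmpty.signal(); // legal: the current thread owns takeLock
            return true;
        } finally {
            takeLock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("signal without lock throws: " + signalWithoutLockThrows());
        System.out.println("signal with lock succeeds: " + signalWithLockSucceeds());
    }
}
```

The same rule applies to await: a thread has to own the lock that created the condition before it can wait on it.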
When a thread calls put, offer, or a similar operation on a LinkedBlockingQueue instance, it must first acquire putLock, which ensures that only one thread at a time can operate on the tail node of the linked list. Similarly, because the condition queue inside the condition variable notFull is maintained with the lock state management mechanism of putLock, the calling thread must hold putLock before calling the await and signal methods of notFull; otherwise an IllegalMonitorStateException is thrown. notFull maintains a condition queue internally. When a thread that holds putLock calls notFull.await(), it is blocked and placed in the condition queue inside notFull, where it waits until another thread calls notFull.signal().
The following is the constructor code of LinkedBlockingQueue.
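// Defined in java.lang.Integer
public static final int MAX_VALUE = 0x7fffffff;

// Parameterless constructor: uses the largest possible capacity
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

// Constructor with a user-specified capacity
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // head and last both start at a sentinel node that holds no element
    last = head = new Node<E>(null);
}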
It can be seen from this code that the default queue capacity is 0x7fffffff (Integer.MAX_VALUE), and users can also specify the capacity themselves, so to a certain extent LinkedBlockingQueue can be said to be a bounded blocking queue.
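The capacity rules can be observed through the public API. The following is a minimal sketch; the class name CapacityDemo and its helper methods are invented for illustration, and it relies on remainingCapacity(), which for an empty queue reports the capacity passed to the constructor.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; only the LinkedBlockingQueue calls are real API.
public class CapacityDemo {

    // Capacity of an empty queue created with the parameterless constructor.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // Capacity of an empty queue created with an explicit capacity.
    static int boundedCapacity(int capacity) {
        return new LinkedBlockingQueue<String>(capacity).remainingCapacity();
    }

    // Returns true if a capacity of 0 is rejected with IllegalArgumentException.
    static boolean rejectsNonPositiveCapacity() {
        try {
            new LinkedBlockingQueue<String>(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE);
        System.out.println(boundedCapacity(16));
        System.out.println(rejectsNonPositiveCapacity());
    }
}
```

In practice, passing an explicit capacity is usually the safer choice, because with the default capacity producers that outpace consumers can keep adding elements until memory runs out.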
offer operation
public boolean offer(E e) {
    //(1)
    if (e == null) throw new NullPointerException();
    //(2)
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    //(3)
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        //(4)
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            //(5)
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        //(6)
        putLock.unlock();
    }
    //(7)
    if (c == 0)
        signalNotEmpty();
    //(8)
    return c >= 0;
}
Code (1) throws a NullPointerException if the element is null. Code (2) checks whether the queue is full; if it is, the element is discarded and false is returned.
Code (3) creates the new node and acquires putLock. Once the current thread holds the lock, other threads calling put or offer are blocked (the blocked threads are placed in the AQS blocking queue of putLock).
Code (4) rechecks whether the queue is full. This is necessary because other threads may have added elements through put or offer between the execution of code (2) and the acquisition of putLock. If the queue is indeed not full, the new element is added to the queue and the counter is incremented.
Code (5) checks whether the queue still has free space after the new element has been added. If it does, it wakes up one thread in the condition queue of notFull that was blocked by calling notFull.await() (for example, a thread that called put while the queue was full). Because the queue now has free space, a waiting producer thread can be woken up early.
Code (6) releases the acquired putLock. Note that the lock must be released in the finally block, because finally is executed even if the try block throws an exception. After the lock is released, one of the other threads blocked while trying to acquire it in put or offer will acquire the lock.
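In code (7), c == 0 means that the queue was empty before this element was added, so there is at least one element in the queue when code (6) releases the lock. In that case signalNotEmpty is called to wake up a thread that is blocked on notEmpty waiting for an element. Code (8) returns true if the element was added to the queue (c >= 0) and false otherwise.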
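The behavior walked through in steps (1) to (8) can be observed from the outside with a short experiment. The following is a sketch only: the class name OfferDemo and its helper methods are invented for illustration, and the 100 ms sleep is just an assumption that gives the consumer thread time to block in take(). The result does not depend on it.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only the LinkedBlockingQueue calls are real API.
public class OfferDemo {

    // Offers three elements to a queue of capacity 2 and returns the results.
    // The third offer finds the queue full and returns false without blocking.
    static boolean[] offerUntilFull() {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
        return new boolean[] { queue.offer(1), queue.offer(2), queue.offer(3) };
    }

    // Returns true if offering null throws NullPointerException (step 1).
    static boolean offerNullThrows() {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    // A consumer waits in take() on an empty queue; an offer from the
    // main thread hands it the element. Returns what the consumer received,
    // or null if the demo was interrupted or timed out.
    static String offerWakesConsumer() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // blocks while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            TimeUnit.MILLISECONDS.sleep(100); // let the consumer block first
            queue.offer("hello");             // queue was empty, so c == 0
            consumer.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        return received[0];
    }

    public static void main(String[] args) {
        boolean[] results = offerUntilFull();
        System.out.println(results[0] + " " + results[1] + " " + results[2]);
        System.out.println(offerNullThrows());
        System.out.println(offerWakesConsumer());
    }
}
```

Because offer never blocks, callers have to check its return value. A producer that needs to wait for free space would use put instead.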