Home >Java >javaTutorial >LinkedBlockingQueue for concurrent packet blocking queue
##jdk1.7.0_79
## Briefly mentioned in the above "10.Concurrent Packet Blocking QueueArrayBlockingQueue" Parsed part of the source code of ArrayBlockingQueue, and also introduces in this article the blocking queue in Java concurrent packageLinkedBlockingQueue. ArrayBlockingQueueThe queue is implemented by an array, while the LinkedBlockingQueue queue is implemented by a linked list (one-way linked list), so in LinkedBlockingQueueThere is a Node internal class to represent the nodes of the linked list. ## ##static final class Node<E> {
E item;//入队元素 Node<E> next;//指向后继节点 Node(E x) {
item = x;
}
}
Similarly it also has 3
ArrayBlockingQueue is slightly different. 在第12行中获取锁是为了保证可见性,这个的原因我认为是,线程T1是实例化LinkedBlockingQueue对象,T2是对实例化的LinkedBlockingQueue对象做入队操作(当然要保证T1和T2的执行顺序),如果不对它进行加锁操作(加锁会保证其可见性,也就是写回主存),T1的集合c有可能只存在T1线程维护的缓存中,并没有写回主存,T2中实例化的LinkedBlockingQueue维护的缓存以及主存中并没有集合c,此时就因为可见性造成数据不一致的情况,引发线程安全问题。 在了解完LinkedBlockingQueue的构造方法后,我们回过头来看LinkedBlockingQueue的两个成员变量: 可见LinkedBlockingQueue中有两个锁,一个是为了锁住入队操作,一个是为了锁住出队操作。而在ArrayBlockingQueue中则只有一个锁,同时锁住队列的入队、出队操作。 这两个成员变量则是线程等待队列,一个是出队锁上的等待队列,一个是入队锁上的等待队列。在ArrayBlockingQueue也有两个等待队列,一个是非空等待队列,另一个则是非满等待队列,在这一点上两者一致。 #throw an exception Return value (non-blocking)
; when the queue is full, it throws IllegalStateException(“Queue full”)Exception——AbstractQueue false. Non-blocking returns immediately. LinkedBlockingQueue中并没有像ArrayBlockingQueue那样重写了AbstractQueue的add方法而直接调用父类的add方法,所以LinkedBlockingQueue#add方法与ArrayBlockingQueue#add一样,都是直接调用其AbstractQueue。 在第10行是获取插入锁,和ArrayBlockingQueue只有一个锁不同的是,LinkedBlockingQueue分为入队锁和出队锁,也就是说对于ArrayBlockingQueue同时只能有一个线程对它进行入队或者出队操作,而对于LinkedBlockingQueue来说同时能有两个线程对队列进行入队或者出队操作。 前两个add和offer方法都是非阻塞的,对于put方法则是阻塞的,线程会一直阻塞直到线程非空或者非满,但是它在阻塞时能被线程中断返回。 队列插入的最后一个方法来看上面出现的enqueue入队方法。 抛出异常 返回值(非阻塞) 一定时间内返回值 返回值(阻塞) remove()//队列不为空时,返回队首值并移除;队列为空时抛出NoSuchElementException()异常——AbstractQueue poll()//队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。 poll(time, unit)//设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值 take(e)//队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。 前两个remove和poll方法都是非阻塞的,对于take方法则是阻塞的,线程会一直阻塞直到线程非空或者非满,但是它在阻塞时能被线程中断返回。 队列出队的最后一个方法来看上面出现的dequeue入队方法。 最后一个方法size。 1 public LinkedBlockingQueue() {
2 this(Integer.MAX_VALUE)//默认构造容量为int型的最大值队列 3 }
4 public LinkedBlockingQueue(int capacity) {
5 if (capacity <= o) throw new IllegalArgumentException();
6 this.capacity = capacity;
7 last = head = new Node<E>(null);//头指针和尾指针指向头节点(null) 8 }
9 public LinkedBlockingQueue(Collection<? extends E> c ) {
10 this(Integer.MAX_VALUE);
11 final ReentrantLock putLock = this.putLock;
12 putLock.lock();//这里和ArrayBlockingQueue也会获取锁,但它同样不是为了互斥操作,同样也是为了保证其可见性。 13 try {
14 int n = 0;
15 for (E e : c) {
16 if (e == null)
17 throw new NullPointerException();
18 if (n == capacity)
19 throw new IllegalStateException("Queue full");
20 enqueue(new Node<E>(e));//入队 21 ++n;
22 }
23 count.set(n);
24 } finally {
25 putLock.unlock();
26 }
27 }
private final ReentrantLock takeLock = new ReentrantLock();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final Condition notFull = putLock.newCondition();
队列元素的插入
##Insert
add(e)// #offer( e) //When the queue is not full, return Set the waiting time. If data cannot be inserted into the queue within the specified time, false
//AbstractQueue#add,这是一个模板方法,只定义add入队算法骨架,成功时返回true,失败时抛出IllegalStateException异常,具体offer实现交给子类实现。 public boolean add(E e) {
if (offer(e))//offer方法由Queue接口定义 return true;
else throw new IllegalStateException();
}
1 //LinkedBlockingQueue#offer 2 public boolean offer(E e) {
3 if (e == null) throw new NullPointerException();
4 final AtomicInteger count = this.count;//原子型int变量,线程安全,指向队列数据量引用 5 if (count.get() == capacity) //当数据量等于队列容量时,无法入队,返回false 6 return false;
7 int c = -1;
8 Node<E> node = new Node(e);
9 final ReentrantLock putLock = this.putLock;//插入锁 10 putLock.lock();//获得插入锁 11 try {
12 if (count.get() < capacity) {
13 enqueuer(node);//入队 14 c = count.getAndIncrement();//队列数据总数自增+1后返回 15 if (c + 1 < capacity)
16 notFull.signal();//唤醒非满等待队列上的线程 17 }
18 } finally {
19 putLock.unlock();
20 }
21 if (c == 0)
22 signalNotEmpty();//队列中刚好有一个数据,唤醒非空等待队列 23 return c >= 0
24 }
//LinkedBlockingQueue#put public void put(E e) throws InterruptedException {
if (e == null) throws new NullPointerException();
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterrupted();//能被线程中断地获取锁 try {
while (count.get() == capacity) {//队列数据量等于队列容量 notFull.await();//休眠非满等待队列上的线程 }
enqueuer(node);//入队 c = count.getAndIncrement();//队列数据总数自增+1后返回 if (c + 1 < capacity)//还没有达到队列容量 notFull.signal();//唤醒非满等待队列上的线程 } finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();//唤醒非空等待队列上的线程 }
private void enqueuer(Node<E> node) {
last = last.next = node;//将LinkedBlockingQueue中指向队尾的last.next指向新加入的node节点 }
队列元素的删除
//AbstractQueue#remove,同样这也是一个模板方法,定义删除队列元素的算法骨架,具体实现由子类来实现poll方法 public E remove() {
E x = poll();//poll方法由Queue接口定义 if (x != null)
return x;
else throw new NoSuchElementException();
}
AtomicInteger count = (count.get() == 0 = c = -1 ReentrantLock takeLock = (count.get() > 0) { x = dequeuer(); c = count.getAndDecrement(); ( c > 1 (c ==
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
take.lockInterruptibly();//可被线程中断返回地获取锁 try {
while (count.get() == 0) {//队列数据为空 notEmpty.await();//休眠非空等待队列上的线程 }
x = dequeuer();//此时非空等待队列上的线程被唤醒,队列数据不为空,出队 c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();//唤醒非空等待队列上的线程 } finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();//唤醒非满等待队列 return x;
}
private E dequeue() {
Node<E> h = head;//头节点,为空 Node<E> first = h.next;
h.next = h;//此时没有节点指向头节点,便于GC head = first;
E x = first.item;
first.item = null;
return x;
}
public int size() {
return count.get();//和ArrayBlockingQueue类似,与ConcurrentLinkedQueue不同,没有遍历整个队列,而是直接返回count变量。此处的count是AtomicInteger变量。 }
The above is the detailed content of LinkedBlockingQueue for concurrent packet blocking queue. For more information, please follow other related articles on the PHP Chinese website!