jdk1.7.0_79
"10.동시 패킷 차단 대기열" ArrayBlockingQueue"을 간략하게 분석했습니다 ArrayBlockingQueue 이 문서에도 소개된 소스 코드의 일부는 Java동시성 패키지의 차단 대기열LinkedBlockingQueue입니다. ArrayBlockingQueue큐는 배열로 구현되는 반면 LinkedBlockingQueue큐는 연결 목록(단방향 연결 목록)으로 구현되므로 LinkedBlockingQueue에 Node내부 클래스가 있습니다. 연결된 목록 노드를 나타냅니다. ㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋ ㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋ 또한 구성 방법이 있는데, ArrayBlockingQueue과 약간 다릅니다. 在第12行中获取锁是为了保证可见性,这个的原因我认为是,线程T1是实例化LinkedBlockingQueue对象,T2是对实例化的LinkedBlockingQueue对象做入队操作(当然要保证T1和T2的执行顺序),如果不对它进行加锁操作(加锁会保证其可见性,也就是写回主存),T1的集合c有可能只存在T1线程维护的缓存中,并没有写回主存,T2中实例化的LinkedBlockingQueue维护的缓存以及主存中并没有集合c,此时就因为可见性造成数据不一致的情况,引发线程安全问题。 在了解完LinkedBlockingQueue的构造方法后,我们回过头来看LinkedBlockingQueue的两个成员变量: 可见LinkedBlockingQueue中有两个锁,一个是为了锁住入队操作,一个是为了锁住出队操作。而在ArrayBlockingQueue中则只有一个锁,同时锁住队列的入队、出队操作。 这两个成员变量则是线程等待队列,一个是出队锁上的等待队列,一个是入队锁上的等待队列。在ArrayBlockingQueue也有两个等待队列,一个是非空等待队列,另一个则是非满等待队列,在这一点上两者一致。 예외 던지기 반환 값(비차단) 특정 기간 내 반환 값 반환 값(차단) 삽입 offer(e) // 가득 차지 않으면 true를 반환하고, 대기열이 가득 차면 false를 반환합니다. 비차단은 즉시 반환됩니다. offer(e, time, unit)//지정된 시간 내에 큐에 데이터를 삽입할 수 없는 경우 false를 반환합니다. , 성공적으로 삽입되면 true가 반환됩니다. put(e)//큐가 가득 차지 않으면 직접 삽입에는 반환 값이 없습니다. 다시 삽입하기 전에 대기열이 가득 차지 않았습니다. LinkedBlockingQueue中并没有像ArrayBlockingQueue那样重写了AbstractQueue的add方法而直接调用父类的add方法,所以LinkedBlockingQueue#add方法与ArrayBlockingQueue#add一样,都是直接调用其AbstractQueue。 在第10行是获取插入锁,和ArrayBlockingQueue只有一个锁不同的是,LinkedBlockingQueue分为入队锁和出队锁,也就是说对于ArrayBlockingQueue同时只能有一个线程对它进行入队或者出队操作,而对于LinkedBlockingQueue来说同时能有两个线程对队列进行入队或者出队操作。 前两个add和offer方法都是非阻塞的,对于put方法则是阻塞的,线程会一直阻塞直到线程非空或者非满,但是它在阻塞时能被线程中断返回。 队列插入的最后一个方法来看上面出现的enqueue入队方法。 抛出异常 返回值(非阻塞) 一定时间内返回值 返回值(阻塞) remove()//队列不为空时,返回队首值并移除;队列为空时抛出NoSuchElementException()异常——AbstractQueue poll()//队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。 poll(time, unit)//设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值 take(e)//队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。 前两个remove和poll方法都是非阻塞的,对于take方法则是阻塞的,线程会一直阻塞直到线程非空或者非满,但是它在阻塞时能被线程中断返回。 队列出队的最后一个方法来看上面出现的dequeue入队方法。 最后一个方法size。static final class Node<E> {
E item;//入队元素 Node<E> next;//指向后继节点 Node(E x) {
item = x;
}
}
private final ReentrantLock takeLock = new ReentrantLock();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final Condition notFull = putLock.newCondition();
队列元素的插入
/Add(e) // 대기열이 가득 차지 않으면
//AbstractQueue#add,这是一个模板方法,只定义add入队算法骨架,成功时返回true,失败时抛出IllegalStateException异常,具体offer实现交给子类实现。 public boolean add(E e) {
if (offer(e))//offer方法由Queue接口定义 return true;
else throw new IllegalStateException();
}
1 //LinkedBlockingQueue#offer 2 public boolean offer(E e) {
3 if (e == null) throw new NullPointerException();
4 final AtomicInteger count = this.count;//原子型int变量,线程安全,指向队列数据量引用 5 if (count.get() == capacity) //当数据量等于队列容量时,无法入队,返回false 6 return false;
7 int c = -1;
8 Node<E> node = new Node(e);
9 final ReentrantLock putLock = this.putLock;//插入锁 10 putLock.lock();//获得插入锁 11 try {
12 if (count.get() < capacity) {
13 enqueuer(node);//入队 14 c = count.getAndIncrement();//队列数据总数自增+1后返回 15 if (c + 1 < capacity)
16 notFull.signal();//唤醒非满等待队列上的线程 17 }
18 } finally {
19 putLock.unlock();
20 }
21 if (c == 0)
22 signalNotEmpty();//队列中刚好有一个数据,唤醒非空等待队列 23 return c >= 0
24 }
//LinkedBlockingQueue#put public void put(E e) throws InterruptedException {
if (e == null) throws new NullPointerException();
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterrupted();//能被线程中断地获取锁 try {
while (count.get() == capacity) {//队列数据量等于队列容量 notFull.await();//休眠非满等待队列上的线程 }
enqueuer(node);//入队 c = count.getAndIncrement();//队列数据总数自增+1后返回 if (c + 1 < capacity)//还没有达到队列容量 notFull.signal();//唤醒非满等待队列上的线程 } finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();//唤醒非空等待队列上的线程 }
private void enqueuer(Node<E> node) {
last = last.next = node;//将LinkedBlockingQueue中指向队尾的last.next指向新加入的node节点 }
队列元素的删除
//AbstractQueue#remove,同样这也是一个模板方法,定义删除队列元素的算法骨架,具体实现由子类来实现poll方法 public E remove() {
E x = poll();//poll方法由Queue接口定义 if (x != null)
return x;
else throw new NoSuchElementException();
}
AtomicInteger count = (count.get() == 0 = c = -1 ReentrantLock takeLock = (count.get() > 0) { x = dequeuer(); c = count.getAndDecrement(); ( c > 1 (c ==
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
take.lockInterruptibly();//可被线程中断返回地获取锁 try {
while (count.get() == 0) {//队列数据为空 notEmpty.await();//休眠非空等待队列上的线程 }
x = dequeuer();//此时非空等待队列上的线程被唤醒,队列数据不为空,出队 c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();//唤醒非空等待队列上的线程 } finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();//唤醒非满等待队列 return x;
}
private E dequeue() {
Node<E> h = head;//头节点,为空 Node<E> first = h.next;
h.next = h;//此时没有节点指向头节点,便于GC head = first;
E x = first.item;
first.item = null;
return x;
}
public int size() {
return count.get();//和ArrayBlockingQueue类似,与ConcurrentLinkedQueue不同,没有遍历整个队列,而是直接返回count变量。此处的count是AtomicInteger变量。 }
위 내용은 동시 패킷 차단 대기열을 위한 LinkedBlockingQueue의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!