>  기사  >  Java  >  Java 컬렉션 프레임워크 arrayblockingqueue 애플리케이션 분석

Java 컬렉션 프레임워크 arrayblockingqueue 애플리케이션 분석

高洛峰
高洛峰원래의
2017-01-23 10:30:511555검색


---------
1.ArrayDeque, (배열 이중 종료 큐)
2.PriorityQueue, ( 우선 순위 큐)
3.ConcurrentLinkedQueue, (연결된 목록 기반 동시 큐)
4.DelayQueue, (지연된 차단 큐) (차단 큐는 BlockingQueue 인터페이스를 구현함)
5.ArrayBlockingQueue, (동시 차단 큐 기반) on array)
6.LinkedBlockingQueue, (연결된 목록 기반 FIFO 차단 큐)
7.LinkedBlockingDeque, (연결된 목록 기반 FIFO 이중 종료 차단 큐)
8.PriorityBlockingQueue, (제한되지 않은 차단 큐) 우선 순위)
9.SynchronousQueue (동시 동기화 차단 큐)
----------------------------------- ---- ---------
ArrayBlockingQueue
는 어레이가 지원하는 제한된 차단 대기열입니다. 이 대기열은 FIFO(선입선출) 방식으로 요소를 정렬합니다. 대기열의 헤드는 대기열에 가장 오랫동안 있었던 요소입니다. 대기열의 꼬리는 가장 짧은 시간 동안 대기열에 있었던 요소입니다. 새 요소는 큐의 꼬리에 삽입되고 큐 검색 작업은 큐의 헤드부터 시작됩니다.
이것은 생산자가 삽입한 요소와 소비자가 추출한 요소를 보유하는 고정 크기 배열인 일반적인 "제한된 버퍼"입니다. 이러한 버퍼가 생성되면 용량을 늘릴 수 없습니다. 요소를 가득 찬 대기열에 넣으려고 하면 작업이 차단됩니다. 빈 대기열에서 요소를 추출하려고 하면 유사한 차단이 발생합니다.
이 클래스는 대기 중인 생산자 및 소비자 스레드를 주문하기 위한 선택적 공정성 전략을 지원합니다. 기본적으로 이 순서는 보장되지 않습니다. 그러나 공정성을 true로 설정하여 생성된 대기열은 FIFO 순서로 스레드에 대한 액세스를 허용합니다. 공정성은 일반적으로 처리량을 줄이지만 변동성을 줄이고 "불균형"을 방지합니다.

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { 
/** 队列元素 数组 */ 
private final E[] items; 
/** 获取、删除元素时的索引(take, poll 或 remove操作) */ 
private int takeIndex; 
/** 添加元素时的索引(put, offer或 add操作) */ 
private int putIndex; 
/** 队列元素的数目*/ 
private int count; 
/** 锁 */ 
private final ReentrantLock lock; 
/** 获取操作时的条件 */ 
private final Condition notEmpty; 
/** 插入操作时的条件 */ 
private final Condition notFull; 
//超出数组长度时,重设为0 
final int inc(int i) { 
return (++i == items.length)? 0 : i; 
} 
/** 
* 插入元素(在获得锁的情况下才调用) 
*/ 
private void insert(E x) { 
items[putIndex] = x; 
putIndex = inc(putIndex); 
++count; 
notEmpty.signal(); 
} 
/** 
* 获取并移除元素(在获得锁的情况下才调用) 
*/ 
private E extract() { 
final E[] items = this.items; 
E x = items[takeIndex]; 
items[takeIndex] = null; 
takeIndex = inc(takeIndex);//移到下一个位置 
--count; 
notFull.signal(); 
return x; 
} 
/** 
* 删除i位置的元素 
*/ 
void removeAt(int i) { 
final E[] items = this.items; 
// if removing front item, just advance 
if (i == takeIndex) { 
items[takeIndex] = null; 
takeIndex = inc(takeIndex); 
} else { 
// 把i后面的直到putIndex的元素都向前移动一个位置 
for (;;) { 
int nexti = inc(i); 
if (nexti != putIndex) { 
items[i] = items[nexti]; 
i = nexti; 
} else { 
items[i] = null; 
putIndex = i; 
break; 
} 
} 
} 
--count; 
notFull.signal(); 
} 
/** 
*构造方法,指定容量,默认策略(不是按照FIFO的顺序访问) 
*/ 
public ArrayBlockingQueue(int capacity) { 
this(capacity, false); 
} 
/** 
*构造方法,指定容量及策略 
*/ 
public ArrayBlockingQueue(int capacity, boolean fair) { 
if (capacity <= 0) 
throw new IllegalArgumentException(); 
this.items = (E[]) new Object[capacity]; 
lock = new ReentrantLock(fair); 
notEmpty = lock.newCondition(); 
notFull = lock.newCondition(); 
} 
/** 
* 通过集合构造 
*/ 
public ArrayBlockingQueue(int capacity, boolean fair, 
Collection<? extends E> c) { 
this(capacity, fair); 
if (capacity < c.size()) 
throw new IllegalArgumentException(); 
for (Iterator<? extends E> it = c.iterator(); it.hasNext();) 
add(it.next()); 
} 
/** 
* 插入元素到队尾(super调用offer方法) 
* public boolean add(E e) { 
* if (offer(e)) 
* return true; 
* else 
* throw new IllegalStateException("Queue full"); 
* } 
* 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量), 
* 在成功时返回 true,如果此队列已满,则抛出 IllegalStateException。 
*/ 
public boolean add(E e) { 
return super.add(e); 
} 
/** 
* 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量), 
* 在成功时返回 true,如果此队列已满,则返回 false。 
*/ 
public boolean offer(E e) { 
if (e == null) throw new NullPointerException(); 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
if (count == items.length) 
return false; 
else { 
insert(e); 
return true; 
} 
} finally { 
lock.unlock(); 
} 
} 
/** 
* 将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。 
*/ 
public void put(E e) throws InterruptedException { 
if (e == null) throw new NullPointerException(); 
final E[] items = this.items; 
final ReentrantLock lock = this.lock; 
lock.lockInterruptibly(); 
try { 
try { 
while (count == items.length) 
notFull.await(); 
} catch (InterruptedException ie) { 
notFull.signal(); // propagate to non-interrupted thread 
throw ie; 
} 
insert(e); 
} finally { 
lock.unlock(); 
} 
} 
/** 
* 将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间。 
*/ 
public boolean offer(E e, long timeout, TimeUnit unit) 
throws InterruptedException { 
if (e == null) throw new NullPointerException(); 
long nanos = unit.toNanos(timeout); 
final ReentrantLock lock = this.lock; 
lock.lockInterruptibly(); 
try { 
for (;;) { 
if (count != items.length) { 
insert(e); 
return true; 
} 
if (nanos <= 0)//如果时间到了就返回 
return false; 
try { 
nanos = notFull.awaitNanos(nanos); 
} catch (InterruptedException ie) { 
notFull.signal(); // propagate to non-interrupted thread 
throw ie; 
} 
} 
} finally { 
lock.unlock(); 
} 
} 
//获取并移除此队列的头,如果此队列为空,则返回 null。 
public E poll() { 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
if (count == 0) 
return null; 
E x = extract(); 
return x; 
} finally { 
lock.unlock(); 
} 
} 
//获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。 
public E take() throws InterruptedException { 
final ReentrantLock lock = this.lock; 
lock.lockInterruptibly(); 
try { 
try { 
while (count == 0) 
notEmpty.await(); 
} catch (InterruptedException ie) { 
notEmpty.signal(); // propagate to non-interrupted thread 
throw ie; 
} 
E x = extract(); 
return x; 
} finally { 
lock.unlock(); 
} 
} 
//获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。 
public E poll(long timeout, TimeUnit unit) throws InterruptedException { 
long nanos = unit.toNanos(timeout); 
final ReentrantLock lock = this.lock; 
lock.lockInterruptibly(); 
try { 
for (;;) { 
if (count != 0) { 
E x = extract(); 
return x; 
} 
if (nanos <= 0) 
return null; 
try { 
nanos = notEmpty.awaitNanos(nanos); 
} catch (InterruptedException ie) { 
notEmpty.signal(); // propagate to non-interrupted thread 
throw ie; 
} 
} 
} finally { 
lock.unlock(); 
} 
} 
//获取但不移除此队列的头;如果此队列为空,则返回 null。 
public E peek() { 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
return (count == 0) ? null : items[takeIndex]; 
} finally { 
lock.unlock(); 
} 
} 
/** 
* 返回此队列中元素的数量。 
*/ 
public int size() { 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
return count; 
} finally { 
lock.unlock(); 
} 
} 
/** 
*返回在无阻塞的理想情况下(不存在内存或资源约束)此队列能接受的其他元素数量。 
*/ 
public int remainingCapacity() { 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
return items.length - count; 
} finally { 
lock.unlock(); 
} 
} 
/** 
* 从此队列中移除指定元素的单个实例(如果存在)。 
*/ 
public boolean remove(Object o) { 
if (o == null) return false; 
final E[] items = this.items; 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
int i = takeIndex; 
int k = 0; 
for (;;) { 
if (k++ >= count) 
return false; 
if (o.equals(items[i])) { 
removeAt(i); 
return true; 
} 
i = inc(i); 
} 
} finally { 
lock.unlock(); 
} 
} 
/** 
* 如果此队列包含指定的元素,则返回 true。 
*/ 
public boolean contains(Object o) { 
if (o == null) return false; 
final E[] items = this.items; 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
int i = takeIndex; 
int k = 0; 
while (k++ < count) { 
if (o.equals(items[i])) 
return true; 
i = inc(i); 
} 
return false; 
} finally { 
lock.unlock(); 
} 
} 
…… 
}

Java 컬렉션 프레임워크 arrayblockingqueue 애플리케이션 분석과 관련된 더 많은 기사를 보려면 PHP 중국어 웹사이트를 주목하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.