ホームページ  >  記事  >  Java  >  Java コレクション フレームワーク arrayblockingqueue アプリケーション分析

Java コレクション フレームワーク arrayblockingqueue アプリケーション分析

高洛峰
高洛峰オリジナル
2017-01-23 10:30:511481ブラウズ

Queue
---------------------
1.ArrayDeque、(配列両端キュー)
2.PriorityQueue、(優先キュー)
3.ConcurrentLinkedQueue、 (リンク リストに基づく同時キュー)
4 .DelayQueue、(遅延ブロッキング キュー) (ブロッキング キューは BlockingQueue インターフェイスを実装します)
5.ArrayBlockingQueue、(配列ベースの同時ブロッキング キュー)
6.LinkedBlockingQueue、(リンク リスト ベースの FIFO)ブロッキング キュー)
7.LinkedBlockingDeque、(リンク リストに基づく) FIFO 両端ブロッキング キュー)
8.PriorityBlockingQueue、(優先度のある無制限のブロッキング キュー)
9.SynchronousQueue (同時同期ブロッキング キュー)
------ ----------- ----------------------------------
ArrayBlockingQueue
は配列によってバックアップされた境界付きブロッキング キュー。このキューは、要素を FIFO (先入れ先出し) ベースで並べ替えます。キューの先頭は、キュー内に最も長く存在する要素です。キューの末尾は、キュー内に存在している時間が最も短い要素です。新しい要素はキューの末尾に挿入され、キューの取得操作はキューの先頭から開始されます。
これは典型的な「境界バッファ」であり、プロデューサーによって挿入された要素とコンシューマーによって抽出された要素を保持する固定サイズの配列です。このようなバッファが作成されると、その容量を増やすことはできません。満杯のキューに要素を入れようとすると操作がブロックされ、空のキューから要素を抽出しようとすると同様のブロックが発生します。
このクラスは、待機中のプロデューサー スレッドとコンシューマー スレッドを順序付けるためのオプションの公平性戦略をサポートします。デフォルトでは、この順序は保証されません。ただし、公平性を true に設定して構築されたキューでは、FIFO 順序でスレッドにアクセスできます。公平性は一般にスループットを低下させますが、変動性も低下させ、「不均衡」を回避します。

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { 
/** 队列元素 数组 */ 
private final E[] items; 
/** 获取、删除元素时的索引(take, poll 或 remove操作) */ 
private int takeIndex; 
/** 添加元素时的索引(put, offer或 add操作) */ 
private int putIndex; 
/** 队列元素的数目*/ 
private int count; 
/** 锁 */ 
private final ReentrantLock lock; 
/** 获取操作时的条件 */ 
private final Condition notEmpty; 
/** 插入操作时的条件 */ 
private final Condition notFull; 
//超出数组长度时,重设为0 
final int inc(int i) { 
return (++i == items.length)? 0 : i; 
} 
/** 
* 插入元素(在获得锁的情况下才调用) 
*/ 
private void insert(E x) { 
items[putIndex] = x; 
putIndex = inc(putIndex); 
++count; 
notEmpty.signal(); 
} 
/** 
* 获取并移除元素(在获得锁的情况下才调用) 
*/ 
private E extract() { 
final E[] items = this.items; 
E x = items[takeIndex]; 
items[takeIndex] = null; 
takeIndex = inc(takeIndex);//移到下一个位置 
--count; 
notFull.signal(); 
return x; 
} 
/** 
* 删除i位置的元素 
*/ 
void removeAt(int i) { 
final E[] items = this.items; 
// if removing front item, just advance 
if (i == takeIndex) { 
items[takeIndex] = null; 
takeIndex = inc(takeIndex); 
} else { 
// 把i后面的直到putIndex的元素都向前移动一个位置 
for (;;) { 
int nexti = inc(i); 
if (nexti != putIndex) { 
items[i] = items[nexti]; 
i = nexti; 
} else { 
items[i] = null; 
putIndex = i; 
break; 
} 
} 
} 
--count; 
notFull.signal(); 
} 
/** 
*构造方法,指定容量,默认策略(不是按照FIFO的顺序访问) 
*/ 
public ArrayBlockingQueue(int capacity) { 
this(capacity, false); 
} 
/** 
*构造方法,指定容量及策略 
*/ 
public ArrayBlockingQueue(int capacity, boolean fair) { 
if (capacity <= 0) 
throw new IllegalArgumentException(); 
this.items = (E[]) new Object[capacity]; 
lock = new ReentrantLock(fair); 
notEmpty = lock.newCondition(); 
notFull = lock.newCondition(); 
} 
/** 
* 通过集合构造 
*/ 
public ArrayBlockingQueue(int capacity, boolean fair, 
Collection<? extends E> c) { 
this(capacity, fair); 
if (capacity < c.size()) 
throw new IllegalArgumentException(); 
for (Iterator<? extends E> it = c.iterator(); it.hasNext();) 
add(it.next()); 
} 
/** 
* 插入元素到队尾(super调用offer方法) 
* public boolean add(E e) { 
* if (offer(e)) 
* return true; 
* else 
* throw new IllegalStateException("Queue full"); 
* } 
* 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量), 
* 在成功时返回 true,如果此队列已满,则抛出 IllegalStateException。 
*/ 
public boolean add(E e) { 
return super.add(e); 
} 
/** 
* 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量), 
* 在成功时返回 true,如果此队列已满,则返回 false。 
*/ 
public boolean offer(E e) { 
if (e == null) throw new NullPointerException(); 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
if (count == items.length) 
return false; 
else { 
insert(e); 
return true; 
} 
} finally { 
lock.unlock(); 
} 
} 
/** 
* 将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。 
*/ 
public void put(E e) throws InterruptedException { 
if (e == null) throw new NullPointerException(); 
final E[] items = this.items; 
final ReentrantLock lock = this.lock; 
lock.lockInterruptibly(); 
try { 
try { 
while (count == items.length) 
notFull.await(); 
} catch (InterruptedException ie) { 
notFull.signal(); // propagate to non-interrupted thread 
throw ie; 
} 
insert(e); 
} finally { 
lock.unlock(); 
} 
} 
/** 
* 将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间。 
*/ 
public boolean offer(E e, long timeout, TimeUnit unit) 
throws InterruptedException { 
if (e == null) throw new NullPointerException(); 
long nanos = unit.toNanos(timeout); 
final ReentrantLock lock = this.lock; 
lock.lockInterruptibly(); 
try { 
for (;;) { 
if (count != items.length) { 
insert(e); 
return true; 
} 
if (nanos <= 0)//如果时间到了就返回 
return false; 
try { 
nanos = notFull.awaitNanos(nanos); 
} catch (InterruptedException ie) { 
notFull.signal(); // propagate to non-interrupted thread 
throw ie; 
} 
} 
} finally { 
lock.unlock(); 
} 
} 
//获取并移除此队列的头,如果此队列为空,则返回 null。 
public E poll() { 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
if (count == 0) 
return null; 
E x = extract(); 
return x; 
} finally { 
lock.unlock(); 
} 
} 
//获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。 
public E take() throws InterruptedException { 
final ReentrantLock lock = this.lock; 
lock.lockInterruptibly(); 
try { 
try { 
while (count == 0) 
notEmpty.await(); 
} catch (InterruptedException ie) { 
notEmpty.signal(); // propagate to non-interrupted thread 
throw ie; 
} 
E x = extract(); 
return x; 
} finally { 
lock.unlock(); 
} 
} 
//获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。 
public E poll(long timeout, TimeUnit unit) throws InterruptedException { 
long nanos = unit.toNanos(timeout); 
final ReentrantLock lock = this.lock; 
lock.lockInterruptibly(); 
try { 
for (;;) { 
if (count != 0) { 
E x = extract(); 
return x; 
} 
if (nanos <= 0) 
return null; 
try { 
nanos = notEmpty.awaitNanos(nanos); 
} catch (InterruptedException ie) { 
notEmpty.signal(); // propagate to non-interrupted thread 
throw ie; 
} 
} 
} finally { 
lock.unlock(); 
} 
} 
//获取但不移除此队列的头;如果此队列为空,则返回 null。 
public E peek() { 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
return (count == 0) ? null : items[takeIndex]; 
} finally { 
lock.unlock(); 
} 
} 
/** 
* 返回此队列中元素的数量。 
*/ 
public int size() { 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
return count; 
} finally { 
lock.unlock(); 
} 
} 
/** 
*返回在无阻塞的理想情况下(不存在内存或资源约束)此队列能接受的其他元素数量。 
*/ 
public int remainingCapacity() { 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
return items.length - count; 
} finally { 
lock.unlock(); 
} 
} 
/** 
* 从此队列中移除指定元素的单个实例(如果存在)。 
*/ 
public boolean remove(Object o) { 
if (o == null) return false; 
final E[] items = this.items; 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
int i = takeIndex; 
int k = 0; 
for (;;) { 
if (k++ >= count) 
return false; 
if (o.equals(items[i])) { 
removeAt(i); 
return true; 
} 
i = inc(i); 
} 
} finally { 
lock.unlock(); 
} 
} 
/** 
* 如果此队列包含指定的元素,则返回 true。 
*/ 
public boolean contains(Object o) { 
if (o == null) return false; 
final E[] items = this.items; 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
int i = takeIndex; 
int k = 0; 
while (k++ < count) { 
if (o.equals(items[i])) 
return true; 
i = inc(i); 
} 
return false; 
} finally { 
lock.unlock(); 
} 
} 
…… 
}

Java コレクション フレームワーク arrayblockingqueue アプリケーション分析に関連するその他の記事については、PHP 中国語 Web サイトに注目してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。