ホームページ >Java >&#&チュートリアル >同時パケット ブロック キューの ArrayBlockingQueue
前のセクションでは、ノンブロッキング キュー
ConcurrentLinkedQueue のエンキューとデキューを説明しました。同時実行パッケージが実行されました 簡単な分析、この記事では、同時実行パッケージのブロッキング キューについて簡単に分析します。 Java
同時実行パッケージには合計7のブロックキューがあり、もちろんそれらはすべてスレッドセーフです。
: 配列構造で構成される境界付きブロッキングキュー。
LinkedBlockingQueuePriorityBlockingQueue
DealyQueue
SynchronousQueue
LinkedTransferQueue
LinkedBlockingDeque
The Art of Concurrent Programming』より抜粋)
この記事では、ArrayBlockingQueueブロッキングキューについて簡単に分析します
ArrayLinkedQueueについては、過去を振り返るとReentrantLockによって安全性が保証されていますReentrantLockの解析については『5.Lockインターフェースとその実装』を参照してください。リエントラントロック
」でも以下に適宜言及しておきます。 まずコンストラクターを見てみましょう: 構築メソッド パブリック ArrayBlockingQueue(int容量) 指定されたサイズの有界キューを構築します public ArrayBlockingQueue(intcapacity, booleanfair) の有界キューを構築します指定されたサイズ、公平または不公平なロックとして指定します public ArrayBlockingQueue(intcapacity booleanfair 、コレクション 指定されたサイズの有界キューを構築、公平または不公平なロックとして指定、追加するように指定コレクション 在这里有几个ArrayBlockingQueue成员变量。items即队列的数组引用,putIndex表示等待插入的数组下标位置。当items[putIndex] = x将新元素插入队列中后,调用inc将数组下标向后移动,如果队列满则将putIndex置为0: 接着解析下put方法,阻塞插入队列,当队列满时不会返回false,也不会抛出异常,而是一直阻塞等待,直到有空位可插入,但它可被中断返回。 抛出异常 返回值(非阻塞) 一定时间内返回值 返回值(阻塞) remove()//队列不为空时,返回队首值并移除;队列为空时抛出NoSuchElementException()异常——AbstractQueue poll()//队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。 poll(time, unit)//设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值 take(e)//队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。 对比add和offer方法,理解了上两个方法后remove和poll实际不难理解,同理在理解了put阻塞插入队列后,对比take阻塞删除队列元素同样也很好理解。 最后一个方法size。 可以看到ArrayBlockingQueue队列的size方法,是直接返回的count变量,它不像ConcurrentLinkedQueue,ConcurrentLinkedQueue的size则是每次会遍历这个队列,故ArrayBlockingQueue的size方法比ConcurrentLinkedQueue的size方法效率高。而且ConcurrentLinkedQueue的size方法并没有加锁!也就是说很有可能其size并不准确,这在它的注释中说明了ConcurrentLinkedQueue的size并没有多大的用处。
1 public ArrayBlockingQueue(int capacity) {
2 this(capacity, false);//默认构造非公平锁的阻塞队列 3 }
4 public ArrayBlockingQueue(int capacity, boolean fair) {
5 if (capacity <= 0)
6 throw new IllegalArgumentException();
7 this.items = new Object[capacity];
8 lock = new ReentrantLock(fair);//初始化ReentrantLock重入锁,出队入队拥有这同一个锁 9 notEmpty = lock.newCondition;//初始化非空等待队列,有关Condition可参考《6.类似Object监视器方法的Condition接口》10 notFull = lock.newCondition;//初始化非满等待队列 11 }
12 public ArrayBlockingQueue(int capacity, boolean fair, Collecation<? extends E> c) {
13 this(capacity, fair);
14 final ReentrantLock lock = this.lock;
15 lock.lock();//注意在这个地方需要获得锁,这为什么需要获取锁的操作呢? 16 try {
17 int i = 0;
18 try {
19 for (E e : c) {
20 checkNotNull(e);
21 item[i++] = e;//将集合添加进数组构成的队列中 22 }
23 } catch (ArrayIndexOutOfBoundsException ex) {
24 throw new IllegalArgumentException();
25 }
26 count = i;//队列中的实际数据数量 27 putIndex = (i == capacity) ? 0 : i;
28 } finally {
29 lock.unlock();
30 }
31 }</span></span></span></span></span></span></span></span></span></p></div>
<div class="OutlineElement Ltr SCX196234819">
<p class="Paragraph SCX196234819"><span class="TextRun SCX196234819"> 15 行目<span class="TextRun SCX196234819"><span class="TextRun SCX196234819"> には、ソース コードに次のコメントがあります: <span class="TextRun SCX196234819"> 相互排他ではなく、可視性のみをロックします<span class="TextRun SCX196234819">。この文の意味は、このロックの操作が相互排除操作ではなく、その可視性を確保するためであることを示しています。 Thread<span class="TextRun SCX196234819">t1<span class="TextRun SCX196234819">はインスタンス化されたArrayblockingQueue<span class="TextRun SCX196234819">オブジェクトであり、<span class="TextRun SCX196234819">t2<span class="SpellingError SCX196234819">はインスタンス化された<span class="TextRun SCX196234819"><span class="TextRun SCX196234819">rayblockingQueue<span class="TextRun SCX196234819">オブジェクトのエンキュー操作です(もちろん、<span class="TextRun SCX196234819">t1<span class="SpellingError SCX196234819">および<span class="TextRun SCX196234819">t2<span class="TextRun SCX196234819">の実行順序は保証する必要があります)。実行されません。 ロック操作 (ロックにより可視性が保証されます。つまり、メインメモリに書き戻されます)。<span class="TextRun SCX196234819">T1<span class="TextRun SCX196234819"> のコレクション <span class="TextRun SCX196234819">c<span class="TextRun SCX196234819"> は、<span class="TextRun SCX196234819">T1<span class="TextRun SCX196234819"> スレッドによって維持されるキャッシュ内にのみ存在でき、書き戻されません。 <span class="TextRun SCX196234819"> でインスタンス化された <span class="TextRun SCX196234819"><span class="TextRun SCX196234819">ArrayBlockingQueue<span class="TextRun SCX196234819"> によって維持されるキャッシュとメイン メモリにはコレクション <span class="TextRun SCX196234819">c<span class="TextRun SCX196234819"> がありません。このとき、可視性によりデータの不整合が発生し、スレッド セーフティの問題 <span class="SpellingError SCX196234819"> が発生します。 <span class="TextRun SCX196234819"><span class="TextRun SCX196234819"><span class="TextRun SCX196234819"><span class="TextRun SCX196234819"><span class="EOP SCX196234819"></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span> </span></span></span></span></span></span></span></span></span></span></p> 以下は </div> の出力の一部です<div class="OutlineElement Ltr SCX196234819">ArrayBlockingQueue<p class="Paragraph SCX196234819"> チームエントリー操作。 <span class="TextRun SCX196234819"><span class="TextRun SCX196234819"><span class="SpellingError SCX196234819"><span class="TextRun SCX196234819"></span></span></span></span></p>
<h2 class="Paragraph SCX196234819"><strong><span class="TextRun SCX196234819"><span class="TextRun SCX196234819">キュー<span class="SpellingError SCX196234819"><span class="TextRun SCX196234819">要素の挿入<span class="TextRun SCX37794459"><span class="NormalTextRun SCX37794459"><span class="TextRun SCX37794459"><span class="NormalTextRun SCX37794459"><span class="TextRun SCX37794459"><span class="NormalTextRun SCX37794459"></span></span></span></span></span></span></span> </span></span></span></strong></h2>
<table class="Table TableWordWrap SCX104568718" border="1">
<tbody class="SCX104568718">
<tr class="TableRow SCX104568718">
<td class="SCX104568718"><div class="TableCellContent TableCellSelected SCX104568718">
<div class="OutlineElement Ltr SCX104568718"><p class="Paragraph SCX104568718"><span class="TextRun SCX104568718"><span class="EOP SCX104568718"></span></span></p></div> </div></td>
<td class="SCX104568718">例外をスローします<div class="TableCellContent TableCellSelected SCX104568718"><div class="OutlineElement Ltr SCX104568718"><p class="Paragraph SCX104568718"><span class="TextRun SCX104568718"><span class="EOP SCX104568718"></span></span></p></div></div>
</td>
<td class="SCX104568718">戻り値(非ブロッキング)<div class="TableCellContent TableCellSelected SCX104568718"><div class="OutlineElement Ltr SCX104568718"><p class="Paragraph SCX104568718"><span class="TextRun SCX104568718"><span class="EOP SCX104568718"></span></span></p></div></div>
</td>
<td class="SCX104568718">ある時刻の戻り値<div class="TableCellContent TableCellSelected SCX104568718">
<div class="OutlineElement Ltr SCX104568718">
<p class="Paragraph SCX104568718"><span class="TextRun SCX104568718"><span class="EOP SCX104568718"></span></span></p> </div> </div>
</td>
<td class="SCX104568718">戻り値(ブロック)<div class="TableCellContent TableCellSelected SCX104568718"><div class="OutlineElement Ltr SCX104568718"><p class="Paragraph SCX104568718"><span class="TextRun SCX104568718"><span class="EOP SCX104568718"></span></span></p></div></div>
</td>
</tr>
<tr class="TableRow SCX104568718">
<td class="SCX104568718"><div class="TableCellContent TableCellSelected SCX104568718"><div class="OutlineElement Ltr SCX104568718"><p class="Paragraph SCX104568718"><span class="TextRun SCX104568718">Insert<span class="EOP SCX104568718"></span></span></p></div></div></td>
<td class="SCX104568718"><div class="TableCellContent TableCellSelected SCX104568718"><div class="OutlineElement Ltr SCX104568718"><p class="Paragraph SCX104568718"><span class="TextRun SCX104568718">add(e)//キューがいっぱいでない場合はtrueを返し、キューがいっぱいの場合は<span class="TextRun SCX104568718">IllegalStateExceptionをスローします。 <span class="TextRun SCX104568718">(「キュー」 full" )Exception ——<span class="TextRun SCX104568718"><span class="SpellingError SCX104568718">AbstractQueue<span class="EOP SCX104568718"></span></span></span></span></span></span></p></div></div></td>
<td class="SCX104568718"><div class="TableCellContent TableCellSelected SCX104568718"><div class="OutlineElement Ltr SCX104568718"><p class="Paragraph SCX104568718"><span class="TextRun SCX104568718">offer(e)//キューがいっぱいでない場合はtrueを返し、キューがいっぱいの場合はfalseを返します。ノンブロッキングはすぐに戻ります。 <span class="EOP SCX104568718"> </span></span></p></div></div></td>
<td class="SCX104568718"><div class="TableCellContent TableCellSelected SCX104568718"><div class="OutlineElement Ltr SCX104568718"><p class="Paragraph SCX104568718">offer(e, time,unit)// 指定された時間内にデータをキューに挿入できない場合は、 false を返します。成功すると true が返されます。 <span class="TextRun SCX104568718"> <span class="EOP SCX104568718"></span></span></p></div></div></td>
<td class="SCX104568718"><div class="TableCellContent TableCellSelected SCX104568718"><div class="OutlineElement Ltr SCX104568718"><p class="Paragraph SCX104568718">put(e)//キューがいっぱいでない場合、直接挿入の戻り値はなく、キューがいっぱいになるまでブロックして待機します。再度挿入する前にいっぱいにしてください。 <span class="TextRun SCX104568718"><span class="EOP SCX104568718"></span></span></p></div></div></td>
</tr>
</tbody>
<div class="cnblogs_code"><pre class="brush:php;toolbar:false">//ArrayBlockingQueue#add public boolean add(E e) {
return super.add(e);
}
//AbstractQueue#add,这是一个模板方法,只定义add入队算法骨架,成功时返回true,失败时抛出IllegalStateException异常,具体offer实现交给子类实现。 public boolean add(E e) {
if (offer(e))//offer方法由Queue接口定义 return true;
else throw new IllegalStateException();
}
//ArrayBlockingQueue#offer,队列未满时返回true,满时返回false public boolean offer(E e) {
checkNotNull(e);//检查入队元素是否为空 final ReentrantLock lock = this.lock;
lock.lock();//获得锁,线程安全 try {
if (count == items.length)//队列满时,不阻塞等待,直接返回false return false;
else {
insert(e);//队列未满,直接插入 return true;
}
} finally {
lock.unlock();
}
}
//ArrayBlockingQueue#insert private void insert(E e) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();//唤醒非空等待队列中的线程,有关Condition可参考《6.类似Object监视器方法的Condition接口》
}
//ArrayBlockingQueue#inc private int inc(int i) {
return (++i == items.length) ? 0 : i;
}
//ArrayBlockingQueue#put public void put(E e) throws InterruptedException {
checkNotNull(e);//同样检查插入元素是否为空 final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//这里并没有调用lock方法,而是调用了可被中断的lockInterruptibly,该方法可被线程中断返回,lock不能被中断返回。 try {
while (count == items.length)
notFull.await();//当队列满时,使非满等待队列休眠 insert(e);//此时表示队列非满,故插入元素,同时在该方法里唤醒非空等待队列 } finally {
lock.unlock();
}
}
队列元素的删除
//AbstractQueue#remove,这也是一个模板方法,定义删除队列元素的算法骨架,队列中元素时返回具体元素,元素为空时抛出异常,具体实现poll由子类实现, public E remove() {
E x = poll();//poll方法由Queue接口定义 if (x != null)
return x;
else throw new NoSuchElementException();
}
//ArrayBlockingQueue#poll,队列中有元素时返回元素,不为空时返回null public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
}
//ArrayBlockingQueue#extract private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);//移除队首元素 items[takeIndex] = null;//将队列数组中的第一个元素置为null,便于GC回收 takeIndex = inc(takeIndex);
--count;
notFull.signal();//唤醒非满等待队列线程 return x;
}
//ArrayBlockQueue#take public E take() throws InterruptedException {
final ReentrantLock lock = this.lock();
lock.lockInterrupted();//这里并没有调用lock方法,而是调用了可被中断的lockInterruptibly,该方法可被线程中断返回,lock不能被中断返回。 try {
while (count == 0)//队列元素为空 notEmpty.await();//非空等待队列休眠 return extract();//此时表示队列非空,故删除元素,同时在里唤醒非满等待队列 } finally {
lock.unlock();
}
}
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
以上が同時パケット ブロック キューの ArrayBlockingQueueの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。