ブロッキング キューの主な要件は次のとおりです。
基本キューの機能 キューにデータを入れたり、キューからデータを取り出したりする必要があります。
すべてのキュー操作は 同時実行安全である必要があります。
キューがいっぱいでデータがキューに入れられると、スレッドを一時停止する必要があります。キュー内のデータを取り出してキューにスペースを確保すると、スレッドは一時停止する必要があります。目覚めます。
キューが空で、データがキューからフェッチされる場合、スレッドは一時停止される必要があります。スレッドがキューにデータを追加する場合、一時停止されたスレッドは起動される必要があります。 。
実装するキューでは、配列を使用してデータを保存するため、コンストラクターで配列の初期サイズを指定し、配列の大きさを設定する必要があります。
ブロッキング キューについては、同時実行の安全性##としてすでに説明しました。 # 、スレッドを起動してブロックする必要があるため、同時実行の安全性を確保するためにリエントラント ロック ReentrantLock を選択できますが、スレッドを起動してブロックする必要もあるので、条件変数
Condition スレッドのウェイクアップとブロック操作を実行します。
Condition では、次の 2 つの関数を使用します:
signal はスレッドをウェイクアップするために使用されます。スレッドが
Condition の
signal 関数を呼び出すと、
await 関数によってブロックされたスレッドをウェイクアップできます。 。
await は、スレッドをブロックするために使用されます。スレッドが
Condition の
await 関数を呼び出すと、スレッドはブロック。
// 用于保护临界区的锁 private final ReentrantLock lock; // 用于唤醒取数据的时候被阻塞的线程 private final Condition notEmpty; // 用于唤醒放数据的时候被阻塞的线程 private final Condition notFull; // 用于记录从数组当中取数据的位置 也就是队列头部的位置 private int takeIndex; // 用于记录从数组当中放数据的位置 也就是队列尾部的位置 private int putIndex; // 记录队列当中有多少个数据 private int count; // 用于存放具体数据的数组 private Object[] items;Constructorコンストラクターも非常にシンプルで、中心となるのは、配列サイズのパラメーターを渡し、初期化して上記の変数に値を代入することです。
@SuppressWarnings("unchecked") public MyArrayBlockingQueue(int size) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.notFull = lock.newCondition(); // 其实可以不用初始化 类会有默认初始化 默认初始化为0 takeIndex = 0; putIndex = 0; count = 0; // 数组的长度肯定不能够小于0 if (size <= 0) throw new RuntimeException("size can not be less than 1"); items = (E[])new Object[size]; }put functionこれはより重要な関数です。この関数では、キューがいっぱいでない場合、データを配列に直接入れることができます。配列がいっぱいの場合は、スレッドを一時停止する必要があります。
public void put(E x){ // put 函数可能多个线程调用 但是我们需要保证在给变量赋值的时候只能够有一个线程 // 因为如果多个线程同时进行赋值的话 那么可能后一个线程的赋值操作覆盖了前一个线程的赋值操作 // 因此这里需要上锁 lock.lock(); try { // 如果队列当中的数据个数等于数组的长度的话 说明数组已经满了 // 这个时候需要将线程挂起 while (count == items.length) notFull.await(); // 将调用 await的线程挂起 // 当数组没有满 或者在挂起之后再次唤醒的话说明数组当中有空间了 // 这个时候需要将数组入队 // 调用入队函数将数据入队 enqueue(x); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 解锁 lock.unlock(); } } // 将数据入队 private void enqueue(E x) { this.items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); // 唤醒一个被 take 函数阻塞的线程唤醒 }offer functionoffer 関数は put 関数と同じですが、put 関数との違いは、配列内のデータが満たされると、offer 関数は ## を返すことです。ブロックされる代わりに #false
。 <pre class="brush:java;">public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果数组满了 则直接返回false 而不是被阻塞
if (count == items.length)
return false;
else {
// 如果数组没有满则直接入队 并且返回 true
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}</pre>
add function
public boolean add(E e) { if (offer(e)) return true; else throw new RuntimeException("Queue full"); }
take function
public E take() throws InterruptedException { // 这个函数也是不能够并发的 否则可能不同的线程取出的是同一个位置的数据 // 进行加锁操作 lock.lock(); try { // 当 count 等于0 说明队列为空 // 需要将线程挂起等待 while (count == 0) notEmpty.await(); // 当被唤醒之后进行出队操作 return dequeue(); }finally { lock.unlock(); } } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; // 将对应的位置设置为 null GC就可以回收了 if (++takeIndex == items.length) takeIndex = 0; count--; // 队列当中数据少一个了 // 因为出队了一个数据 可以唤醒一个被 put 函数阻塞的线程 如果这个时候没有被阻塞的线程 // 这个函数就不会起作用 也就说在这个函数调用之后被 put 函数挂起的线程也不会被唤醒 notFull.signal(); // 唤醒一个被 put 函数阻塞的线程 return x; }
toString 関数を書き換えます
メソッドを呼び出して文字列を取得するため、そして最後にこの文字列を出力します。 <pre class="brush:java;">@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("[");
// 这里需要上锁 因为我们在打印的时候需要打印所有的数据
// 打印所有的数据就需要对数组进行遍历操作 而在进行遍历
// 操作的时候是不能进行插入和删除操作的 因为打印的是某
// 个时刻的数据
lock.lock();
try {
if (count == 0)
stringBuilder.append("]");
else {
int cur = 0;
// 对数据进行遍历 一共遍历 count 次 因为数组当中一共有 count
// 个数据
while (cur != count) {
// 从 takeIndex 位置开始进行遍历 因为数据是从这个位置开始的
stringBuilder.append(items[(cur + takeIndex) % items.length].toString() + ", ");
cur += 1;
}
// 删除掉最后一次没用的 ", "
stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length());
stringBuilder.append(&#39;]&#39;);
}
}finally {
lock.unlock();
}
return stringBuilder.toString();
}</pre>
完全なコード
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class MyArrayBlockingQueue<E> { // 用于保护临界区的锁 private final ReentrantLock lock; // 用于唤醒取数据的时候被阻塞的线程 private final Condition notEmpty; // 用于唤醒放数据的时候被阻塞的线程 private final Condition notFull; // 用于记录从数组当中取数据的位置 也就是队列头部的位置 private int takeIndex; // 用于记录从数组当中放数据的位置 也就是队列尾部的位置 private int putIndex; // 记录队列当中有多少个数据 private int count; // 用于存放具体数据的数组 private Object[] items; @SuppressWarnings("unchecked") public MyArrayBlockingQueue(int size) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.notFull = lock.newCondition(); // 其实可以不用初始化 类会有默认初始化 默认初始化为0 takeIndex = 0; putIndex = 0; count = 0; if (size <= 0) throw new RuntimeException("size can not be less than 1"); items = (E[])new Object[size]; } public void put(E x){ lock.lock(); try { while (count == items.length) notFull.await(); enqueue(x); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } private void enqueue(E x) { this.items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; notFull.signal(); return x; } public boolean add(E e) { if (offer(e)) return true; else throw new RuntimeException("Queue full"); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } public E take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); return dequeue(); }finally { lock.unlock(); } } @Override public String toString() { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append("["); lock.lock(); try { if (count == 0) stringBuilder.append("]"); else { int cur = 0; while (cur != count) { stringBuilder.append(items[(cur + takeIndex) % items.length].toString()).append(", "); cur += 1; } stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length()); stringBuilder.append(']'); } }finally { lock.unlock(); } return stringBuilder.toString(); } }
次に、上記のコードをテストします:
今、ブロッキングを使用します。キューはプロデューサー/コンシューマー モデルをシミュレートします。ブロッキング キューのサイズを 5 に設定します。プロデューサー スレッドはデータをキューに追加します。データは 0 から 9 までの 10 個の数値です。コンシューマー スレッドは合計 10 回消費します。
import java.util.concurrent.TimeUnit; public class Test { public static void main(String[] args) throws InterruptedException { MyArrayBlockingQueue<Integer> queue = new MyArrayBlockingQueue<>(5); Thread thread = new Thread(() -> { for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName() + " 往队列当中加入数据:" + i); queue.put(i); } }, "生产者"); Thread thread1 = new Thread(() -> { for (int i = 0; i < 10; i++) { try { System.out.println(Thread.currentThread().getName() + " 从队列当中取出数据:" + queue.take()); System.out.println(Thread.currentThread().getName() + " 当前队列当中的数据:" + queue); } catch (InterruptedException e) { e.printStackTrace(); } } }, "消费者"); thread.start(); TimeUnit.SECONDS.sleep(3); thread1.start(); } }
上記のコードの出力は次のとおりです:
プロデューサーはキューにデータを追加します: 0
プロデューサーはキューにデータを追加します: 1
プロデューサーはキューにデータを追加します: 2
プロデューサーはキューにデータを追加します: 3
プロデューサはキューにデータを追加します: 4
プロデューサはキューにデータを追加します: 5
コンシューマはキューからデータを取り出します: 0
プロデューサはキューにデータを追加します: 6
コンシューマの現在のキュー内のデータ: [1, 2, 3, 4, 5]
コンシューマはキューからデータを取り出します: 1
コンシューマの現在のキュー内のデータ: [2, 3, 4 , 5]
コンシューマはキューからデータを取り出します: 2
コンシューマの現在のキュー内のデータ: [3, 4, 5, 6]
プロデューサはキューにデータを追加します: 7
コンシューマーはキューからデータを取り出します: 3
コンシューマーの現在のキュー内のデータ: [4, 5, 6, 7]
コンシューマーはキューからデータを取り出します: 4
コンシューマの現在のキュー データ: [5, 6, 7]
コンシューマはキューからデータを取り出します: 5
コンシューマの現在のキュー内のデータ: [6, 7]
プロデューサはデータを追加しますキューへ: 8
コンシューマはキューからデータを取り出します: 6
コンシューマの現在のキュー内のデータ: [7, 8]
コンシューマはキューからデータを取り出します: 7
コンシューマの現在のキューのデータ: [8]
コンシューマはキューからデータを取り出します: 8
コンシューマの現在のキューのデータ: []
プロデューサはキューにデータを追加します: 9
コンシューマはキューからデータを取り出します。 :9
コンシューマの現在のキュー内のデータ: []
上記の出力結果から、プロデューサがスレッドは 5 の出力後に一時停止されました。一時停止されていない場合、コンシューマ スレッドは 3 秒間ブロックされるため、プロデューサ スレッドは確実に 1 回で出力を完了できます。ブロッキング キューがいっぱいであるため、数値 5 を出力した後も出力を完了できず、プロデューサー スレッドが中断されました。コンシューマーが消費を開始すると、ブロッキング キューにスペースが空き、プロデューサー スレッドは生産を続行できます。
以上がJava手書きブロックキューの使用方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。