ホームページ  >  記事  >  Java  >  Java手書きブロックキューの使用方法

Java手書きブロックキューの使用方法

WBOY
WBOY転載
2023-05-20 09:28:201037ブラウズ

    要件分析

    ブロッキング キューの主な要件は次のとおりです。

    • 基本キューの機能 キューにデータを入れたり、キューからデータを取り出したりする必要があります。

    • すべてのキュー操作は 同時実行安全である必要があります。

    • キューがいっぱいでデータがキューに入れられると、スレッドを一時停止する必要があります。キュー内のデータを取り出してキューにスペースを確保すると、スレッドは一時停止する必要があります。目覚めます。

    • キューが空で、データがキューからフェッチされる場合、スレッドは一時停止される必要があります。スレッドがキューにデータを追加する場合、一時停止されたスレッドは起動される必要があります。 。

    • 実装するキューでは、配列を使用してデータを保存するため、コンストラクターで配列の初期サイズを指定し、配列の大きさを設定する必要があります。

    ブロッキング キューの実装原則

    スレッド ブロッキングとウェイクアップ

    ブロッキング キューについては、同時実行の安全性##としてすでに説明しました。 # 、スレッドを起動してブロックする必要があるため、同時実行の安全性を確保するためにリエントラント ロック ReentrantLock を選択できますが、スレッドを起動してブロックする必要もあるので、条件変数Condition スレッドのウェイクアップとブロック操作を実行します。Condition では、次の 2 つの関数を使用します:

    • signal はスレッドをウェイクアップするために使用されます。スレッドが Conditionsignal 関数を呼び出すと、await 関数によってブロックされたスレッドをウェイクアップできます。 。

    • await は、スレッドをブロックするために使用されます。スレッドが Conditionawait 関数を呼び出すと、スレッドはブロック。

    配列ループの使用法

    キューは一方の端から入り、もう一方の端から出ていくため、キューには先頭と末尾が必要です。

    Java手書きブロックキューの使用方法

    データをキューに追加すると、キューの状況は次のようになります:

    Java手書きブロックキューの使用方法

    図上記に基づいて、4 つのデキュー操作を実行し、結果は次のようになります。

    Java手書きブロックキューの使用方法

    上記の状態で、8 つのデータを追加し続けると、レイアウトは次のようになります。

    Java手書きブロックキューの使用方法

    上の図でデータを追加すると、配列の後半のスペースが使用されるだけでなく、配列の未使用のスペースも使用されることがわかります。前半は引き続き使用できます。つまり、キュー内にあります。リサイクル プロセスが内部で実装されています。

    配列を循環的に使用できるようにするには、変数を使用して配列内のキューの先頭の位置を記録し、変数を使用して配列内のキューの末尾の位置を記録する必要があります。キュー内の項目の数を記録する変数 data。

    コードの実装

    メンバー変数の定義

    上記の分析によれば、実装するクラスには次のクラス メンバー変数が必要であることがわかります:

    // 用于保护临界区的锁
    private final ReentrantLock lock;
    // 用于唤醒取数据的时候被阻塞的线程
    private final Condition notEmpty;
    // 用于唤醒放数据的时候被阻塞的线程
    private final Condition notFull;
    // 用于记录从数组当中取数据的位置 也就是队列头部的位置
    private int takeIndex;
    // 用于记录从数组当中放数据的位置 也就是队列尾部的位置
    private int putIndex;
    // 记录队列当中有多少个数据
    private int count;
    // 用于存放具体数据的数组
    private Object[] items;

    Constructor

    コンストラクターも非常にシンプルで、中心となるのは、配列サイズのパラメーターを渡し、初期化して上記の変数に値を代入することです。

    @SuppressWarnings("unchecked")
    public MyArrayBlockingQueue(int size) {
      this.lock = new ReentrantLock();
      this.notEmpty = lock.newCondition();
      this.notFull = lock.newCondition();
      // 其实可以不用初始化 类会有默认初始化 默认初始化为0
      takeIndex = 0;
      putIndex = 0;
      count = 0;
      // 数组的长度肯定不能够小于0
      if (size <= 0)
        throw new RuntimeException("size can not be less than 1");
      items = (E[])new Object[size];
    }

    put function

    これはより重要な関数です。この関数では、キューがいっぱいでない場合、データを配列に直接入れることができます。配列がいっぱいの場合は、スレッドを一時停止する必要があります。

    public void put(E x){
      // put 函数可能多个线程调用 但是我们需要保证在给变量赋值的时候只能够有一个线程
      // 因为如果多个线程同时进行赋值的话 那么可能后一个线程的赋值操作覆盖了前一个线程的赋值操作
      // 因此这里需要上锁
      lock.lock();
     
      try {
        // 如果队列当中的数据个数等于数组的长度的话 说明数组已经满了
        // 这个时候需要将线程挂起
        while (count == items.length)
          notFull.await(); // 将调用 await的线程挂起
        // 当数组没有满 或者在挂起之后再次唤醒的话说明数组当中有空间了
        // 这个时候需要将数组入队 
        // 调用入队函数将数据入队
        enqueue(x);
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        // 解锁
        lock.unlock();
      }
    }
     
    // 将数据入队
    private void enqueue(E x) {
      this.items[putIndex] = x;
      if (++putIndex == items.length)
        putIndex = 0;
      count++;
      notEmpty.signal(); // 唤醒一个被 take 函数阻塞的线程唤醒
    }

    offer function

    offer 関数は put 関数と同じですが、put 関数との違いは、配列内のデータが満たされると、offer 関数は ## を返すことです。ブロックされる代わりに #false

    <pre class="brush:java;">public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { // 如果数组满了 则直接返回false 而不是被阻塞 if (count == items.length) return false; else { // 如果数组没有满则直接入队 并且返回 true enqueue(e); return true; } } finally { lock.unlock(); } }</pre>add function

    この関数は上記 2 つの関数と同じ機能で、キューにデータを追加しますが、単一のキューがいっぱいになると、この関数は例外をスローします。

    public boolean add(E e) {
      if (offer(e))
        return true;
      else
        throw new RuntimeException("Queue full");
    }

    take function

    この関数は主にキューからデータの一部を取り出しますが、キューが空の場合、この関数は関数を呼び出すスレッドをブロックします:

    public E take() throws InterruptedException {
      // 这个函数也是不能够并发的 否则可能不同的线程取出的是同一个位置的数据
      // 进行加锁操作
      lock.lock();
      try {
        // 当 count 等于0 说明队列为空
        // 需要将线程挂起等待
        while (count == 0)
          notEmpty.await();
        // 当被唤醒之后进行出队操作
        return dequeue();
      }finally {
        lock.unlock();
      }
    }
     
    private E  dequeue() {
      final Object[] items = this.items;
      @SuppressWarnings("unchecked")
      E x = (E) items[takeIndex];
      items[takeIndex] = null; // 将对应的位置设置为 null GC就可以回收了
      if (++takeIndex == items.length)
        takeIndex = 0;
      count--; // 队列当中数据少一个了
      // 因为出队了一个数据 可以唤醒一个被 put 函数阻塞的线程 如果这个时候没有被阻塞的线程
      // 这个函数就不会起作用 也就说在这个函数调用之后被 put 函数挂起的线程也不会被唤醒
      notFull.signal(); // 唤醒一个被 put 函数阻塞的线程
      return x;
    }

    toString 関数を書き換えます

    後続のテスト関数でクラスを出力し、このクラスを出力するときに、オブジェクトの

    toString

    メソッドを呼び出して文字列を取得するため、そして最後にこの文字列を出力します。 <pre class="brush:java;">@Override public String toString() { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append(&quot;[&quot;); // 这里需要上锁 因为我们在打印的时候需要打印所有的数据 // 打印所有的数据就需要对数组进行遍历操作 而在进行遍历 // 操作的时候是不能进行插入和删除操作的 因为打印的是某 // 个时刻的数据 lock.lock(); try { if (count == 0) stringBuilder.append(&quot;]&quot;); else { int cur = 0; // 对数据进行遍历 一共遍历 count 次 因为数组当中一共有 count // 个数据 while (cur != count) { // 从 takeIndex 位置开始进行遍历 因为数据是从这个位置开始的 stringBuilder.append(items[(cur + takeIndex) % items.length].toString() + &quot;, &quot;); cur += 1; } // 删除掉最后一次没用的 &quot;, &quot; stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length()); stringBuilder.append(&amp;#39;]&amp;#39;); } }finally { lock.unlock(); } return stringBuilder.toString(); }</pre>完全なコード

    完成したブロッキング キュー コード全体は次のとおりです:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
     
    public class MyArrayBlockingQueue<E> {
     
      // 用于保护临界区的锁
      private final ReentrantLock lock;
      // 用于唤醒取数据的时候被阻塞的线程
      private final Condition notEmpty;
      // 用于唤醒放数据的时候被阻塞的线程
      private final Condition notFull;
      // 用于记录从数组当中取数据的位置 也就是队列头部的位置
      private int takeIndex;
      // 用于记录从数组当中放数据的位置 也就是队列尾部的位置
      private int putIndex;
      // 记录队列当中有多少个数据
      private int count;
      // 用于存放具体数据的数组
      private Object[] items;
     
     
      @SuppressWarnings("unchecked")
      public MyArrayBlockingQueue(int size) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.notFull = lock.newCondition();
        // 其实可以不用初始化 类会有默认初始化 默认初始化为0
        takeIndex = 0;
        putIndex = 0;
        count = 0;
        if (size <= 0)
          throw new RuntimeException("size can not be less than 1");
        items = (E[])new Object[size];
      }
     
      public void put(E x){
        lock.lock();
     
        try {
          while (count == items.length)
            notFull.await();
          enqueue(x);
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          lock.unlock();
        }
      }
     
      private void enqueue(E x) {
        this.items[putIndex] = x;
        if (++putIndex == items.length)
          putIndex = 0;
        count++;
        notEmpty.signal();
      }
     
      private E  dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
          takeIndex = 0;
        count--;
        notFull.signal();
        return x;
      }
     
      public boolean add(E e) {
        if (offer(e))
          return true;
        else
          throw new RuntimeException("Queue full");
      }
     
      public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
          if (count == items.length)
            return false;
          else {
            enqueue(e);
            return true;
          }
        } finally {
          lock.unlock();
        }
      }
     
      public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
          return (count == 0) ? null : dequeue();
        } finally {
          lock.unlock();
        }
      }
     
      public E take() throws InterruptedException {
        lock.lock();
        try {
          while (count == 0)
            notEmpty.await();
          return dequeue();
        }finally {
          lock.unlock();
        }
      }
     
      @Override
      public String toString() {
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("[");
        lock.lock();
        try {
          if (count == 0)
            stringBuilder.append("]");
          else {
            int cur = 0;
            while (cur != count) {
              stringBuilder.append(items[(cur + takeIndex) % items.length].toString()).append(", ");
              cur += 1;
            }
            stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length());
            stringBuilder.append(&#39;]&#39;);
          }
        }finally {
          lock.unlock();
        }
        return stringBuilder.toString();
      }
     
    }

    次に、上記のコードをテストします:

    今、ブロッキングを使用します。キューはプロデューサー/コンシューマー モデルをシミュレートします。ブロッキング キューのサイズを 5 に設定します。プロデューサー スレッドはデータをキューに追加します。データは 0 から 9 までの 10 個の数値です。コンシューマー スレッドは合計 10 回消費します。

    import java.util.concurrent.TimeUnit;
     
    public class Test {
     
      public static void main(String[] args) throws InterruptedException {
        MyArrayBlockingQueue<Integer> queue = new MyArrayBlockingQueue<>(5);
        Thread thread = new Thread(() -> {
          for (int i = 0; i < 10; i++) {
            System.out.println(Thread.currentThread().getName() + " 往队列当中加入数据:" + i);
            queue.put(i);
          }
        }, "生产者");
     
     
        Thread thread1 = new Thread(() -> {
          for (int i = 0; i < 10; i++) {
            try {
              System.out.println(Thread.currentThread().getName() + " 从队列当中取出数据:" + queue.take());
              System.out.println(Thread.currentThread().getName() + " 当前队列当中的数据:" + queue);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
        }, "消费者");
        thread.start();
        TimeUnit.SECONDS.sleep(3);
        thread1.start();
     
      }
    }

    上記のコードの出力は次のとおりです:

    プロデューサーはキューにデータを追加します: 0
    プロデューサーはキューにデータを追加します: 1
    プロデューサーはキューにデータを追加します: 2
    プロデューサーはキューにデータを追加します: 3
    プロデューサはキューにデータを追加します: 4
    プロデューサはキューにデータを追加します: 5
    コンシューマはキューからデータを取り出します: 0
    プロデューサはキューにデータを追加します: 6
    コンシューマの現在のキュー内のデータ: [1, 2, 3, 4, 5]
    コンシューマはキューからデータを取り出します: 1
    コンシューマの現在のキュー内のデータ: [2, 3, 4 , 5]
    コンシューマはキューからデータを取り出します: 2
    コンシューマの現在のキュー内のデータ: [3, 4, 5, 6]
    プロデューサはキューにデータを追加します: 7
    コンシューマーはキューからデータを取り出します: 3
    コンシューマーの現在のキュー内のデータ: [4, 5, 6, 7]
    コンシューマーはキューからデータを取り出します: 4
    コンシューマの現在のキュー データ: [5, 6, 7]
    コンシューマはキューからデータを取り出します: 5
    コンシューマの現在のキュー内のデータ: [6, 7]
    プロデューサはデータを追加しますキューへ: 8
    コンシューマはキューからデータを取り出します: 6
    コンシューマの現在のキュー内のデータ: [7, 8]
    コンシューマはキューからデータを取り出します: 7
    コンシューマの現在のキューのデータ: [8]
    コンシューマはキューからデータを取り出します: 8
    コンシューマの現在のキューのデータ: []
    プロデューサはキューにデータを追加します: 9
    コンシューマはキューからデータを取り出します。 :9
    コンシューマの現在のキュー内のデータ: []

    上記の出力結果から、プロデューサがスレッドは 5 の出力後に一時停止されました。一時停止されていない場合、コンシューマ スレッドは 3 秒間ブロックされるため、プロデューサ スレッドは確実に 1 回で出力を完了できます。ブロッキング キューがいっぱいであるため、数値 5 を出力した後も出力を完了できず、プロデューサー スレッドが中断されました。コンシューマーが消費を開始すると、ブロッキング キューにスペースが空き、プロデューサー スレッドは生産を続行できます。

    以上がJava手書きブロックキューの使用方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明:
    この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。