How to hand-write a blocking queue in Java

WBOY
2023-05-20 09:28:20

    Requirement Analysis

    The main requirements of the blocking queue are as follows:

    • Basic queue functions: it must be possible to put data into the queue and take data out of it.

    • All queue operations must be concurrency safe.

    • When a thread puts data into a full queue, the thread must be suspended. When data is later taken out and space becomes available, the suspended thread must be woken up.

    • When a thread takes data from an empty queue, the thread must be suspended. When another thread adds data to the queue, the suspended thread must be woken up.

    • The queue we implement stores its data in an array, so the constructor must accept the size of the array.

    Blocking queue implementation principle

    Thread blocking and wake-up

    As noted above, the blocking queue must be concurrency safe, and it must also be able to block and wake up threads. We can use the reentrant lock ReentrantLock to guarantee concurrency safety, and the condition variable Condition to block and wake up threads. We will use the following two Condition functions:

    • signal is used to wake up a thread. When a thread calls the signal function of Condition, it can wake up a thread blocked by the await function.

    • await is used to block threads. When a thread calls the await function of Condition, the thread will block.
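
    As a minimal sketch of how these two functions work together, consider the one-shot gate below. The class ConditionSketch and its methods awaitOpen, open and demo are hypothetical names made up for this illustration; only ReentrantLock, Condition, await and signal come from the JDK. Both await and signal are called while holding the lock, and the waiting side re-checks its state in a loop.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical one-shot gate, used only to illustrate await/signal.
class ConditionSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition ready = lock.newCondition();
  private boolean open = false; // the state the waiting thread waits for

  // Blocks the caller until another thread calls open().
  void awaitOpen() throws InterruptedException {
    lock.lock();
    try {
      // Re-check in a loop: await() may return without a matching signal.
      while (!open)
        ready.await(); // releases the lock while waiting, reacquires it before returning
    } finally {
      lock.unlock();
    }
  }

  // Changes the state and wakes up one thread blocked in awaitOpen().
  void open() {
    lock.lock();
    try {
      open = true;
      ready.signal();
    } finally {
      lock.unlock();
    }
  }

  // Starts a waiting thread, opens the gate, and reports whether the waiter finished.
  static boolean demo() {
    ConditionSketch gate = new ConditionSketch();
    Thread waiter = new Thread(() -> {
      try {
        gate.awaitOpen();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    waiter.start();
    gate.open();
    try {
      waiter.join(5000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    return !waiter.isAlive();
  }

  public static void main(String[] args) {
    System.out.println("waiter finished: " + demo());
  }
}
```

    Because the waiter checks the open flag before waiting, the sketch behaves the same whether open() runs before or after the waiter reaches await.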

    Circular use of the array

    Because data enters the queue at one end and leaves from the other, the queue must have a head and a tail.

    [Figure: the queue with its head and tail]

    After we add some data to the queue, the queue may look like this:

    [Figure: the queue after some data has been added]

    Starting from the state above, we perform four dequeue operations, and the result is as follows:

    [Figure: the queue after four dequeue operations]

    From this state, we add 8 more items, and the layout becomes:

    [Figure: the queue after 8 more items have been added]

    When the data is added in the figure above, the space in the second half of the array is used up first, and then the free space in the first half is used as well. In other words, the queue reuses the array circularly.

    To support this circular use, we need one variable to record the position of the queue head in the array, one variable to record the position of the queue tail, and one variable to record how many items are in the queue.
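
    The wrap-around bookkeeping can be sketched on its own, without any locking. The class RingIndexDemo, the helper next and the capacity of 5 are assumptions chosen for this illustration; the variable names takeIndex, putIndex and count match the ones used in the implementation later in this article.

```java
// Hypothetical demo class: tracks only the three bookkeeping variables, no data.
class RingIndexDemo {
  // Advances an index by one slot, wrapping back to 0 at the end of the array.
  static int next(int index, int capacity) {
    return (index + 1 == capacity) ? 0 : index + 1;
  }

  // Simulates 4 enqueues, 3 dequeues and 3 more enqueues on an array of 5 slots.
  // Returns {takeIndex, putIndex, count}.
  static int[] simulate() {
    int capacity = 5; // assumed array size
    int takeIndex = 0, putIndex = 0, count = 0;
    for (int i = 0; i < 4; i++) { // enqueue 4 items: the tail moves to slot 4
      putIndex = next(putIndex, capacity);
      count++;
    }
    for (int i = 0; i < 3; i++) { // dequeue 3 items: the head moves to slot 3
      takeIndex = next(takeIndex, capacity);
      count--;
    }
    for (int i = 0; i < 3; i++) { // enqueue 3 more: the tail wraps past the end to slot 2
      putIndex = next(putIndex, capacity);
      count++;
    }
    return new int[] {takeIndex, putIndex, count};
  }

  public static void main(String[] args) {
    int[] state = simulate();
    // prints: takeIndex=3 putIndex=2 count=4
    System.out.println("takeIndex=" + state[0] + " putIndex=" + state[1] + " count=" + state[2]);
  }
}
```

    At the end the tail index (2) is smaller than the head index (3), which is exactly the situation where the free slots at the front of the array are being reused.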

    Code implementation

    Member variable definition

    According to the analysis above, the class we implement needs the following member variables:

    // Lock that protects the critical sections
    private final ReentrantLock lock;
    // Used to wake up threads blocked while taking data
    private final Condition notEmpty;
    // Used to wake up threads blocked while putting data
    private final Condition notFull;
    // Position in the array to take data from, i.e. the head of the queue
    private int takeIndex;
    // Position in the array to put data at, i.e. the tail of the queue
    private int putIndex;
    // Number of items in the queue
    private int count;
    // Array that stores the actual data
    private Object[] items;

    Constructor

    The constructor is simple: it takes the array size as a parameter and initializes the variables above.

    @SuppressWarnings("unchecked")
    public MyArrayBlockingQueue(int size) {
      this.lock = new ReentrantLock();
      this.notEmpty = lock.newCondition();
      this.notFull = lock.newCondition();
      // Explicit initialization is optional: int fields default to 0
      takeIndex = 0;
      putIndex = 0;
      count = 0;
      // The array length must be at least 1
      if (size <= 0)
        throw new RuntimeException("size can not be less than 1");
      items = (E[])new Object[size];
    }

    put function

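    This is one of the more important functions. If the queue is not full, the data is put directly into the array. If the array is full, the calling thread must be suspended.

    public void put(E x){
      // put may be called by several threads, but only one thread at a time may update the fields.
      // If several threads assigned at the same time, a later write could overwrite an earlier one,
      // so we take the lock here.
      lock.lock();

      try {
        // If the number of items equals the array length, the array is full
        // and the thread must be suspended
        while (count == items.length)
          notFull.await(); // suspend the thread that called await
        // Either the array was not full, or the thread was woken up because space became available,
        // so the data can now be enqueued
        enqueue(x);
      } catch (InterruptedException e) {
        // Interrupted while waiting: in this case x is not enqueued
        e.printStackTrace();
      } finally {
        // Release the lock
        lock.unlock();
      }
    }

    // Enqueue the data
    private void enqueue(E x) {
      this.items[putIndex] = x;
      if (++putIndex == items.length)
        putIndex = 0; // wrap around to the start of the array
      count++;
      notEmpty.signal(); // wake up one thread blocked in take
    }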

    offer function

    The offer function also adds data to the queue. Unlike put, when the array is full, offer returns false instead of blocking.

    public boolean offer(E e) {
      final ReentrantLock lock = this.lock;
      lock.lock();
      try {
        // If the array is full, return false immediately instead of blocking
        if (count == items.length)
          return false;
        else {
          // If the array is not full, enqueue the data and return true
          enqueue(e);
          return true;
        }
      } finally {
        lock.unlock();
      }
    }

    add function

    This function serves the same purpose as the two functions above: it adds data to the queue. However, when the queue is full, it throws an exception.

    public boolean add(E e) {
      if (offer(e))
        return true;
      else
        throw new RuntimeException("Queue full");
    }
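
    For comparison, the JDK's own java.util.concurrent.ArrayBlockingQueue offers the same three ways of inserting (put, offer and add). The sketch below uses that JDK class rather than our MyArrayBlockingQueue, so that it runs on its own; the class name InsertMethodsDemo and the method run are made up for this illustration. One difference to note: the JDK's add throws IllegalStateException, while our version throws a plain RuntimeException.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class: shows offer and add on a full JDK ArrayBlockingQueue.
class InsertMethodsDemo {
  // Returns a short summary of what each call did.
  static String run() {
    ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1); // capacity 1
    StringBuilder out = new StringBuilder();
    out.append(queue.offer(1));             // queue has space: returns true
    out.append(' ').append(queue.offer(2)); // queue is full: returns false, does not block
    try {
      queue.add(3);                         // queue is full: throws instead of returning false
      out.append(" added");
    } catch (IllegalStateException e) {
      out.append(" exception");
    }
    return out.toString();
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints: true false exception
  }
}
```

    Calling put on the full queue here would block the calling thread until another thread removed an element, which is why it is left out of this single-threaded sketch.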

    take function

    This function takes one item out of the queue. When the queue is empty, it blocks the calling thread:

    public E take() throws InterruptedException {
      // take must not run concurrently either; otherwise different threads
      // might take the data at the same position, so we take the lock
      lock.lock();
      try {
        // count == 0 means the queue is empty,
        // so the thread must be suspended and wait
        while (count == 0)
          notEmpty.await();
        // After being woken up, dequeue an item
        return dequeue();
      }finally {
        lock.unlock();
      }
    }

    private E  dequeue() {
      final Object[] items = this.items;
      @SuppressWarnings("unchecked")
      E x = (E) items[takeIndex];
      items[takeIndex] = null; // clear the slot so the GC can reclaim the object
      if (++takeIndex == items.length)
        takeIndex = 0;
      count--; // one item fewer in the queue
      // An item was removed, so one thread blocked in put can be woken up. If no thread is
      // blocked at this moment, signal has no effect: a thread that blocks in put
      // after this call will not be woken up by it.
      notFull.signal(); // wake up one thread blocked in put
      return x;
    }
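
    take waits for as long as the queue stays empty. As a possible variant that is not part of the class in this article, the wait can be bounded with Condition.awaitNanos, which returns an estimate of the remaining waiting time. The sketch below is a reduced, hypothetical class (TimedTakeSketch, with a simplified offer and a timed poll) written only to show that idea; it omits notFull and the size check.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical reduced queue, used only to show a take with a timeout.
class TimedTakeSketch<E> {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition notEmpty = lock.newCondition();
  private final Object[] items;
  private int takeIndex, putIndex, count;

  TimedTakeSketch(int size) {
    items = new Object[size];
  }

  // Non-blocking insert: returns false when the array is full.
  boolean offer(E e) {
    lock.lock();
    try {
      if (count == items.length)
        return false;
      items[putIndex] = e;
      if (++putIndex == items.length)
        putIndex = 0;
      count++;
      notEmpty.signal(); // wake up one thread waiting in poll
      return true;
    } finally {
      lock.unlock();
    }
  }

  // Waits at most the given time for an item; returns null if none arrived in time.
  @SuppressWarnings("unchecked")
  E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    lock.lock();
    try {
      while (count == 0) {
        if (nanos <= 0L)
          return null; // the waiting time is used up
        nanos = notEmpty.awaitNanos(nanos); // returns the estimated remaining time
      }
      E x = (E) items[takeIndex];
      items[takeIndex] = null;
      if (++takeIndex == items.length)
        takeIndex = 0;
      count--;
      return x;
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    TimedTakeSketch<String> queue = new TimedTakeSketch<>(2);
    System.out.println(queue.poll(100, TimeUnit.MILLISECONDS)); // empty queue: prints null
    queue.offer("a");
    System.out.println(queue.poll(100, TimeUnit.MILLISECONDS)); // prints a
  }
}
```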

    Override the toString function

    The test code below prints our queue, and printing an object calls its toString method to obtain the string to print, so we override toString.

    @Override
    public String toString() {
      StringBuilder stringBuilder = new StringBuilder();
      stringBuilder.append("[");
      // Take the lock: printing requires traversing the whole array,
      // and no insertion or removal may happen during the traversal,
      // because we print a snapshot of the data at one moment
      lock.lock();
      try {
        if (count == 0)
          stringBuilder.append("]");
        else {
          int cur = 0;
          // Traverse count times, because the array holds count items
          while (cur != count) {
            // Start from takeIndex, because that is where the data begins
            stringBuilder.append(items[(cur + takeIndex) % items.length].toString()).append(", ");
            cur += 1;
          }
          // Remove the trailing ", "
          stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length());
          stringBuilder.append(']');
        }
      }finally {
        lock.unlock();
      }
      return stringBuilder.toString();
    }

    Complete code

    The complete blocking queue code is as follows. It also includes a poll function, which returns null immediately when the queue is empty instead of blocking:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
     
    public class MyArrayBlockingQueue<E> {
     
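      // Lock that protects the critical sections
      private final ReentrantLock lock;
      // Used to wake up threads blocked while taking data
      private final Condition notEmpty;
      // Used to wake up threads blocked while putting data
      private final Condition notFull;
      // Position in the array to take data from, i.e. the head of the queue
      private int takeIndex;
      // Position in the array to put data at, i.e. the tail of the queue
      private int putIndex;
      // Number of items in the queue
      private int count;
      // Array that stores the actual data
      private Object[] items;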
     
     
      @SuppressWarnings("unchecked")
      public MyArrayBlockingQueue(int size) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.notFull = lock.newCondition();
        // 其实可以不用初始化 类会有默认初始化 默认初始化为0
        takeIndex = 0;
        putIndex = 0;
        count = 0;
        if (size <= 0)
          throw new RuntimeException("size can not be less than 1");
        items = (E[])new Object[size];
      }
     
      public void put(E x){
        lock.lock();
     
        try {
          while (count == items.length)
            notFull.await();
          enqueue(x);
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          lock.unlock();
        }
      }
     
      private void enqueue(E x) {
        this.items[putIndex] = x;
        if (++putIndex == items.length)
          putIndex = 0;
        count++;
        notEmpty.signal();
      }
     
      private E  dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
          takeIndex = 0;
        count--;
        notFull.signal();
        return x;
      }
     
      public boolean add(E e) {
        if (offer(e))
          return true;
        else
          throw new RuntimeException("Queue full");
      }
     
      public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
          if (count == items.length)
            return false;
          else {
            enqueue(e);
            return true;
          }
        } finally {
          lock.unlock();
        }
      }
     
      public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
          return (count == 0) ? null : dequeue();
        } finally {
          lock.unlock();
        }
      }
     
      public E take() throws InterruptedException {
        lock.lock();
        try {
          while (count == 0)
            notEmpty.await();
          return dequeue();
        }finally {
          lock.unlock();
        }
      }
     
      @Override
      public String toString() {
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("[");
        lock.lock();
        try {
          if (count == 0)
            stringBuilder.append("]");
          else {
            int cur = 0;
            while (cur != count) {
              stringBuilder.append(items[(cur + takeIndex) % items.length].toString()).append(", ");
              cur += 1;
            }
            stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length());
            stringBuilder.append(']');
          }
        }finally {
          lock.unlock();
        }
        return stringBuilder.toString();
      }
     
    }

    Now test the above code:

    We use the blocking queue to simulate a producer-consumer model. The size of the blocking queue is set to 5. The producer thread adds 10 numbers, 0 to 9, to the queue, and the consumer thread consumes 10 times in total.

    import java.util.concurrent.TimeUnit;
     
    public class Test {
     
      public static void main(String[] args) throws InterruptedException {
        MyArrayBlockingQueue<Integer> queue = new MyArrayBlockingQueue<>(5);
        Thread thread = new Thread(() -> {
          for (int i = 0; i < 10; i++) {
            System.out.println(Thread.currentThread().getName() + " adds data to the queue: " + i);
            queue.put(i);
          }
        }, "Producer");


        Thread thread1 = new Thread(() -> {
          for (int i = 0; i < 10; i++) {
            try {
              System.out.println(Thread.currentThread().getName() + " takes data from the queue: " + queue.take());
              System.out.println(Thread.currentThread().getName() + " current data in the queue: " + queue);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
        }, "Consumer");
        thread.start();
        TimeUnit.SECONDS.sleep(3);
        thread1.start();
     
      }
    }

    One run of the above code produced the following output (the exact interleaving of the two threads may vary between runs):

    Producer adds data to the queue: 0
    Producer adds data to the queue: 1
    Producer adds data to the queue: 2
    Producer adds data to the queue: 3
    Producer adds data to the queue: 4
    Producer adds data to the queue: 5
    Consumer takes data from the queue: 0
    Producer adds data to the queue: 6
    Consumer current data in the queue: [1, 2, 3, 4, 5]
    Consumer takes data from the queue: 1
    Consumer current data in the queue: [2, 3, 4, 5]
    Consumer takes data from the queue: 2
    Consumer current data in the queue: [3, 4, 5, 6]
    Producer adds data to the queue: 7
    Consumer takes data from the queue: 3
    Consumer current data in the queue: [4, 5, 6, 7]
    Consumer takes data from the queue: 4
    Consumer current data in the queue: [5, 6, 7]
    Consumer takes data from the queue: 5
    Consumer current data in the queue: [6, 7]
    Producer adds data to the queue: 8
    Consumer takes data from the queue: 6
    Consumer current data in the queue: [7, 8]
    Consumer takes data from the queue: 7
    Consumer current data in the queue: [8]
    Consumer takes data from the queue: 8
    Consumer current data in the queue: []
    Producer adds data to the queue: 9
    Consumer takes data from the queue: 9
    Consumer current data in the queue: []

    From the output above, we can see that the producer thread was suspended after printing 5. The consumer thread is only started 3 seconds after the producer, so if the producer had not been suspended, it would have printed all of its output in one go. Instead, it stopped after printing 5: the queue already held five items (0 to 4), so the put of 5 blocked. Once the consumer started consuming, space became available in the queue and the producer thread could continue producing.

    Statement:
    This article is reproduced from yisu.com. If there is any infringement, please contact admin@php.cn to have it removed.