Heim  >  Artikel  >  Java  >  So verwenden Sie die Java-Handschriftblockierungswarteschlange

So verwenden Sie die Java-Handschriftblockierungswarteschlange

WBOY
WBOYnach vorne
2023-05-20 09:28:20986Durchsuche

    Anforderungsanalyse

    Die Hauptanforderungen für das Blockieren von Warteschlangen sind wie folgt:

    #🎜 🎜#
    • Die Grundfunktionen der Warteschlange müssen darin bestehen, Daten in die Warteschlange zu stellen und Daten aus der Warteschlange abzurufen.

    • Alle Warteschlangenvorgänge müssen

      Parallelitätssicher sein.

    • Wenn die Warteschlange voll ist und Daten in die Warteschlange gestellt werden, muss der Thread angehalten werden. Wenn die Daten in der Warteschlange entnommen werden, ist noch Platz vorhanden die Warteschlange, wenn der Thread aktiviert werden muss.

    • Wenn die Warteschlange leer ist und die Daten aus der Warteschlange abgerufen werden, muss der Thread angehalten werden. Wenn ein Thread Daten zur Warteschlange hinzufügt, wird er angehalten. Der Thread muss aufgeweckt werden.

    • In der von uns implementierten Warteschlange verwenden wir Arrays zum Speichern von Daten. Daher müssen wir die Anfangsgröße des Arrays im Konstruktor angeben und die Größe des Arrays festlegen .

    Implementierungsprinzip der Blockierungswarteschlange

    Thread-Blockierung und Aufwachen

    Wir haben oben über die Blockierungswarteschlange gesprochen. 🎜🎜#Parallelitätssicherheit

    , und wir müssen auch den Thread aufwecken und blockieren, damit wir die Wiedereintrittssperre ReentrantLock auswählen können, um die Parallelitätssicherheit zu gewährleisten, aber wir müssen auch den Thread aufwecken Thread und Blockierung, sodass wir die Bedingungsvariable Condition auswählen können, um den Thread aufzuwecken und zu blockieren. In Condition verwenden wir die folgenden zwei Funktionen: # 🎜🎜##🎜 🎜#signal wird verwendet, um Threads aufzuwecken, wenn ein Thread die signal-Funktion von Condition aufruft Weckt einen Thread auf, der durch die Funktion await blockiert wurde. ReentrantLock保证并发安全,但是我们还需要将线程唤醒和阻塞,因此我们可以选择条件变量Condition进行线程的唤醒和阻塞操作,在Condition当中我们将会使用到的,主要有以下两个函数:

    • signal用于唤醒线程,当一个线程调用Conditionsignal函数的时候就可以唤醒一个被await函数阻塞的线程。

    • await用于阻塞线程,当一个线程调用Conditionawait函数的时候这个线程就会阻塞。

    数组循环使用

    因为队列是一端进一端出,因此队列肯定有头有尾。

    So verwenden Sie die Java-Handschriftblockierungswarteschlange

    当我们往队列当中加入一些数据之后,队列的情况可能如下:

    So verwenden Sie die Java-Handschriftblockierungswarteschlange

    在上图的基础之上我们在进行四次出队操作,结果如下:

    So verwenden Sie die Java-Handschriftblockierungswarteschlange

    在上面的状态下,我们继续加入8个数据,那么布局情况如下:

    So verwenden Sie die Java-Handschriftblockierungswarteschlange

    我们知道上图在加入数据的时候不仅将数组后半部分的空间使用完了,而且可以继续使用前半部分没有使用过的空间,也就是说在队列内部实现了一个循环使用的过程。

    为了保证数组的循环使用,我们需要用一个变量记录队列头在数组当中的位置,用一个变量记录队列尾部在数组当中的位置,还需要有一个变量记录队列当中有多少个数据。

    代码实现

    成员变量定义

    根据上面的分析我们可以知道,在我们自己实现的类当中我们需要有如下的类成员变量:

    // 用于保护临界区的锁
    private final ReentrantLock lock;
    // 用于唤醒取数据的时候被阻塞的线程
    private final Condition notEmpty;
    // 用于唤醒放数据的时候被阻塞的线程
    private final Condition notFull;
    // 用于记录从数组当中取数据的位置 也就是队列头部的位置
    private int takeIndex;
    // 用于记录从数组当中放数据的位置 也就是队列尾部的位置
    private int putIndex;
    // 记录队列当中有多少个数据
    private int count;
    // 用于存放具体数据的数组
    private Object[] items;

    构造函数

    我们的构造函数也很简单,最核心的就是传入一个数组大小的参数,并且给上面的变量进行初始化赋值。

    @SuppressWarnings("unchecked")
    public MyArrayBlockingQueue(int size) {
      this.lock = new ReentrantLock();
      this.notEmpty = lock.newCondition();
      this.notFull = lock.newCondition();
      // 其实可以不用初始化 类会有默认初始化 默认初始化为0
      takeIndex = 0;
      putIndex = 0;
      count = 0;
      // 数组的长度肯定不能够小于0
      if (size <= 0)
        throw new RuntimeException("size can not be less than 1");
      items = (E[])new Object[size];
    }

    put函数

    这是一个比较重要的函数了,在这个函数当中如果队列没有满,则直接将数据放入到数组当中即可,如果数组满了,则需要将线程挂起。

    public void put(E x){
      // put 函数可能多个线程调用 但是我们需要保证在给变量赋值的时候只能够有一个线程
      // 因为如果多个线程同时进行赋值的话 那么可能后一个线程的赋值操作覆盖了前一个线程的赋值操作
      // 因此这里需要上锁
      lock.lock();
     
      try {
        // 如果队列当中的数据个数等于数组的长度的话 说明数组已经满了
        // 这个时候需要将线程挂起
        while (count == items.length)
          notFull.await(); // 将调用 await的线程挂起
        // 当数组没有满 或者在挂起之后再次唤醒的话说明数组当中有空间了
        // 这个时候需要将数组入队 
        // 调用入队函数将数据入队
        enqueue(x);
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        // 解锁
        lock.unlock();
      }
    }
     
    // 将数据入队
    private void enqueue(E x) {
      this.items[putIndex] = x;
      if (++putIndex == items.length)
        putIndex = 0;
      count++;
      notEmpty.signal(); // 唤醒一个被 take 函数阻塞的线程唤醒
    }

    offer函数

    offer函数和put函数一样,但是与put函数不同的是,当数组当中数据填满之后offer函数返回false,而不是被阻塞。

    public boolean offer(E e) {
      final ReentrantLock lock = this.lock;
      lock.lock();
      try {
        // 如果数组满了 则直接返回false 而不是被阻塞
        if (count == items.length)
          return false;
        else {
          // 如果数组没有满则直接入队 并且返回 true
          enqueue(e);
          return true;
        }
      } finally {
        lock.unlock();
      }
    }

    add函数

    这个函数和上面两个函数作用一样,也是往队列当中加入数据,但当单队列满了之后这个函数会抛出异常。

    public boolean add(E e) {
      if (offer(e))
        return true;
      else
        throw new RuntimeException("Queue full");
    }

    take函数

    这个函数主要是从队列当中取出一个数据,但是当队列为空的时候,这个函数会阻塞调用该函数的线程:

    public E take() throws InterruptedException {
      // 这个函数也是不能够并发的 否则可能不同的线程取出的是同一个位置的数据
      // 进行加锁操作
      lock.lock();
      try {
        // 当 count 等于0 说明队列为空
        // 需要将线程挂起等待
        while (count == 0)
          notEmpty.await();
        // 当被唤醒之后进行出队操作
        return dequeue();
      }finally {
        lock.unlock();
      }
    }
     
    private E  dequeue() {
      final Object[] items = this.items;
      @SuppressWarnings("unchecked")
      E x = (E) items[takeIndex];
      items[takeIndex] = null; // 将对应的位置设置为 null GC就可以回收了
      if (++takeIndex == items.length)
        takeIndex = 0;
      count--; // 队列当中数据少一个了
      // 因为出队了一个数据 可以唤醒一个被 put 函数阻塞的线程 如果这个时候没有被阻塞的线程
      // 这个函数就不会起作用 也就说在这个函数调用之后被 put 函数挂起的线程也不会被唤醒
      notFull.signal(); // 唤醒一个被 put 函数阻塞的线程
      return x;
    }

    重写toString函数

    因为我们在后面的测试函数当中会打印我们这个类,而打印这个类的时候会调用对象的toString

    await wird verwendet, um Threads zu blockieren, wenn ein Thread conditions await aufruft. Code> Dieser Thread wird beim Ausführen der Funktion blockiert. <p></p> <p></p>Array-Recycling-Verwendung<p></p>Da die Warteschlange an einem Ende eintritt und am anderen Ende wieder herauskommt, muss die Warteschlange einen Kopf und einen haben Schwanz. <p></p> <img src="https://img.php.cn/upload/article/000/887/227/168454610335981.png" alt="So verwenden Sie Java-Handschrift, um die Warteschlange zu blockieren"># 🎜🎜##🎜🎜#Nachdem wir einige Daten zur Warteschlange hinzugefügt haben, kann die Warteschlangensituation wie folgt aussehen: #🎜🎜##🎜🎜#<img src="https://img.php.cn/%20upload/article/%20000/887/227/168454610370868.png" alt="So verwenden Sie Java-Handschrift, um die Warteschlange zu blockieren">#🎜🎜##🎜🎜#Basierend auf dem obigen Bild führen wir vier Entnahmevorgänge aus der Warteschlange durch: und die Ergebnisse sind wie folgt:# 🎜🎜##🎜🎜#<img src="https://img.php.cn/upload/article/000/887/227/168454610435515.png" alt="Verwendung Java-Handschrift zum Blockieren der Warteschlange">#🎜 🎜##🎜🎜#Im obigen Zustand fügen wir weiterhin 8 Daten hinzu, dann ist das Layout wie folgt: #🎜🎜##🎜🎜#<img src="https%20://img.php.cn/upload/article%20/000/887/227/168454610416801.png" alt="So verwenden Sie Java-Handschrift, um die Warteschlange zu blockieren">#🎜🎜##🎜🎜#Das wissen wir, wann Durch das Hinzufügen von Daten im obigen Bild wird nicht nur der Platz in der zweiten Hälfte des Arrays verbraucht, sondern Sie können den ungenutzten Platz in der ersten Hälfte weiterhin nutzen, was bedeutet, dass ein zyklischer Nutzungsprozess innerhalb der Warteschlange implementiert wird. #🎜🎜##🎜🎜#Um die zyklische Verwendung des Arrays sicherzustellen, müssen wir eine Variable verwenden, um die Position des Warteschlangenkopfes im Array aufzuzeichnen, und eine Variable, um die Position des Warteschlangenendes im Array aufzuzeichnen und eine Variable zum Aufzeichnen der Positionen in der Warteschlange. Wie viele Daten. #🎜🎜##🎜🎜#Code-Implementierung#🎜🎜##🎜🎜#Definition von Mitgliedsvariablen#🎜🎜##🎜🎜#Gemäß der obigen Analyse können wir wissen, dass wir unter den Klassen, die wir selbst implementieren, Folgendes benötigen Klassen Mitgliedsvariablen: #🎜🎜#<pre class="brush:java;">@Override public String toString() { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append(&quot;[&quot;); // 这里需要上锁 因为我们在打印的时候需要打印所有的数据 // 打印所有的数据就需要对数组进行遍历操作 而在进行遍历 // 操作的时候是不能进行插入和删除操作的 因为打印的是某 // 个时刻的数据 lock.lock(); try { if (count == 0) stringBuilder.append(&quot;]&quot;); else { int cur = 0; // 对数据进行遍历 一共遍历 count 次 因为数组当中一共有 count // 个数据 while (cur != count) { // 从 takeIndex 位置开始进行遍历 因为数据是从这个位置开始的 stringBuilder.append(items[(cur + takeIndex) % items.length].toString() + &quot;, &quot;); cur += 1; } // 删除掉最后一次没用的 &quot;, &quot; stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length()); stringBuilder.append(&amp;#39;]&amp;#39;); } }finally { lock.unlock(); } return stringBuilder.toString(); }</pre>#🎜🎜#Konstruktor#🎜🎜##🎜🎜#Unser Konstruktor ist auch sehr einfach. Der Kern besteht darin, einen Parameter in Array-Größe zu übergeben und ihn zu initialisieren und ihm Werte zuzuweisen oben genannten Variablen. #🎜🎜#<pre class="brush:java;">import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class MyArrayBlockingQueue&lt;E&gt; { // 用于保护临界区的锁 private final ReentrantLock lock; // 用于唤醒取数据的时候被阻塞的线程 private final Condition notEmpty; // 用于唤醒放数据的时候被阻塞的线程 private final Condition notFull; // 用于记录从数组当中取数据的位置 也就是队列头部的位置 private int takeIndex; // 用于记录从数组当中放数据的位置 也就是队列尾部的位置 private int putIndex; // 记录队列当中有多少个数据 private int count; // 用于存放具体数据的数组 private Object[] items; @SuppressWarnings(&quot;unchecked&quot;) public MyArrayBlockingQueue(int size) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.notFull = lock.newCondition(); // 其实可以不用初始化 类会有默认初始化 默认初始化为0 takeIndex = 0; putIndex = 0; count = 0; if (size &lt;= 0) throw new RuntimeException(&quot;size can not be less than 1&quot;); items = (E[])new Object[size]; } public void put(E x){ lock.lock(); try { while (count == items.length) notFull.await(); enqueue(x); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } private void enqueue(E x) { this.items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } private E dequeue() { final Object[] items = this.items; @SuppressWarnings(&quot;unchecked&quot;) E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; notFull.signal(); return x; } public boolean add(E e) { if (offer(e)) return true; else throw new RuntimeException(&quot;Queue full&quot;); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } public E take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); return dequeue(); }finally { lock.unlock(); } } @Override public String toString() { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append(&quot;[&quot;); lock.lock(); try { if (count == 0) stringBuilder.append(&quot;]&quot;); else { int cur = 0; while (cur != count) { stringBuilder.append(items[(cur + takeIndex) % items.length].toString()).append(&quot;, &quot;); cur += 1; } stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length()); stringBuilder.append(&amp;#39;]&amp;#39;); } }finally { lock.unlock(); } return stringBuilder.toString(); } }</pre>#🎜🎜#put-Funktion#🎜🎜##🎜🎜#Dies ist eine wichtigere Funktion. Wenn die Warteschlange nicht voll ist, fügen Sie die Daten einfach direkt in das Array ein voll ist, muss der Thread angehalten werden. #🎜🎜#<pre class="brush:java;">import java.util.concurrent.TimeUnit; public class Test { public static void main(String[] args) throws InterruptedException { MyArrayBlockingQueue&lt;Integer&gt; queue = new MyArrayBlockingQueue&lt;&gt;(5); Thread thread = new Thread(() -&gt; { for (int i = 0; i &lt; 10; i++) { System.out.println(Thread.currentThread().getName() + &quot; 往队列当中加入数据:&quot; + i); queue.put(i); } }, &quot;生产者&quot;); Thread thread1 = new Thread(() -&gt; { for (int i = 0; i &lt; 10; i++) { try { System.out.println(Thread.currentThread().getName() + &quot; 从队列当中取出数据:&quot; + queue.take()); System.out.println(Thread.currentThread().getName() + &quot; 当前队列当中的数据:&quot; + queue); } catch (InterruptedException e) { e.printStackTrace(); } } }, &quot;消费者&quot;); thread.start(); TimeUnit.SECONDS.sleep(3); thread1.start(); } }</pre>#🎜🎜#Angebotsfunktion#🎜🎜##🎜🎜#Die Angebotsfunktion ist dieselbe wie die Put-Funktion, der Unterschied besteht jedoch darin, dass die Angebotsfunktion <code> zurückgibt, wenn das Array mit Daten gefüllt ist false code> anstatt blockiert zu werden. #🎜🎜#rrreee#🎜🎜#Funktion hinzufügen#🎜🎜##🎜🎜#Diese Funktion hat den gleichen Effekt wie die beiden oben genannten Funktionen. Sie fügt der Warteschlange auch Daten hinzu, wenn die einzelne Warteschlange jedoch voll ist eine Ausnahme auslösen. #🎜🎜#rrreee#🎜🎜#take function#🎜🎜##🎜🎜#Diese Funktion entnimmt hauptsächlich Daten aus der Warteschlange, aber wenn die Warteschlange leer ist, blockiert diese Funktion den Thread, der die Funktion aufruft: #🎜 🎜#rrreee#🎜🎜#ToString-Funktion neu schreiben#🎜🎜##🎜🎜#Da wir unsere Klasse in der nachfolgenden Testfunktion drucken und beim Drucken dieser Klasse der <code>toStringdes Objekts >The genannt wird Die Methode ruft einen String ab und gibt den String schließlich aus. #🎜🎜#rrreee#🎜🎜#Vollständiger Code#🎜🎜##🎜🎜#Der gesamte Blocking-Queue-Code, den wir vervollständigt haben, lautet wie folgt: #🎜🎜#rrreee#🎜🎜#Testen Sie nun den obigen Code: #🎜🎜 ## 🎜🎜#Wir verwenden jetzt die Blockierungswarteschlange, um ein Produzenten-Konsumenten-Modell zu simulieren. Stellen Sie die Größe der Blockierungswarteschlange auf 5 ein. Der Produzenten-Thread fügt Daten zur Warteschlange hinzu. Die Daten sind 10 Zahlen von 0 bis 9. Die Konsumenten-Threads Insgesamt wird 10-mal verbraucht. #🎜🎜#rrreee#🎜🎜#Die Ausgabe des obigen Codes sieht folgendermaßen aus: #🎜🎜#

    Produzent fügt Daten zur Warteschlange hinzu: 0
    Produzent fügt Daten zur Warteschlange hinzu: 1
    Produzent fügt Daten zur Warteschlange hinzu: 2
    Produzent fügt Daten zur Warteschlange hinzu Warteschlange: 3
    Produzent fügt Daten zur Warteschlange hinzu: 4
    Produzent fügt Daten zur Warteschlange hinzu: 5
    Verbraucher nimmt Daten aus der Warteschlange: 0
    Produktion Der Benutzer fügt Daten hinzu die Warteschlange: 6
    Consumer Die Daten in der aktuellen Warteschlange: [1, 2, 3, 4, 5]
    Der Consumer entnimmt die Daten aus der Warteschlange: 1
    Consumer Daten in der aktuelle Warteschlange: [2, 3, 4, 5]
    Der Verbraucher entnimmt Daten aus der Warteschlange: 2
    Daten in der aktuellen Warteschlange des Verbrauchers: [3, 4, 5, 6]# 🎜🎜# Der Produzent fügt der Warteschlange Daten hinzu: 7
    Der Verbraucher nimmt Daten aus der Warteschlange: 3
    Aktuelle Daten des Verbrauchers in der Warteschlange: [4, 5, 6, 7]
    Der Verbraucher entnimmt Daten aus der Warteschlange: 4
    Die Daten in der aktuellen Warteschlange des Verbrauchers: [5, 6, 7]
    Der Verbraucher entnimmt die Daten aus der Warteschlange: 5
    Die aktuellen Warteschlangendaten des Verbrauchers: [ 6, 7]
    Produzent fügt Daten zur Warteschlange hinzu: 8
    Verbraucher nimmt Daten aus der Warteschlange: 6
    Aktuelle Daten des Verbrauchers in der Warteschlange: [7, 8 ]
    Verbraucher entnimmt Daten aus der Warteschlange: 7
    Aktuelle Warteschlangendaten des Verbrauchers: [8]
    Verbraucher entnimmt Daten aus der Warteschlange: 8
    Aktuelle Warteschlange des Verbrauchers Die darin enthaltenen Daten: []#🎜🎜 #Der Produzent fügt Daten zur Warteschlange hinzu: 9
    Der Verbraucher entnimmt die Daten aus der Warteschlange: 9
    Die Daten in der aktuellen Warteschlange des Verbrauchers: []

    #🎜🎜 # Aus den obigen Ausgabeergebnissen wissen wir, dass der Producer-Thread nach dem Drucken von 5 angehalten wurde, denn wenn er nicht angehalten wurde, könnte der Producer-Thread die Ausgabe definitiv auf einmal abschließen, da der Consumer-Thread 3 Sekunden lang blockiert war. Da die Blockierungswarteschlange voll ist, konnte er die Ausgabe nach dem Drucken der Nummer 5 nicht abschließen, was dazu führte, dass der Produzenten-Thread angehalten wurde. Sobald der Konsument mit dem Konsum beginnt, wird Platz in der Blockierungswarteschlange verfügbar und der Produzententhread kann mit der Produktion fortfahren.

    Das obige ist der detaillierte Inhalt vonSo verwenden Sie die Java-Handschriftblockierungswarteschlange. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

    Stellungnahme:
    Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen