Maison >Java >javaDidacticiel >Comment utiliser la file d'attente de blocage d'écriture manuscrite Java
Les principales exigences pour bloquer les files d'attente sont les suivantes :
concurrency safe.
, et nous devons également réveiller et bloquer le thread, afin que nous puissions choisir le verrou réentrant ReentrantLock
pour assurer la sécurité de la concurrence, mais nous devons également réveiller le thread et blocage, nous pouvons donc choisir la variable de condition Condition
pour réveiller et bloquer le thread Dans Condition
nous utiliserons les deux fonctions suivantes : # 🎜🎜##🎜. 🎜#signal
est utilisé pour réveiller les threads Lorsqu'un thread appelle la fonction signal
de Condition
Vous pouvez. réveiller un thread bloqué par la fonction await
. ReentrantLock
保证并发安全,但是我们还需要将线程唤醒和阻塞,因此我们可以选择条件变量Condition
进行线程的唤醒和阻塞操作,在Condition
当中我们将会使用到的,主要有以下两个函数:
signal
用于唤醒线程,当一个线程调用Condition
的signal
函数的时候就可以唤醒一个被await
函数阻塞的线程。
await
用于阻塞线程,当一个线程调用Condition
的await
函数的时候这个线程就会阻塞。
因为队列是一端进一端出,因此队列肯定有头有尾。
当我们往队列当中加入一些数据之后,队列的情况可能如下:
在上图的基础之上我们在进行四次出队操作,结果如下:
在上面的状态下,我们继续加入8个数据,那么布局情况如下:
我们知道上图在加入数据的时候不仅将数组后半部分的空间使用完了,而且可以继续使用前半部分没有使用过的空间,也就是说在队列内部实现了一个循环使用的过程。
为了保证数组的循环使用,我们需要用一个变量记录队列头在数组当中的位置,用一个变量记录队列尾部在数组当中的位置,还需要有一个变量记录队列当中有多少个数据。
根据上面的分析我们可以知道,在我们自己实现的类当中我们需要有如下的类成员变量:
// 用于保护临界区的锁 private final ReentrantLock lock; // 用于唤醒取数据的时候被阻塞的线程 private final Condition notEmpty; // 用于唤醒放数据的时候被阻塞的线程 private final Condition notFull; // 用于记录从数组当中取数据的位置 也就是队列头部的位置 private int takeIndex; // 用于记录从数组当中放数据的位置 也就是队列尾部的位置 private int putIndex; // 记录队列当中有多少个数据 private int count; // 用于存放具体数据的数组 private Object[] items;
我们的构造函数也很简单,最核心的就是传入一个数组大小的参数,并且给上面的变量进行初始化赋值。
@SuppressWarnings("unchecked") public MyArrayBlockingQueue(int size) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.notFull = lock.newCondition(); // 其实可以不用初始化 类会有默认初始化 默认初始化为0 takeIndex = 0; putIndex = 0; count = 0; // 数组的长度肯定不能够小于0 if (size <= 0) throw new RuntimeException("size can not be less than 1"); items = (E[])new Object[size]; }
这是一个比较重要的函数了,在这个函数当中如果队列没有满,则直接将数据放入到数组当中即可,如果数组满了,则需要将线程挂起。
public void put(E x){ // put 函数可能多个线程调用 但是我们需要保证在给变量赋值的时候只能够有一个线程 // 因为如果多个线程同时进行赋值的话 那么可能后一个线程的赋值操作覆盖了前一个线程的赋值操作 // 因此这里需要上锁 lock.lock(); try { // 如果队列当中的数据个数等于数组的长度的话 说明数组已经满了 // 这个时候需要将线程挂起 while (count == items.length) notFull.await(); // 将调用 await的线程挂起 // 当数组没有满 或者在挂起之后再次唤醒的话说明数组当中有空间了 // 这个时候需要将数组入队 // 调用入队函数将数据入队 enqueue(x); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 解锁 lock.unlock(); } } // 将数据入队 private void enqueue(E x) { this.items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); // 唤醒一个被 take 函数阻塞的线程唤醒 }
offer函数和put函数一样,但是与put函数不同的是,当数组当中数据填满之后offer函数返回false
,而不是被阻塞。
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { // 如果数组满了 则直接返回false 而不是被阻塞 if (count == items.length) return false; else { // 如果数组没有满则直接入队 并且返回 true enqueue(e); return true; } } finally { lock.unlock(); } }
这个函数和上面两个函数作用一样,也是往队列当中加入数据,但当单队列满了之后这个函数会抛出异常。
public boolean add(E e) { if (offer(e)) return true; else throw new RuntimeException("Queue full"); }
这个函数主要是从队列当中取出一个数据,但是当队列为空的时候,这个函数会阻塞调用该函数的线程:
public E take() throws InterruptedException { // 这个函数也是不能够并发的 否则可能不同的线程取出的是同一个位置的数据 // 进行加锁操作 lock.lock(); try { // 当 count 等于0 说明队列为空 // 需要将线程挂起等待 while (count == 0) notEmpty.await(); // 当被唤醒之后进行出队操作 return dequeue(); }finally { lock.unlock(); } } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; // 将对应的位置设置为 null GC就可以回收了 if (++takeIndex == items.length) takeIndex = 0; count--; // 队列当中数据少一个了 // 因为出队了一个数据 可以唤醒一个被 put 函数阻塞的线程 如果这个时候没有被阻塞的线程 // 这个函数就不会起作用 也就说在这个函数调用之后被 put 函数挂起的线程也不会被唤醒 notFull.signal(); // 唤醒一个被 put 函数阻塞的线程 return x; }
因为我们在后面的测试函数当中会打印我们这个类,而打印这个类的时候会调用对象的toString
await
est utilisé pour bloquer les threads lorsqu'un thread appelle le await
de Condition
. code> Ce thread se bloquera lors de l'exécution de la fonction.
Utilisation du recyclage du tableauParce que la file d'attente entre à une extrémité et ressort à l'autre extrémité, la file d'attente doit avoir une tête et une queue.
# 🎜🎜##🎜🎜#Après avoir ajouté quelques données à la file d'attente, la situation de la file d'attente peut être la suivante : #🎜🎜##🎜🎜##🎜🎜##🎜🎜#Sur la base de l'image ci-dessus, nous effectuons quatre opérations de retrait de la file d'attente, et les résultats sont les suivants :# 🎜🎜##🎜🎜##🎜 🎜##🎜🎜#Dans l'état ci-dessus, nous continuons à ajouter 8 données, puis la disposition est la suivante : #🎜🎜##🎜🎜##🎜🎜##🎜🎜#Nous savons que lorsque en ajoutant des données dans l'image ci-dessus, non seulement l'espace dans la seconde moitié du tableau est utilisé, mais vous pouvez continuer à utiliser l'espace inutilisé dans la première moitié, ce qui signifie qu'un processus d'utilisation cyclique est implémenté dans la file d'attente. #🎜🎜##🎜🎜#Afin d'assurer l'utilisation cyclique du tableau, nous devons utiliser une variable pour enregistrer la position de la tête de file d'attente dans le tableau, une variable pour enregistrer la position de la queue de file d'attente dans le tableau , et une variable pour enregistrer la position de la file d'attente. Combien de données. #🎜🎜##🎜🎜#Implémentation du code#🎜🎜##🎜🎜#Définition de la variable membre#🎜🎜##🎜🎜#D'après l'analyse ci-dessus, nous pouvons savoir que parmi les classes que nous implémentons nous-mêmes, nous avons besoin des éléments suivants classes Variables membres : #🎜🎜#@Override public String toString() { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append("["); // 这里需要上锁 因为我们在打印的时候需要打印所有的数据 // 打印所有的数据就需要对数组进行遍历操作 而在进行遍历 // 操作的时候是不能进行插入和删除操作的 因为打印的是某 // 个时刻的数据 lock.lock(); try { if (count == 0) stringBuilder.append("]"); else { int cur = 0; // 对数据进行遍历 一共遍历 count 次 因为数组当中一共有 count // 个数据 while (cur != count) { // 从 takeIndex 位置开始进行遍历 因为数据是从这个位置开始的 stringBuilder.append(items[(cur + takeIndex) % items.length].toString() + ", "); cur += 1; } // 删除掉最后一次没用的 ", " stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length()); stringBuilder.append(']'); } }finally { lock.unlock(); } return stringBuilder.toString(); }#🎜🎜#Constructor#🎜🎜##🎜🎜#Notre constructeur est également très simple. L'essentiel est de transmettre un paramètre de la taille d'un tableau et d'initialiser et d'attribuer des valeurs au. ci-dessus les variables. #🎜🎜#
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class MyArrayBlockingQueue<E> { // 用于保护临界区的锁 private final ReentrantLock lock; // 用于唤醒取数据的时候被阻塞的线程 private final Condition notEmpty; // 用于唤醒放数据的时候被阻塞的线程 private final Condition notFull; // 用于记录从数组当中取数据的位置 也就是队列头部的位置 private int takeIndex; // 用于记录从数组当中放数据的位置 也就是队列尾部的位置 private int putIndex; // 记录队列当中有多少个数据 private int count; // 用于存放具体数据的数组 private Object[] items; @SuppressWarnings("unchecked") public MyArrayBlockingQueue(int size) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.notFull = lock.newCondition(); // 其实可以不用初始化 类会有默认初始化 默认初始化为0 takeIndex = 0; putIndex = 0; count = 0; if (size <= 0) throw new RuntimeException("size can not be less than 1"); items = (E[])new Object[size]; } public void put(E x){ lock.lock(); try { while (count == items.length) notFull.await(); enqueue(x); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } private void enqueue(E x) { this.items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; notFull.signal(); return x; } public boolean add(E e) { if (offer(e)) return true; else throw new RuntimeException("Queue full"); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } public E take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); return dequeue(); }finally { lock.unlock(); } } @Override public String toString() { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append("["); lock.lock(); try { if (count == 0) stringBuilder.append("]"); else { int cur = 0; while (cur != count) { stringBuilder.append(items[(cur + takeIndex) % items.length].toString()).append(", "); cur += 1; } stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length()); stringBuilder.append(']'); } }finally { lock.unlock(); } return stringBuilder.toString(); } }#🎜🎜#put function#🎜🎜##🎜🎜#C'est une fonction plus importante dans cette fonction, si la file d'attente n'est pas pleine, placez simplement les données directement dans le tableau. est plein, le fil doit être suspendu. #🎜🎜#
import java.util.concurrent.TimeUnit; public class Test { public static void main(String[] args) throws InterruptedException { MyArrayBlockingQueue<Integer> queue = new MyArrayBlockingQueue<>(5); Thread thread = new Thread(() -> { for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName() + " 往队列当中加入数据:" + i); queue.put(i); } }, "生产者"); Thread thread1 = new Thread(() -> { for (int i = 0; i < 10; i++) { try { System.out.println(Thread.currentThread().getName() + " 从队列当中取出数据:" + queue.take()); System.out.println(Thread.currentThread().getName() + " 当前队列当中的数据:" + queue); } catch (InterruptedException e) { e.printStackTrace(); } } }, "消费者"); thread.start(); TimeUnit.SECONDS.sleep(3); thread1.start(); } }#🎜🎜#offer function#🎜🎜##🎜🎜#La fonction offer est la même que la fonction put, mais la différence est que lorsque le tableau est rempli de données, la fonction offer renvoie
false code> au lieu d'être bloqué. #🎜🎜#rrreee#🎜🎜#add function#🎜🎜##🎜🎜#Cette fonction a le même effet que les deux fonctions ci-dessus. Elle ajoute également des données à la file d'attente, mais lorsque la file d'attente unique est pleine, cette fonction le fera. lancer une exception. #🎜🎜#rrreee#🎜🎜#take function#🎜🎜##🎜🎜#Cette fonction retire principalement une donnée de la file d'attente, mais lorsque la file d'attente est vide, cette fonction bloquera le thread qui appelle la fonction : #🎜 🎜#rrreee#🎜🎜#Rewrite toString function#🎜🎜##🎜🎜#Parce que nous imprimerons notre classe dans la fonction de test suivante, et lors de l'impression de cette classe, le <code>toString
de l'objet sera appelé >Le La méthode obtient une chaîne et imprime enfin la chaîne. #🎜🎜#rrreee#🎜🎜#Code complet#🎜🎜##🎜🎜#Le code complet de la file d'attente de blocage que nous avons complété est le suivant : #🎜🎜#rrreee#🎜🎜#Maintenant, testez le code ci-dessus : #🎜🎜 ## 🎜🎜#Nous utilisons maintenant la file d'attente de blocage pour simuler un modèle producteur-consommateur. Définissez la taille de la file d'attente de blocage sur 5. Le thread producteur ajoutera des données à la file d'attente. Les données sont composées de 10 nombres de 0 à 9. Les threads consommateurs. avoir un total de consommera 10 fois. #🎜🎜#rrreee#🎜🎜#Le résultat du code ci-dessus ressemble à ceci : #🎜🎜#Le producteur ajoute des données à la file d'attente : 0
Le producteur ajoute des données à la file d'attente : 1
Le producteur ajoute des données à la file d'attente : 2
Le producteur va Ajouter des données à la file d'attente : 3
Le producteur ajoute des données à la file d'attente : 4
Le producteur ajoute des données à la file d'attente : 5
Le consommateur retire les données de la file d'attente : 0
Production L'utilisateur ajoute des données à la file d'attente : 6
Consumer Les données dans la file d'attente actuelle : [1, 2, 3, 4, 5]
Le consommateur sort les données de la file d'attente : 1
Données du consommateur dans la file d'attente actuelle : [2, 3, 4, 5]
Le consommateur retire les données de la file d'attente : 2
Données dans la file d'attente actuelle du consommateur : [3, 4, 5, 6]# 🎜🎜# Le producteur ajoute des données à la file d'attente : 7
Le consommateur retire les données de la file d'attente : 3
Données actuelles du consommateur dans la file d'attente : [4, 5, 6, 7]
Le consommateur retire les données de la file d'attente : 4
Les données dans la file d'attente actuelle du consommateur : [5, 6, 7]
Le consommateur sort les données de la file d'attente : 5
La file d'attente actuelle du consommateur Données : [ 6, 7]
Le producteur ajoute des données à la file d'attente : 8
Le consommateur retire les données de la file d'attente : 6
Données actuelles du consommateur dans la file d'attente : [7, 8 ]
Consommateur supprime les données de la file d'attente : 7
Données de la file d'attente actuelle du consommateur : [8]
Le consommateur retire les données de la file d'attente : 8
File d'attente actuelle du consommateur Les données qu'il contient : []#🎜🎜 #Le producteur ajoute des données à la file d'attente : 9
Le consommateur retire les données de la file d'attente : 9
Les données dans la file d'attente actuelle du consommateur : []
#🎜🎜 # D'après les résultats de sortie ci-dessus, nous savons que le thread producteur a été suspendu après l'impression 5, car s'il n'était pas suspendu, le thread producteur pourrait certainement terminer la sortie en une seule fois, car le thread consommateur a été bloqué 3 secondes. La file d'attente de blocage étant pleine, il n'a pas terminé la sortie après l'impression du numéro 5, ce qui a entraîné la suspension du thread producteur. Une fois que le consommateur commence à consommer, de l'espace est libéré dans la file d'attente de blocage et le thread producteur peut continuer à produire.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!