Maison >Java >javaDidacticiel >Méthode d'implémentation de Java LockSupport manuscrit
Parmi les différents outils de concurrence qui nous sont fournis dans le JDK, tels que l'implémentation interne de ReentrantLock et d'autres outils, un outil est souvent utilisé, et cet outil est LockSupport. LockSupport nous offre une fonction très puissante. C'est la primitive la plus basique pour le blocage de threads. Elle peut bloquer un thread ou réveiller un thread, elle est donc souvent utilisée dans des scénarios simultanés.
Avant de comprendre le principe d'implémentation de LockSupport, utilisons d'abord un cas pour comprendre les fonctions de LockSupport !
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; public class Demo { public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(() -> { System.out.println("park 之前"); LockSupport.park(); // park 函数可以将调用这个方法的线程挂起 System.out.println("park 之后"); }); thread.start(); TimeUnit.SECONDS.sleep(5); System.out.println("主线程休息了 5s"); System.out.println("主线程 unpark thread"); LockSupport.unpark(thread); // 主线程将线程 thread 唤醒 唤醒之后线程 thread 才可以继续执行 } }
Le résultat du code ci-dessus est le suivant :
parking avant
le thread principal s'est reposé pendant 5s
le thread principal déparque le thread
parking après
À première vue, les fonctions de mise en œuvre de park et de déparking de LockSupport ci-dessus et attendent et implémentation du signal Les fonctions semblent être les mêmes, mais en fait elles ne le sont pas. Jetons un coup d'œil au code suivant :
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; public class Demo02 { public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(() -> { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("park 之前"); LockSupport.park(); // 线程 thread 后进行 park 操作 System.out.println("park 之后"); }); thread.start(); System.out.println("主线程 unpark thread"); LockSupport.unpark(thread); // 先进行 unpark 操作 } }
La sortie du code ci-dessus est la suivante :
le thread principal dépare le thread
se gare avant.
parker après
dans le code ci-dessus, le thread principal L'opération de déparquage sera effectuée en premier, puis le thread effectuera l'opération de parcage. Dans ce cas, le programme peut également s'exécuter normalement. Mais si l'appel du signal est avant l'appel d'attente, le programme ne sera pas exécuté. Par exemple, le code suivant :
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class Demo03 { private static final ReentrantLock lock = new ReentrantLock(); private static final Condition condition = lock.newCondition(); public static void thread() throws InterruptedException { lock.lock(); try { TimeUnit.SECONDS.sleep(5); condition.await(); System.out.println("等待完成"); }finally { lock.unlock(); } } public static void mainThread() { lock.lock(); try { System.out.println("发送信号"); condition.signal(); }finally { lock.unlock(); System.out.println("主线程解锁完成"); } } public static void main(String[] args) { Thread thread = new Thread(() -> { try { thread(); } catch (InterruptedException e) { e.printStackTrace(); } }); thread.start(); mainThread(); } }
La sortie du code ci-dessus est la suivante :
Envoyer le signal
Le déverrouillage du thread principal est terminé.
Dans le code ci-dessus, "En attente d'achèvement" ne sera jamais imprimé. En effet, la fonction signal est appelée avant l'attente. La fonction signal n'aura un effet que sur la fonction wait exécutée avant elle, mais pas sur l'attente. la fonction appelée après elle a un impact.
Alors, quelle est la cause de cet effet ?
En fait, lorsque la JVM implémentera LockSupport, elle maintiendra en interne une variable compteur _counter pour chaque thread. Cette variable représente le "nombre de licences". Le thread ne peut s'exécuter que lorsqu'il y a une licence en même temps. le nombre maximum de licences ne peut être que de 1. Lors d’un appel au parc une seule fois, le nombre de licences sera réduit d’une. Lorsque unpark est appelé une fois, le compteur sera incrémenté de un, mais la valeur du compteur ne peut pas dépasser 1.
Lorsqu'un thread appelle park, il doit attendre une licence. Ce n'est qu'après avoir obtenu la licence que le thread peut continuer à s'exécuter, ou si une licence a été obtenue avant le park, il n'a pas besoin de se bloquer et peut directement l'être. exécuté.
Dans l'article précédent, nous avons présenté le principe de locksupport. Sa principale implémentation interne est réalisée grâce aux licences :
La licence que chaque thread peut obtenir Le nombre maximum. est 1.
Lorsque la méthode unpark est appelée, le thread peut obtenir une licence. La limite supérieure du nombre de licences est de 1. S'il existe déjà une licence, les licences ne peuvent pas être accumulées.
Lors de l'appel de la méthode park, si le thread appelant la méthode park n'a pas de licence, le thread doit être suspendu jusqu'à ce que d'autres threads appellent la méthode unpark et délivrent une licence à ce thread avant que le thread puisse continuer à l'implémenter. . Mais si le thread possède déjà une licence, le thread ne se bloquera pas et pourra s’exécuter directement.
Dans notre propre implémentation de Parker, nous pouvons également attribuer à chaque thread un compteur pour enregistrer le nombre de licences pour le thread lorsque le nombre de licences est supérieur ou égal à 0. , le thread peut s'exécuter, sinon le thread doit être bloqué. Les règles spécifiques du protocole sont les suivantes :
Le nombre de licences pour le thread initial est 0.
Si lorsque nous appelons park, la valeur du compteur est égale à 1 et la valeur du compteur devient 0, le thread peut continuer à s'exécuter.
Si la valeur du compteur est égale à 0 lorsque nous appelons park, le thread ne peut pas continuer à s'exécuter, le thread doit être suspendu et la valeur du compteur est définie sur -1.
Si lorsque nous appelons unpark, la valeur du compteur du thread non parqué est égale à 0, alors la valeur du compteur doit être modifiée à 1.
Si la valeur du compteur du thread non parqué est égale à 1 lorsque nous appelons unpark, il n'est pas nécessaire de modifier la valeur du compteur, car la valeur maximale du compteur est 1.
Lorsque nous appelons unpark, si la valeur du compteur est égale à -1, cela signifie que le thread a été suspendu, et le thread doit être réveillé et la valeur du compteur doit être définie sur 0.
Parce que cela implique de bloquer et de réveiller des threads, nous pouvons utiliser des verrous réentrants ReentrantLock et des variables de condition Condition, nous devons donc être familiers avec l'utilisation de ces deux outils.
ReentrantLock est principalement utilisé pour le verrouillage et le déverrouillage, et est utilisé pour protéger les zones critiques.
La méthode Condition.awat est utilisée pour bloquer le fil.
La méthode Condition.signal est utilisée pour réveiller le fil.
因为我们在unpark方法当中需要传入具体的线程,将这个线程发放许可证,同时唤醒这个线程,因为是需要针对特定的线程进行唤醒,而condition唤醒的线程是不确定的,因此我们需要为每一个线程维护一个计数器和条件变量,这样每个条件变量只与一个线程相关,唤醒的肯定就是一个特定的线程。我们可以使用HashMap进行实现,键为线程,值为计数器或者条件变量。
因此综合上面的分析我们的类变量如下:
private final ReentrantLock lock; // 用于保护临界去 private final HashMap<Thread, Integer> permits; // 许可证的数量 private final HashMap<Thread, Condition> conditions; // 用于唤醒和阻塞线程的条件变量
构造函数主要对变量进行赋值:
public Parker() { lock = new ReentrantLock(); permits = new HashMap<>(); conditions = new HashMap<>(); }
park方法
public void park() { Thread t = Thread.currentThread(); // 首先得到当前正在执行的线程 if (conditions.get(t) == null) { // 如果还没有线程对应的condition的话就进行创建 conditions.put(t, lock.newCondition()); } lock.lock(); try { // 如果许可证变量还没有创建 或者许可证等于0 说明没有许可证了 线程需要被挂起 if (permits.get(t) == null || permits.get(t) == 0) { permits.put(t, -1); // 同时许可证的数目应该设置为-1 conditions.get(t).await(); }else if (permits.get(t) > 0) { permits.put(t, 0); // 如果许可证的数目大于0 也就是为1 说明线程已经有了许可证因此可以直接被放行 但是需要消耗一个许可证 } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
unpark方法
public void unpark(Thread thread) { Thread t = thread; // 给线程 thread 发放一个许可证 lock.lock(); try { if (permits.get(t) == null) // 如果还没有创建许可证变量 说明线程当前的许可证数量等于初始数量也就是0 因此方法许可证之后 许可证的数量为 1 permits.put(t, 1); else if (permits.get(t) == -1) { // 如果许可证数量为-1,则说明肯定线程 thread 调用了park方法,而且线程 thread已经被挂起了 因此在 unpark 函数当中不急需要将许可证数量这是为0 同时还需要将线程唤醒 permits.put(t, 0); conditions.get(t).signal(); }else if (permits.get(t) == 0) { // 如果许可证数量为0 说明线程正在执行 因此许可证数量加一 permits.put(t, 1); } // 除此之外就是许可证为1的情况了 在这种情况下是不需要进行操作的 因为许可证最大的数量就是1 }finally { lock.unlock(); } }
import java.util.HashMap; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class Parker { private final ReentrantLock lock; private final HashMap<Thread, Integer> permits; private final HashMap<Thread, Condition> conditions; public Parker() { lock = new ReentrantLock(); permits = new HashMap<>(); conditions = new HashMap<>(); } public void park() { Thread t = Thread.currentThread(); if (conditions.get(t) == null) { conditions.put(t, lock.newCondition()); } lock.lock(); try { if (permits.get(t) == null || permits.get(t) == 0) { permits.put(t, -1); conditions.get(t).await(); }else if (permits.get(t) > 0) { permits.put(t, 0); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void unpark(Thread thread) { Thread t = thread; lock.lock(); try { if (permits.get(t) == null) permits.put(t, 1); else if (permits.get(t) == -1) { permits.put(t, 0); conditions.get(t).signal(); }else if (permits.get(t) == 0) { permits.put(t, 1); } }finally { lock.unlock(); } } }
其实在JVM底层对于park和unpark的实现也是基于锁和条件变量的,只不过是用更加底层的操作系统和libc(linux操作系统)提供的API进行实现的。虽然API不一样,但是原理是相仿的,思想也相似。
比如下面的就是JVM实现的unpark方法:
void Parker::unpark() { int s, status; // 进行加锁操作 相当于 可重入锁的 lock.lock() status = pthread_mutex_lock(_mutex); assert (status == 0, "invariant"); s = _counter; _counter = 1; if (s < 1) { // 如果许可证小于 1 进行下面的操作 if (WorkAroundNPTLTimedWaitHang) { // 这行代码相当于 condition.signal() 唤醒线程 status = pthread_cond_signal (_cond); assert (status == 0, "invariant"); // 解锁操作 相当于可重入锁的 lock.unlock() status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant"); } else { status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant"); status = pthread_cond_signal (_cond); assert (status == 0, "invariant"); } } else { // 如果有许可证 也就是 s == 1 那么不许要将线程挂起 // 解锁操作 相当于可重入锁的 lock.unlock() pthread_mutex_unlock(_mutex); assert (status == 0, "invariant"); } }
JVM实现的park方法,如果没有许可证也是会将线程挂起的:
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!