Maison  >  Article  >  Java  >  Explication détaillée des classes de concurrence basées sur AbstractQueuedSynchronizer

Explication détaillée des classes de concurrence basées sur AbstractQueuedSynchronizer

零下一度
零下一度original
2017-07-17 14:58:471392parcourir

Principe d'implémentation de ReentrantLock en mode équitable

L'article précédent a étudié le verrouillage exclusif et le verrouillage partagé de AbstractQueuedSynchronizer, qui a la base des deux premiers articles. , vous pouvez profiter de la victoire et voir comment la classe de concurrence basée sur AbstractQueuedSynchronizer est implémentée.

ReentrantLock est évidemment un verrou exclusif Le premier est ReentrantLock en mode équitable. Sync est la classe de base de ReentractLock et hérite de AbstractQueuedSynchronizer. , jetez un œil à l'implémentation du code :

 1 abstract static class Sync extends AbstractQueuedSynchronizer { 2     private static final long serialVersionUID = -5179523762034025860L; 3  4     /** 5      * Performs {@link Lock#lock}. The main reason for subclassing 6      * is to allow fast path for nonfair version. 7      */ 8     abstract void lock(); 9 10     /**11      * Performs non-fair tryLock.  tryAcquire is12      * implemented in subclasses, but both need nonfair13      * try for trylock method.14      */15     final boolean nonfairTryAcquire(int acquires) {16         final Thread current = Thread.currentThread();17         int c = getState();18         if (c == 0) {19             if (compareAndSetState(0, acquires)) {20                 setExclusiveOwnerThread(current);21                 return true;22             }23         }24         else if (current == getExclusiveOwnerThread()) {25             int nextc = c + acquires;26             if (nextc < 0) // overflow27                 throw new Error("Maximum lock count exceeded");28             setState(nextc);29             return true;30         }31         return false;32     }33 34     protected final boolean tryRelease(int releases) {35         int c = getState() - releases;36         if (Thread.currentThread() != getExclusiveOwnerThread())37             throw new IllegalMonitorStateException();38         boolean free = false;39         if (c == 0) {40             free = true;41             setExclusiveOwnerThread(null);42         }43         setState(c);44         return free;45     }46 47     protected final boolean isHeldExclusively() {48         // While we must in general read state before owner,49         // we don&#39;t need to do so to check if current thread is owner50         return getExclusiveOwnerThread() == Thread.currentThread();51     }52 53     final ConditionObject newCondition() {54         return new ConditionObject();55     }56 57     // Methods relayed from outer class58 59     final Thread getOwner() {60         return getState() == 0 ? null : getExclusiveOwnerThread();61     }62 63     final int getHoldCount() {64         return isHeldExclusively() ? getState() : 0;65     }66 67     final boolean isLocked() {68         return getState() != 0;69     }70 71     /**72      * Reconstitutes this lock instance from a stream.73      * @param s the stream74      */75     private void readObject(java.io.ObjectInputStream s)76         throws java.io.IOException, ClassNotFoundException {77         s.defaultReadObject();78         setState(0); // reset to unlocked state79     }80 }

Sync appartient à une classe publique, qui est abstraite, indiquant que Sync sera hérité brièvement. organiser ce que Sync fait principalement. Quoi (car Sync n'est pas la clé du verrouillage équitable de ReentrantLock) :

  1. définit une méthode de verrouillage à implémenter pour les sous-classes. pourquoi nous pouvons généralement appeler ReentrantLock La méthode lock() est parce que Sync la définit

  2. qui implémente la méthode tryAcquira de verrouillage injuste

  3. Implémente la méthode tryRelease, relativement simple, statut -1, le thread du verrou exclusif est vide

  4. Implémente la méthode isHeldExclusivement method

  5. définit la méthode newCondition, permettant aux développeurs d'utiliser Condition pour implémenter la notification/l'attente

Ensuite, jetez un œil à l'implémentation de fair lock, classe FairSync, qui hérite de Sync :

 1 static final class FairSync extends Sync { 2     private static final long serialVersionUID = -3000897897090466540L; 3  4     final void lock() { 5         acquire(1); 6     } 7  8     /** 9      * Fair version of tryAcquire.  Don&#39;t grant access unless10      * recursive call or no waiters or is first.11      */12     protected final boolean tryAcquire(int acquires) {13         final Thread current = Thread.currentThread();14         int c = getState();15         if (c == 0) {16             if (!hasQueuedPredecessors() &&17                 compareAndSetState(0, acquires)) {18                 setExclusiveOwnerThread(current);19                 return true;20             }21         }22         else if (current == getExclusiveOwnerThread()) {23             int nextc = c + acquires;24             if (nextc < 0)25                 throw new Error("Maximum lock count exceeded");26             setState(nextc);27             return true;28         }29         return false;30     }31 }

Pour trier les points clés :

  1. À chaque acquisition, état +1, si le thread actuel lock() puis lock(), l'état continuera à +1, en conséquence, lors du déverrouillage ( ), state-1, jusqu'à ce que l'état soit réduit à 0 signifie que le thread actuel a libéré tous les états et que les autres threads peuvent entrer en compétition

  2. Quand state=0, portez un jugement via la méthode hasQueuedPredecessors, l'implémentation de hasQueuedPredecessors est "h != t && ((s = h.next) == null || s. thread != Thread.currentThread());", où h est la tête et t est la queue. Puisque le résultat est inversé dans le code, le jugement après inversion est "h = = t || ((s = h.next ) != null && s.thread == Thread.currentThread());", en résumé, il y a deux situations qui peuvent être jugées par !hasQueuedPredecessors ():

    1. h==t, le cas de h==t est ou il n'y a aucune donnée dans la file d'attente FIFO actuelle , Ou une seule tête a été construite et n'a été connectée à aucun nœud, donc la tête est la queue

    2. (s = h.next) != null && s.thread == Thread.currentThread(), Le fil de discussion actuel est le fil du premier Nœud en attente  

  3. Si aucun thread n'attend plus longtemps que le thread actuel pour effectuer l'acquisition opération, puis tryAcquire le thread qui change l'état de 0 à 1 via l'opération CAS Successful

  4. Les threads qui n'ont pas de tryAcquire réussi sont construits comme une file d'attente FIFO selon à l'ordre de tryAcquire, c'est-à-dire que le premier qui échoue à tryAcquire est classé après la position de tête, le deuxième tryAcquire qui échoue est classé dans les deux dernières positions de la tête

  5. Lorsque le fil de discussion avec tryAcquire réussi est publié, le fil de discussion avec tryAcquire échoué est classé premier. Essayez tryAcquire, il s'agit du premier arrivé, premier servi, verrouillage équitable typique

Principe de mise en œuvre du mode injuste ReentrantLock

Après avoir lu le mode équitable ReentrantLock, regardons comment le mode injuste ReentrantLock est mise en œuvre. La classe NonfairSync est également héritée de la classe Sync et est implémentée comme :

 1 static final class NonfairSync extends Sync { 2     private static final long serialVersionUID = 7316153563782823691L; 3  4     /** 5      * Performs lock.  Try immediate barge, backing up to normal 6      * acquire on failure. 7      */ 8     final void lock() { 9         if (compareAndSetState(0, 1))10             setExclusiveOwnerThread(Thread.currentThread());11         else12             acquire(1);13     }14 15     protected final boolean tryAcquire(int acquires) {16         return nonfairTryAcquire(acquires);17     }18 }

结合nonfairTryAcquire方法一起讲解,nonfairTryAcquire方法的实现为:

 1 final boolean nonfairTryAcquire(int acquires) { 2     final Thread current = Thread.currentThread(); 3     int c = getState(); 4     if (c == 0) { 5         if (compareAndSetState(0, acquires)) { 6             setExclusiveOwnerThread(current); 7             return true; 8         } 9     }10     else if (current == getExclusiveOwnerThread()) {11         int nextc = c + acquires;12         if (nextc < 0) // overflow13             throw new Error("Maximum lock count exceeded");14         setState(nextc);15         return true;16     }17     return false;18 }

看到差别就在于非公平锁lock()的时候会先尝试通过CAS看看能不能把state从0变为1(即获取锁),如果可以的话,直接获取锁而不需要排队。举个实际例子就很好理解了:

  1. 线程1、线程2、线程3竞争锁,线程1竞争成功获取锁,线程2、线程3依次排队

  2. 线程1执行完毕,释放锁,state变为0,唤醒了第一个排队的线程2

  3. 此时线程4来尝试获取锁了,由于线程2被唤醒了,因此线程2与线程4竞争锁

  4. 线程4成功将state从0变为1,线程2竞争锁失败,继续park

看到整个过程中,后来的线程4反而比先来的线程2先获取锁,相当于是一种非公平的模式,

那为什么非公平锁效率会比公平锁效率高?上面第(3)步如果线程2和线程4不竞争锁就是答案。为什么这么说,后面的解释很重要,希望大家可以理解:

<span style="color: #000000">线程1是先将state设为0,再去唤醒线程2,这两个过程之间是有时间差的。<br/><br/>那么如果线程1将state设置为0的时候,线程4就通过CAS算法获取到了锁,且在线程1唤醒线程2之前就已经使用完毕锁,那么相当于线程2获取锁的时间并没有推迟,在线程1将state设置为0到线程1唤醒线程2的这段时间里,反而有线程4获取了锁执行了任务,这就增加了系统的吞吐量,相当于单位时间处理了更多的任务。</span>

从这段解释我们也应该能看出来了,非公平锁比较适合加锁时间比较短的任务。这是因为加锁时间长,相当于线程2将state设为0并去唤醒线程2的这段时间,线程4无法完成释放锁,那么线程2被唤醒由于没法获取到锁,又被阻塞了,这种唤醒-阻塞的操作会引起线程的上下文切换,继而影响系统的性能。

Semaphore实现原理

Semaphore即信号量,用于控制代码块的并发数,将Semaphore的permits设置为1相当于就是synchronized或者ReentrantLock,Semaphore具体用法可见Java多线程19:多线程下的其他组件之CountDownLatch、Semaphore、Exchanger。信号量允许多条线程获取锁,显然它的锁是一种共享锁,信号量也有公平模式与非公平模式,相信看懂了上面ReentrantLock的公平模式与非公平模式的朋友应该对Semaphore的公平模式与非公平模式理解起来会更快,这里就放在一起写了。

首先还是看一下Semaphore的基础设施,它和ReentrantLock一样,也有一个Sync:

 1 abstract static class Sync extends AbstractQueuedSynchronizer { 2     private static final long serialVersionUID = 1192457210091910933L; 3  4     Sync(int permits) { 5         setState(permits); 6     } 7  8     final int getPermits() { 9         return getState();10     }11 12     final int nonfairTryAcquireShared(int acquires) {13         for (;;) {14             int available = getState();15             int remaining = available - acquires;16             if (remaining < 0 ||17                 compareAndSetState(available, remaining))18                 return remaining;19         }20     }21 22     protected final boolean tryReleaseShared(int releases) {23         for (;;) {24             int current = getState();25             int next = current + releases;26             if (next < current) // overflow27                 throw new Error("Maximum permit count exceeded");28             if (compareAndSetState(current, next))29                 return true;30         }31     }32 33     final void reducePermits(int reductions) {34         for (;;) {35             int current = getState();36             int next = current - reductions;37             if (next > current) // underflow38                 throw new Error("Permit count underflow");39             if (compareAndSetState(current, next))40                 return;41         }42     }43 44     final int drainPermits() {45         for (;;) {46             int current = getState();47             if (current == 0 || compareAndSetState(current, 0))48                 return current;49         }50     }51 }

和ReentrantLock的Sync差不多,Semaphore的Sync定义了以下的一些主要内容:

  1. getPermits方法获取当前的许可剩余量还剩多少,即还有多少线程可以同时获得信号量

  2. 定义了非公平信号量获取共享锁的逻辑nonfairTryAcquireShared

  3. 定义了公平模式释放信号量的逻辑tryReleaseShared,相当于释放一次信号量,state就向上+1(信号量每次的获取与释放都是以1为单位的)

再看下公平信号量的实现,同样的FairSync,继承自Sync,代码为:

 1 static final class FairSync extends Sync { 2     private static final long serialVersionUID = 2014338818796000944L; 3  4     FairSync(int permits) { 5         super(permits); 6     } 7  8     protected int tryAcquireShared(int acquires) { 9         for (;;) {10             if (hasQueuedPredecessors())11                 return -1;12             int available = getState();13             int remaining = available - acquires;14             if (remaining < 0 ||15                 compareAndSetState(available, remaining))16                 return remaining;17         }18     }19 }

首先第10行的hasQueuedPredecessors方法,前面已经说过了,如果已经有了FIFO队列或者当前线程不是FIFO队列中在等待的第一条线程,返回-1,表示无法获取共享锁成功。

接着获取available,available就是state,用volatile修饰,所以线程中可以看到最新的state,信号量的acquires是1,每次获取信号量都对state-1,两种情况直接返回:

  1. remaining减完<0

  2. 通过cas设置成功

之后就是和之前说过的共享锁的逻辑了,如果返回的是一个<0的数字,那么构建FIFO队列,线程阻塞,直到前面的执行完才能唤醒后面的。

接着看一下非公平信号量的实现,NonfairSync继承Sync:

 1 static final class NonfairSync extends Sync { 2     private static final long serialVersionUID = -2694183684443567898L; 3  4     NonfairSync(int permits) { 5         super(permits); 6     } 7  8     protected int tryAcquireShared(int acquires) { 9         return nonfairTryAcquireShared(acquires);10     }11 }

nonfairTryAcquireShared在父类已经实现了,再贴一下代码:

1 final int nonfairTryAcquireShared(int acquires) {2     for (;;) {3         int available = getState();4         int remaining = available - acquires;5         if (remaining < 0 ||6             compareAndSetState(available, remaining))7             return remaining;8     }9 }

看到这里和公平Semaphore只有一点差别:不会前置进行一次hasQueuedPredecessors()判断。即当前有没有构建为一个FIFO队列,队列里面第一个等待的线程是不是自身都无所谓,对于非公平Semaphore都一样,反正线程调用Semaphore的acquire方法就将当前state-1,如果得到的remaining设置成功或者CAS操作成功就返回,这种操作没有遵循先到先得的原则,即非公平信号量。

至于非公平信号量对比公平信号量的优点,和ReentrantLock的非公平锁对比ReentrantLock的公平锁一样,就不说了。

 

CountDownLatch实现原理

CountDownLatch即计数器自减的一种闭锁,某线程阻塞,对一个计数器自减到0,此线程被唤醒,CountDownLatch具体用法可见Java多线程19:多线程下的其他组件之CountDownLatch、Semaphore、Exchanger。

CountDownLatch是一种共享锁,通过await()方法与countDown()两个方法实现自身的功能,首先看一下await()方法的实现:

 1 public void await() throws InterruptedException { 2     sync.acquireSharedInterruptibly(1); 3 }

acquireSharedInterruptibly最终又回到tryAcquireShared方法上,直接贴整个Sync的代码实现:

 1 private static final class Sync extends AbstractQueuedSynchronizer { 2     private static final long serialVersionUID = 4982264981922014374L; 3  4     Sync(int count) { 5         setState(count); 6     } 7  8     int getCount() { 9         return getState();10     }11 12     protected int tryAcquireShared(int acquires) {13         return (getState() == 0) ? 1 : -1;14     }15 16     protected boolean tryReleaseShared(int releases) {17         // Decrement count; signal when transition to zero18         for (;;) {19             int c = getState();20             if (c == 0)21                 return false;22             int nextc = c-1;23             if (compareAndSetState(c, nextc))24                 return nextc == 0;25         }26     }27 }

其实看到tryAcquireShared方法,理解AbstractQueuedSynchronizer共享锁原理的,不用看countDown方法应该都能猜countDown方法是如何实现的。我这里总结一下:

  1. 传入一个count,state就等于count,await的时候判断是不是0,是0返回1表示成功,不是0返回-1表示失败,构建FIFO队列,head头只连接一个Node,Node中的线程就是调用CountDownLatch的await()方法的线程

  2. 每次countDown的时候对state-1,直到state减到0的时候才算tryReleaseShared成功,tryReleaseShared成功,唤醒被挂起的线程

为了验证(2),看一下上面Sync的tryReleaseShared方法就可以了,确实是这么实现的。

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn