Heim  >  Artikel  >  Java  >  Detaillierte Erläuterung der Parallelitätsklassen basierend auf AbstractQueuedSynchronizer

Detaillierte Erläuterung der Parallelitätsklassen basierend auf AbstractQueuedSynchronizer

零下一度
零下一度Original
2017-07-17 14:58:471352Durchsuche

ReentrantLock-Implementierungsprinzip im fairen Modus

Der vorherige Artikel untersuchte die exklusive Sperre und die gemeinsame Sperre von AbstractQueuedSynchronizer, die auf den ersten beiden basiert Artikel können Sie den Sieg nutzen und sehen, wie die auf AbstractQueuedSynchronizer basierende Parallelitätsklasse implementiert wird.

ReentrantLock ist offensichtlich eine exklusive Sperre Die erste ist ReentrantLock im Fair-Modus und erbt von AbstractQueuedSynchronizer Schauen Sie sich die Code-Implementierung an:

 1 abstract static class Sync extends AbstractQueuedSynchronizer { 2     private static final long serialVersionUID = -5179523762034025860L; 3  4     /** 5      * Performs {@link Lock#lock}. The main reason for subclassing 6      * is to allow fast path for nonfair version. 7      */ 8     abstract void lock(); 9 10     /**11      * Performs non-fair tryLock.  tryAcquire is12      * implemented in subclasses, but both need nonfair13      * try for trylock method.14      */15     final boolean nonfairTryAcquire(int acquires) {16         final Thread current = Thread.currentThread();17         int c = getState();18         if (c == 0) {19             if (compareAndSetState(0, acquires)) {20                 setExclusiveOwnerThread(current);21                 return true;22             }23         }24         else if (current == getExclusiveOwnerThread()) {25             int nextc = c + acquires;26             if (nextc < 0) // overflow27                 throw new Error("Maximum lock count exceeded");28             setState(nextc);29             return true;30         }31         return false;32     }33 34     protected final boolean tryRelease(int releases) {35         int c = getState() - releases;36         if (Thread.currentThread() != getExclusiveOwnerThread())37             throw new IllegalMonitorStateException();38         boolean free = false;39         if (c == 0) {40             free = true;41             setExclusiveOwnerThread(null);42         }43         setState(c);44         return free;45     }46 47     protected final boolean isHeldExclusively() {48         // While we must in general read state before owner,49         // we don&#39;t need to do so to check if current thread is owner50         return getExclusiveOwnerThread() == Thread.currentThread();51     }52 53     final ConditionObject newCondition() {54         return new ConditionObject();55     }56 57     // Methods relayed from outer class58 59     final Thread getOwner() {60         return getState() == 0 ? null : getExclusiveOwnerThread();61     }62 63     final int getHoldCount() {64         return isHeldExclusively() ? getState() : 0;65     }66 67     final boolean isLocked() {68         return getState() != 0;69     }70 71     /**72      * Reconstitutes this lock instance from a stream.73      * @param s the stream74      */75     private void readObject(java.io.ObjectInputStream s)76         throws java.io.IOException, ClassNotFoundException {77         s.defaultReadObject();78         setState(0); // reset to unlocked state79     }80 }

Sync gehört zu einer öffentlichen Klasse, die abstrakt ist, was darauf hinweist, dass Sync vererbt wird Organisieren Sie, was Sync hauptsächlich tut. (Da Sync nicht der Schlüssel zum fairen Sperren von ReentrantLock ist):

  1. definiert eine Sperrmethode für die Implementierung von Unterklassen Warum wir normalerweise ReentrantLock aufrufen können Die lock()-Methode liegt daran, dass Sync sie

  2. definiert, wodurch die unfaire Sperre tryAcquira-Methode

  3. implementiert wird
  4. Implementiert die tryRelease-Methode, relativ einfach, Status -1, der Thread der exklusiven Sperre ist leer

  5. Implementiert isHeldExclusively method

  6. definiert die newCondition-Methode und ermöglicht Entwicklern die Verwendung von Condition zum Implementieren von Benachrichtigungen/Warten

Als nächstes werfen Sie einen Blick auf die Implementierung von Fair Lock, FairSync-Klasse, die von Sync erbt:

 1 static final class FairSync extends Sync { 2     private static final long serialVersionUID = -3000897897090466540L; 3  4     final void lock() { 5         acquire(1); 6     } 7  8     /** 9      * Fair version of tryAcquire.  Don&#39;t grant access unless10      * recursive call or no waiters or is first.11      */12     protected final boolean tryAcquire(int acquires) {13         final Thread current = Thread.currentThread();14         int c = getState();15         if (c == 0) {16             if (!hasQueuedPredecessors() &&17                 compareAndSetState(0, acquires)) {18                 setExclusiveOwnerThread(current);19                 return true;20             }21         }22         else if (current == getExclusiveOwnerThread()) {23             int nextc = c + acquires;24             if (nextc < 0)25                 throw new Error("Maximum lock count exceeded");26             setState(nextc);27             return true;28         }29         return false;30     }31 }

Um die wichtigsten Punkte zu klären:

  1. Jedes Mal, wenn der aktuelle Thread „lock()“ und dann „lock()“ erhält, wird der Status auf +1 gesetzt, entsprechend, wenn entsperrt( ), Zustand-1, bis der Zustand auf 0 reduziert wird, bedeutet, dass der aktuelle Thread alle Zustände freigegeben hat und andere Threads konkurrieren können

  2. Wann state=0, fällen Sie ein Urteil über die Methode hasQueuedPredecessors. Die Implementierung von hasQueuedPredecessors lautet „h != t && ((s = h.next) == null || s. thread != Thread.currentThread());“, wobei h der Kopf und t der Schwanz ist. Da das Ergebnis im Code invertiert ist, lautet das Urteil nach der Invertierung „h = = t ||. ((s = h.next ) != null && s.thread == Thread.currentThread());“, zusammenfassend gibt es zwei Situationen, die mit !hasQueuedPredecessors beurteilt werden können ():

    1. h==t, der Fall von h==t ist oder es befinden sich keine Daten in der aktuellen FIFO-Warteschlange , Oder es wurde nur ein Kopf gebaut und mit keinem Knoten verbunden, also ist Kopf Schwanz

    2. (s = h.next) != null && s.thread == Thread.currentThread(), Der aktuelle Thread ist der Thread im ersten Knoten, der wartet  

  3. Wenn kein Thread länger wartet als der aktuelle Thread, um die Erfassung durchzuführen Operation, dann tryAcquire der Thread, der den Status von 0 auf 1 durch die CAS-Operation ändert Erfolgreich

  4. Threads, die kein erfolgreiches tryAcquire haben, werden entsprechend als FIFO-Warteschlange konstruiert in der Reihenfolge von tryAcquire, das heißt, der erste fehlgeschlagene tryAcquire wird nach der Kopfposition gereiht, der zweite fehlgeschlagene tryAcquire wird an den letzten beiden Positionen des Kopfes gereiht

  5. Wenn der Thread mit erfolgreichem tryAcquire freigegeben wird, wird der Thread mit fehlgeschlagenem tryAcquire zuerst an erster Stelle eingestuft. Versuchen Sie tryAcquire, dies ist eine „Wer zuerst kommt, mahlt zuerst“, typische faire Sperre

Implementierungsprinzip des unfairen Modus ReentrantLock

Nachdem wir den fairen Modus ReentrantLock gelesen haben, schauen wir uns an, wie der unfaire Modus ReentrantLock funktioniert umgesetzt wird. Die NonfairSync-Klasse wird ebenfalls von der Sync-Klasse geerbt und wie folgt implementiert:

 1 static final class NonfairSync extends Sync { 2     private static final long serialVersionUID = 7316153563782823691L; 3  4     /** 5      * Performs lock.  Try immediate barge, backing up to normal 6      * acquire on failure. 7      */ 8     final void lock() { 9         if (compareAndSetState(0, 1))10             setExclusiveOwnerThread(Thread.currentThread());11         else12             acquire(1);13     }14 15     protected final boolean tryAcquire(int acquires) {16         return nonfairTryAcquire(acquires);17     }18 }

结合nonfairTryAcquire方法一起讲解,nonfairTryAcquire方法的实现为:

 1 final boolean nonfairTryAcquire(int acquires) { 2     final Thread current = Thread.currentThread(); 3     int c = getState(); 4     if (c == 0) { 5         if (compareAndSetState(0, acquires)) { 6             setExclusiveOwnerThread(current); 7             return true; 8         } 9     }10     else if (current == getExclusiveOwnerThread()) {11         int nextc = c + acquires;12         if (nextc < 0) // overflow13             throw new Error("Maximum lock count exceeded");14         setState(nextc);15         return true;16     }17     return false;18 }

看到差别就在于非公平锁lock()的时候会先尝试通过CAS看看能不能把state从0变为1(即获取锁),如果可以的话,直接获取锁而不需要排队。举个实际例子就很好理解了:

  1. 线程1、线程2、线程3竞争锁,线程1竞争成功获取锁,线程2、线程3依次排队

  2. 线程1执行完毕,释放锁,state变为0,唤醒了第一个排队的线程2

  3. 此时线程4来尝试获取锁了,由于线程2被唤醒了,因此线程2与线程4竞争锁

  4. 线程4成功将state从0变为1,线程2竞争锁失败,继续park

看到整个过程中,后来的线程4反而比先来的线程2先获取锁,相当于是一种非公平的模式,

那为什么非公平锁效率会比公平锁效率高?上面第(3)步如果线程2和线程4不竞争锁就是答案。为什么这么说,后面的解释很重要,希望大家可以理解:

<span style="color: #000000">线程1是先将state设为0,再去唤醒线程2,这两个过程之间是有时间差的。<br/><br/>那么如果线程1将state设置为0的时候,线程4就通过CAS算法获取到了锁,且在线程1唤醒线程2之前就已经使用完毕锁,那么相当于线程2获取锁的时间并没有推迟,在线程1将state设置为0到线程1唤醒线程2的这段时间里,反而有线程4获取了锁执行了任务,这就增加了系统的吞吐量,相当于单位时间处理了更多的任务。</span>

从这段解释我们也应该能看出来了,非公平锁比较适合加锁时间比较短的任务。这是因为加锁时间长,相当于线程2将state设为0并去唤醒线程2的这段时间,线程4无法完成释放锁,那么线程2被唤醒由于没法获取到锁,又被阻塞了,这种唤醒-阻塞的操作会引起线程的上下文切换,继而影响系统的性能。

Semaphore实现原理

Semaphore即信号量,用于控制代码块的并发数,将Semaphore的permits设置为1相当于就是synchronized或者ReentrantLock,Semaphore具体用法可见Java多线程19:多线程下的其他组件之CountDownLatch、Semaphore、Exchanger。信号量允许多条线程获取锁,显然它的锁是一种共享锁,信号量也有公平模式与非公平模式,相信看懂了上面ReentrantLock的公平模式与非公平模式的朋友应该对Semaphore的公平模式与非公平模式理解起来会更快,这里就放在一起写了。

首先还是看一下Semaphore的基础设施,它和ReentrantLock一样,也有一个Sync:

 1 abstract static class Sync extends AbstractQueuedSynchronizer { 2     private static final long serialVersionUID = 1192457210091910933L; 3  4     Sync(int permits) { 5         setState(permits); 6     } 7  8     final int getPermits() { 9         return getState();10     }11 12     final int nonfairTryAcquireShared(int acquires) {13         for (;;) {14             int available = getState();15             int remaining = available - acquires;16             if (remaining < 0 ||17                 compareAndSetState(available, remaining))18                 return remaining;19         }20     }21 22     protected final boolean tryReleaseShared(int releases) {23         for (;;) {24             int current = getState();25             int next = current + releases;26             if (next < current) // overflow27                 throw new Error("Maximum permit count exceeded");28             if (compareAndSetState(current, next))29                 return true;30         }31     }32 33     final void reducePermits(int reductions) {34         for (;;) {35             int current = getState();36             int next = current - reductions;37             if (next > current) // underflow38                 throw new Error("Permit count underflow");39             if (compareAndSetState(current, next))40                 return;41         }42     }43 44     final int drainPermits() {45         for (;;) {46             int current = getState();47             if (current == 0 || compareAndSetState(current, 0))48                 return current;49         }50     }51 }

和ReentrantLock的Sync差不多,Semaphore的Sync定义了以下的一些主要内容:

  1. getPermits方法获取当前的许可剩余量还剩多少,即还有多少线程可以同时获得信号量

  2. 定义了非公平信号量获取共享锁的逻辑nonfairTryAcquireShared

  3. 定义了公平模式释放信号量的逻辑tryReleaseShared,相当于释放一次信号量,state就向上+1(信号量每次的获取与释放都是以1为单位的)

再看下公平信号量的实现,同样的FairSync,继承自Sync,代码为:

 1 static final class FairSync extends Sync { 2     private static final long serialVersionUID = 2014338818796000944L; 3  4     FairSync(int permits) { 5         super(permits); 6     } 7  8     protected int tryAcquireShared(int acquires) { 9         for (;;) {10             if (hasQueuedPredecessors())11                 return -1;12             int available = getState();13             int remaining = available - acquires;14             if (remaining < 0 ||15                 compareAndSetState(available, remaining))16                 return remaining;17         }18     }19 }

首先第10行的hasQueuedPredecessors方法,前面已经说过了,如果已经有了FIFO队列或者当前线程不是FIFO队列中在等待的第一条线程,返回-1,表示无法获取共享锁成功。

接着获取available,available就是state,用volatile修饰,所以线程中可以看到最新的state,信号量的acquires是1,每次获取信号量都对state-1,两种情况直接返回:

  1. remaining减完<0

  2. 通过cas设置成功

之后就是和之前说过的共享锁的逻辑了,如果返回的是一个<0的数字,那么构建FIFO队列,线程阻塞,直到前面的执行完才能唤醒后面的。

接着看一下非公平信号量的实现,NonfairSync继承Sync:

 1 static final class NonfairSync extends Sync { 2     private static final long serialVersionUID = -2694183684443567898L; 3  4     NonfairSync(int permits) { 5         super(permits); 6     } 7  8     protected int tryAcquireShared(int acquires) { 9         return nonfairTryAcquireShared(acquires);10     }11 }

nonfairTryAcquireShared在父类已经实现了,再贴一下代码:

1 final int nonfairTryAcquireShared(int acquires) {2     for (;;) {3         int available = getState();4         int remaining = available - acquires;5         if (remaining < 0 ||6             compareAndSetState(available, remaining))7             return remaining;8     }9 }

看到这里和公平Semaphore只有一点差别:不会前置进行一次hasQueuedPredecessors()判断。即当前有没有构建为一个FIFO队列,队列里面第一个等待的线程是不是自身都无所谓,对于非公平Semaphore都一样,反正线程调用Semaphore的acquire方法就将当前state-1,如果得到的remaining设置成功或者CAS操作成功就返回,这种操作没有遵循先到先得的原则,即非公平信号量。

至于非公平信号量对比公平信号量的优点,和ReentrantLock的非公平锁对比ReentrantLock的公平锁一样,就不说了。

 

CountDownLatch实现原理

CountDownLatch即计数器自减的一种闭锁,某线程阻塞,对一个计数器自减到0,此线程被唤醒,CountDownLatch具体用法可见Java多线程19:多线程下的其他组件之CountDownLatch、Semaphore、Exchanger。

CountDownLatch是一种共享锁,通过await()方法与countDown()两个方法实现自身的功能,首先看一下await()方法的实现:

 1 public void await() throws InterruptedException { 2     sync.acquireSharedInterruptibly(1); 3 }

acquireSharedInterruptibly最终又回到tryAcquireShared方法上,直接贴整个Sync的代码实现:

 1 private static final class Sync extends AbstractQueuedSynchronizer { 2     private static final long serialVersionUID = 4982264981922014374L; 3  4     Sync(int count) { 5         setState(count); 6     } 7  8     int getCount() { 9         return getState();10     }11 12     protected int tryAcquireShared(int acquires) {13         return (getState() == 0) ? 1 : -1;14     }15 16     protected boolean tryReleaseShared(int releases) {17         // Decrement count; signal when transition to zero18         for (;;) {19             int c = getState();20             if (c == 0)21                 return false;22             int nextc = c-1;23             if (compareAndSetState(c, nextc))24                 return nextc == 0;25         }26     }27 }

其实看到tryAcquireShared方法,理解AbstractQueuedSynchronizer共享锁原理的,不用看countDown方法应该都能猜countDown方法是如何实现的。我这里总结一下:

  1. 传入一个count,state就等于count,await的时候判断是不是0,是0返回1表示成功,不是0返回-1表示失败,构建FIFO队列,head头只连接一个Node,Node中的线程就是调用CountDownLatch的await()方法的线程

  2. 每次countDown的时候对state-1,直到state减到0的时候才算tryReleaseShared成功,tryReleaseShared成功,唤醒被挂起的线程

为了验证(2),看一下上面Sync的tryReleaseShared方法就可以了,确实是这么实现的。

Das obige ist der detaillierte Inhalt vonDetaillierte Erläuterung der Parallelitätsklassen basierend auf AbstractQueuedSynchronizer. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn