ホームページ  >  記事  >  Java  >  AbstractQueuedSynchronizer に基づく同時実行クラスの詳細な説明

AbstractQueuedSynchronizer に基づく同時実行クラスの詳細な説明

零下一度
零下一度オリジナル
2017-07-17 14:58:471404ブラウズ

フェアモード ReentrantLock の実装原理

前の記事では、AbstractQueuedSynchronizer の排他的ロックと共有ロックについて学習しました。最初の 2 つの記事の基礎を利用して、同時実行性がどのように変化するかを見ていきます。 AbstractQueuedSynchronizer に基づくクラスが実装されています。

ReentrantLockは明らかに排他ロックです、まず第一に、フェアモードのReentrantLock、SyncはAbstractQueuedSynchronizerから継承されたReentractLockの基本クラスです、コード実装を見てください:

 1 abstract static class Sync extends AbstractQueuedSynchronizer { 2     private static final long serialVersionUID = -5179523762034025860L; 3  4     /** 5      * Performs {@link Lock#lock}. The main reason for subclassing 6      * is to allow fast path for nonfair version. 7      */ 8     abstract void lock(); 9 10     /**11      * Performs non-fair tryLock.  tryAcquire is12      * implemented in subclasses, but both need nonfair13      * try for trylock method.14      */15     final boolean nonfairTryAcquire(int acquires) {16         final Thread current = Thread.currentThread();17         int c = getState();18         if (c == 0) {19             if (compareAndSetState(0, acquires)) {20                 setExclusiveOwnerThread(current);21                 return true;22             }23         }24         else if (current == getExclusiveOwnerThread()) {25             int nextc = c + acquires;26             if (nextc < 0) // overflow27                 throw new Error("Maximum lock count exceeded");28             setState(nextc);29             return true;30         }31         return false;32     }33 34     protected final boolean tryRelease(int releases) {35         int c = getState() - releases;36         if (Thread.currentThread() != getExclusiveOwnerThread())37             throw new IllegalMonitorStateException();38         boolean free = false;39         if (c == 0) {40             free = true;41             setExclusiveOwnerThread(null);42         }43         setState(c);44         return free;45     }46 47     protected final boolean isHeldExclusively() {48         // While we must in general read state before owner,49         // we don&#39;t need to do so to check if current thread is owner50         return getExclusiveOwnerThread() == Thread.currentThread();51     }52 53     final ConditionObject newCondition() {54         return new ConditionObject();55     }56 57     // Methods relayed from outer class58 59     final Thread getOwner() {60         return getState() == 0 ? null : getExclusiveOwnerThread();61     }62 63     final int getHoldCount() {64         return isHeldExclusively() ? getState() : 0;65     }66 67     final boolean isLocked() {68         return getState() != 0;69     }70 71     /**72      * Reconstitutes this lock instance from a stream.73      * @param s the stream74      */75     private void readObject(java.io.ObjectInputStream s)76         throws java.io.IOException, ClassNotFoundException {77         s.defaultReadObject();78         setState(0); // reset to unlocked state79     }80 }

Sync は Public クラスに属しており、Sync が継承されるという抽象的な説明です (Sync は ReentrantLock の公平なロックの鍵ではないため)。

  1. はロック メソッドを定義します。通常、ReentrantLock の lock() メソッドを呼び出すことができる理由は、Sync がそれを定義しているからです

  2. は不公平なロックの tryAcquira メソッドを実装します

  3. は tryRelease メソッドを実装します。シンプル、ステータス -1 、排他ロックのスレッドは空です

  4. isHeldExclusively メソッドを実装します

  5. newCondition メソッドを定義し、開発者が Condition を使用して通知/待機を実装できるようにします

次に、フェアロックを見てみましょう。 Sync を継承する FairSync クラスの実装:

 1 static final class FairSync extends Sync { 2     private static final long serialVersionUID = -3000897897090466540L; 3  4     final void lock() { 5         acquire(1); 6     } 7  8     /** 9      * Fair version of tryAcquire.  Don&#39;t grant access unless10      * recursive call or no waiters or is first.11      */12     protected final boolean tryAcquire(int acquires) {13         final Thread current = Thread.currentThread();14         int c = getState();15         if (c == 0) {16             if (!hasQueuedPredecessors() &&17                 compareAndSetState(0, acquires)) {18                 setExclusiveOwnerThread(current);19                 return true;20             }21         }22         else if (current == getExclusiveOwnerThread()) {23             int nextc = c + acquires;24             if (nextc < 0)25                 throw new Error("Maximum lock count exceeded");26             setState(nextc);27             return true;28         }29         return false;30     }31 }

重要なポイントを整理しましょう:

  1. 取得するたびに、state+1、現在のスレッド lock() と lock() の場合、状態は +1 を維持し、unlock() のときは対応する state-1 が、状態が 0 になるまで維持されます。これは、現在のスレッドがすべての状態を解放したことを意味します。と他のスレッドが競合する可能性があります

  2. state=0 このとき、hasQueuedPredecessors メソッドによって判断されます。 hasQueuedPredecessors の実装は "h != t && ((s = h.next) ) == null || s.thread != Thread.currentThread()); "、h は先頭、t は末尾です。コード内で結果が反転されているため、反転後の判定は "h = = t || ((s = h.next) != null && s.thread == Thread.currentThread());"、要約すると、!hasQueuedPredecessors() によって判断できる状況は 2 つあります。

    1. h==t、h==t の状況は または現在 FIFO キューにデータがありません、または 1 つのヘッドのみが構築されており、接続されていません任意のノード、つまり先頭が末尾です

    2. (s = h.next)! = null && s.thread == Thread.currentThread(), 現在のスレッドは最初のスレッドです待機中のノード

  3. 現在のスレッドよりも長く待機しているスレッドがない場合は、取得操作を実行し、CAS操作を通じてtryAcquireが0から1に成功したスレッド

  4. tryAcquire が成功しなかったスレッドは、tryAcquire の順序で FIFO キューとして構築されます。つまり、最初の tryAcquire が失敗した場合は先頭から 1 つ後にランク付けされ、2 番目の tryAcquire が失敗した場合は先頭から 2 つ後にランク付けされます

  5. tryAcquireが成功したスレッドが解放されると、tryAcquireが失敗した最初のスレッドが最初にtryAcquireを試行します。これは先着順であり、典型的な公平なロックです

不公平モードReentrantLockの実装原理

公正モード ReentrantLock を読んだ後、不公平モード ReentrantLock がどのように実装されるかを見てみましょう。 NonfairSync クラスも Sync クラスから継承され、次のように実装されます:

 1 static final class NonfairSync extends Sync { 2     private static final long serialVersionUID = 7316153563782823691L; 3  4     /** 5      * Performs lock.  Try immediate barge, backing up to normal 6      * acquire on failure. 7      */ 8     final void lock() { 9         if (compareAndSetState(0, 1))10             setExclusiveOwnerThread(Thread.currentThread());11         else12             acquire(1);13     }14 15     protected final boolean tryAcquire(int acquires) {16         return nonfairTryAcquire(acquires);17     }18 }

结合nonfairTryAcquire方法一起讲解,nonfairTryAcquire方法的实现为:

 1 final boolean nonfairTryAcquire(int acquires) { 2     final Thread current = Thread.currentThread(); 3     int c = getState(); 4     if (c == 0) { 5         if (compareAndSetState(0, acquires)) { 6             setExclusiveOwnerThread(current); 7             return true; 8         } 9     }10     else if (current == getExclusiveOwnerThread()) {11         int nextc = c + acquires;12         if (nextc < 0) // overflow13             throw new Error("Maximum lock count exceeded");14         setState(nextc);15         return true;16     }17     return false;18 }

看到差别就在于非公平锁lock()的时候会先尝试通过CAS看看能不能把state从0变为1(即获取锁),如果可以的话,直接获取锁而不需要排队。举个实际例子就很好理解了:

  1. 线程1、线程2、线程3竞争锁,线程1竞争成功获取锁,线程2、线程3依次排队

  2. 线程1执行完毕,释放锁,state变为0,唤醒了第一个排队的线程2

  3. 此时线程4来尝试获取锁了,由于线程2被唤醒了,因此线程2与线程4竞争锁

  4. 线程4成功将state从0变为1,线程2竞争锁失败,继续park

看到整个过程中,后来的线程4反而比先来的线程2先获取锁,相当于是一种非公平的模式,

那为什么非公平锁效率会比公平锁效率高?上面第(3)步如果线程2和线程4不竞争锁就是答案。为什么这么说,后面的解释很重要,希望大家可以理解:

<span style="color: #000000">线程1是先将state设为0,再去唤醒线程2,这两个过程之间是有时间差的。<br/><br/>那么如果线程1将state设置为0的时候,线程4就通过CAS算法获取到了锁,且在线程1唤醒线程2之前就已经使用完毕锁,那么相当于线程2获取锁的时间并没有推迟,在线程1将state设置为0到线程1唤醒线程2的这段时间里,反而有线程4获取了锁执行了任务,这就增加了系统的吞吐量,相当于单位时间处理了更多的任务。</span>

从这段解释我们也应该能看出来了,非公平锁比较适合加锁时间比较短的任务。这是因为加锁时间长,相当于线程2将state设为0并去唤醒线程2的这段时间,线程4无法完成释放锁,那么线程2被唤醒由于没法获取到锁,又被阻塞了,这种唤醒-阻塞的操作会引起线程的上下文切换,继而影响系统的性能。

Semaphore实现原理

Semaphore即信号量,用于控制代码块的并发数,将Semaphore的permits设置为1相当于就是synchronized或者ReentrantLock,Semaphore具体用法可见Java多线程19:多线程下的其他组件之CountDownLatch、Semaphore、Exchanger。信号量允许多条线程获取锁,显然它的锁是一种共享锁,信号量也有公平模式与非公平模式,相信看懂了上面ReentrantLock的公平模式与非公平模式的朋友应该对Semaphore的公平模式与非公平模式理解起来会更快,这里就放在一起写了。

首先还是看一下Semaphore的基础设施,它和ReentrantLock一样,也有一个Sync:

 1 abstract static class Sync extends AbstractQueuedSynchronizer { 2     private static final long serialVersionUID = 1192457210091910933L; 3  4     Sync(int permits) { 5         setState(permits); 6     } 7  8     final int getPermits() { 9         return getState();10     }11 12     final int nonfairTryAcquireShared(int acquires) {13         for (;;) {14             int available = getState();15             int remaining = available - acquires;16             if (remaining < 0 ||17                 compareAndSetState(available, remaining))18                 return remaining;19         }20     }21 22     protected final boolean tryReleaseShared(int releases) {23         for (;;) {24             int current = getState();25             int next = current + releases;26             if (next < current) // overflow27                 throw new Error("Maximum permit count exceeded");28             if (compareAndSetState(current, next))29                 return true;30         }31     }32 33     final void reducePermits(int reductions) {34         for (;;) {35             int current = getState();36             int next = current - reductions;37             if (next > current) // underflow38                 throw new Error("Permit count underflow");39             if (compareAndSetState(current, next))40                 return;41         }42     }43 44     final int drainPermits() {45         for (;;) {46             int current = getState();47             if (current == 0 || compareAndSetState(current, 0))48                 return current;49         }50     }51 }

和ReentrantLock的Sync差不多,Semaphore的Sync定义了以下的一些主要内容:

  1. getPermits方法获取当前的许可剩余量还剩多少,即还有多少线程可以同时获得信号量

  2. 定义了非公平信号量获取共享锁的逻辑nonfairTryAcquireShared

  3. 定义了公平模式释放信号量的逻辑tryReleaseShared,相当于释放一次信号量,state就向上+1(信号量每次的获取与释放都是以1为单位的)

再看下公平信号量的实现,同样的FairSync,继承自Sync,代码为:

 1 static final class FairSync extends Sync { 2     private static final long serialVersionUID = 2014338818796000944L; 3  4     FairSync(int permits) { 5         super(permits); 6     } 7  8     protected int tryAcquireShared(int acquires) { 9         for (;;) {10             if (hasQueuedPredecessors())11                 return -1;12             int available = getState();13             int remaining = available - acquires;14             if (remaining < 0 ||15                 compareAndSetState(available, remaining))16                 return remaining;17         }18     }19 }

首先第10行的hasQueuedPredecessors方法,前面已经说过了,如果已经有了FIFO队列或者当前线程不是FIFO队列中在等待的第一条线程,返回-1,表示无法获取共享锁成功。

接着获取available,available就是state,用volatile修饰,所以线程中可以看到最新的state,信号量的acquires是1,每次获取信号量都对state-1,两种情况直接返回:

  1. remaining减完<0

  2. 通过cas设置成功

之后就是和之前说过的共享锁的逻辑了,如果返回的是一个<0的数字,那么构建FIFO队列,线程阻塞,直到前面的执行完才能唤醒后面的。

接着看一下非公平信号量的实现,NonfairSync继承Sync:

 1 static final class NonfairSync extends Sync { 2     private static final long serialVersionUID = -2694183684443567898L; 3  4     NonfairSync(int permits) { 5         super(permits); 6     } 7  8     protected int tryAcquireShared(int acquires) { 9         return nonfairTryAcquireShared(acquires);10     }11 }

nonfairTryAcquireShared在父类已经实现了,再贴一下代码:

1 final int nonfairTryAcquireShared(int acquires) {2     for (;;) {3         int available = getState();4         int remaining = available - acquires;5         if (remaining < 0 ||6             compareAndSetState(available, remaining))7             return remaining;8     }9 }

看到这里和公平Semaphore只有一点差别:不会前置进行一次hasQueuedPredecessors()判断。即当前有没有构建为一个FIFO队列,队列里面第一个等待的线程是不是自身都无所谓,对于非公平Semaphore都一样,反正线程调用Semaphore的acquire方法就将当前state-1,如果得到的remaining设置成功或者CAS操作成功就返回,这种操作没有遵循先到先得的原则,即非公平信号量。

至于非公平信号量对比公平信号量的优点,和ReentrantLock的非公平锁对比ReentrantLock的公平锁一样,就不说了。

 

CountDownLatch实现原理

CountDownLatch即计数器自减的一种闭锁,某线程阻塞,对一个计数器自减到0,此线程被唤醒,CountDownLatch具体用法可见Java多线程19:多线程下的其他组件之CountDownLatch、Semaphore、Exchanger。

CountDownLatch是一种共享锁,通过await()方法与countDown()两个方法实现自身的功能,首先看一下await()方法的实现:

 1 public void await() throws InterruptedException { 2     sync.acquireSharedInterruptibly(1); 3 }

acquireSharedInterruptibly最终又回到tryAcquireShared方法上,直接贴整个Sync的代码实现:

 1 private static final class Sync extends AbstractQueuedSynchronizer { 2     private static final long serialVersionUID = 4982264981922014374L; 3  4     Sync(int count) { 5         setState(count); 6     } 7  8     int getCount() { 9         return getState();10     }11 12     protected int tryAcquireShared(int acquires) {13         return (getState() == 0) ? 1 : -1;14     }15 16     protected boolean tryReleaseShared(int releases) {17         // Decrement count; signal when transition to zero18         for (;;) {19             int c = getState();20             if (c == 0)21                 return false;22             int nextc = c-1;23             if (compareAndSetState(c, nextc))24                 return nextc == 0;25         }26     }27 }

其实看到tryAcquireShared方法,理解AbstractQueuedSynchronizer共享锁原理的,不用看countDown方法应该都能猜countDown方法是如何实现的。我这里总结一下:

  1. 传入一个count,state就等于count,await的时候判断是不是0,是0返回1表示成功,不是0返回-1表示失败,构建FIFO队列,head头只连接一个Node,Node中的线程就是调用CountDownLatch的await()方法的线程

  2. 每次countDown的时候对state-1,直到state减到0的时候才算tryReleaseShared成功,tryReleaseShared成功,唤醒被挂起的线程

为了验证(2),看一下上面Sync的tryReleaseShared方法就可以了,确实是这么实现的。

以上がAbstractQueuedSynchronizer に基づく同時実行クラスの詳細な説明の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。