>  기사  >  Java  >  공유 모드 획득 구현 프로세스

공유 모드 획득 구현 프로세스

零下一度
零下一度원래의
2017-07-03 11:40:522785검색

공유 모드 획득 구현 프로세스

위에서 우리는 AbstractQueuedSynchronizer 독점 모드의 획득 구현 프로세스를 설명했습니다. 이 기사는 철이 뜨거워지는 동안 계속해서 AbstractQueuedSynchronizer 공유 모드 획득의 구현 프로세스를 살펴봅니다. 두 개의 연속된 기사를 공부한 후 독점 모드 획득과 공유 모드 획득의 차이점을 비교하여 AbstractQueuedSynchronizer에 대한 이해를 심화할 수도 있습니다.

먼저 공유 모드 획득의 구현을 살펴보겠습니다. 두 메서드 사이에는 큰 차이가 없습니다. 후자가 인터럽트 처리를 한다는 점입니다.

 1 public final void acquireShared(int arg) { 2     if (tryAcquireShared(arg) < 0) 3         doAcquireShared(arg); 4 }

여기에서 볼 수 있습니다. 첫 번째 차이점은 다음과 같습니다. 독점 모드에서 획득할 때 하위 클래스에 의해 재정의된 tryAcquire 메서드는 부울 값을 반환합니다. 즉, 공유 모드에서 획득할 때 tryAcquire가 성공했는지 여부를 int 변수로 반환합니다. < 0인지 확인하기 위해 반환됩니다. doAcquireShared 메소드의 구현은 다음과 같습니다.

 1 private void doAcquireShared(int arg) { 2     final Node node = addWaiter(Node.SHARED); 3     boolean failed = true; 4     try { 5         boolean interrupted = false; 6         for (;;) { 7             final Node p = node.predecessor(); 8             if (p == head) { 9                 int r = tryAcquireShared(arg);10                 if (r >= 0) {11                     setHeadAndPropagate(node, r);12                     p.next = null; // help GC13                     if (interrupted)14                         selfInterrupt();15                     failed = false;16                     return;17                 }18             }19             if (shouldParkAfterFailedAcquire(p, node) &&20                 parkAndCheckInterrupt())21                 interrupted = true;22         }23     } finally {24         if (failed)25             cancelAcquire(node);26     }27 }

이 코드가 수행하는 작업을 분석해 보겠습니다.

  1. addWaiter, tryAcquireShared

  2. 현재 노드의 이전 노드를 가져옵니다. 이전 노드가 헤드인 노드만 AcquireShared를 시도할 수 있습니다. 이는 배타적 잠금과 동일합니다.

  3. 전임 노드가 헤드가 아니므로 실행합니다. shouldParkAfterFailedAcquire() && parkAndCheckInterrupt ()", for(;;) 루프에서 "shouldParkAfterFailedAcquire()" 메서드가 두 번 실행되어 현재 스레드가 차단됩니다. 이는 배타적 잠금

실제로 획득하는 것과 같습니다. 공유 모드와 독점 모드의 획득이 더 큽니다. 논리의 일부는 유사합니다. 가장 큰 차이점은 tryAcquireShared 성공 후 독점 모드의 획득이 현재 노드를 헤드 노드로 직접 설정한다는 것입니다. . 이름에서 알 수 있듯이 헤드 를 설정한 후 추가 전파 작업을 수행합니다. setHeadAndPropagate 메소드의 소스 코드는 다음과 같습니다.

 1 private void setHeadAndPropagate(Node node, int propagate) { 2     Node h = head; // Record old head for check below 3     setHead(node); 4     /* 5      * Try to signal next queued node if: 6      *   Propagation was indicated by caller, 7      *     or was recorded (as h.waitStatus) by a previous operation 8      *     (note: this uses sign-check of waitStatus because 9      *      PROPAGATE status may transition to SIGNAL.)10      * and11      *   The next node is waiting in shared mode,12      *     or we don't know, because it appears null13      *14      * The conservatism in both of these checks may cause15      * unnecessary wake-ups, but only when there are multiple16      * racing acquires/releases, so most need signals now or soon17      * anyway.18      */19     if (propagate > 0 || h == null || h.waitStatus < 0) {20         Node s = node.next;21         if (s == null || s.isShared())22             doReleaseShared();23     }24 }

세 번째 줄의 코드는 재설정 헤드를 설정하고, 두 번째 줄의 코드는 헤드를 재설정해야 하므로 먼저 다음을 정의합니다. 원래 헤드의 주소를 얻기 위한 노드 유형 변수 h, 이 두 줄의 코드는 매우 간단합니다.

19~23번째 줄의 코드는 배타적 잠금과 공유 잠금 사이에서 가장 다른 곳입니다. 배타적 잠금 획득 Queued 코드를 살펴보겠습니다.

 1 final boolean acquireQueued(final Node node, int arg) { 2     boolean failed = true; 3     try { 4         boolean interrupted = false; 5         for (;;) { 6             final Node p = node.predecessor(); 7             if (p == head && tryAcquire(arg)) { 8                 setHead(node); 9                 p.next = null; // help GC10                 failed = false;11                 return interrupted;12             }13             if (shouldParkAfterFailedAcquire(p, node) &&14                 parkAndCheckInterrupt())15                 interrupted = true;16         }17     } finally {18         if (failed)19             cancelAcquire(node);20     }21 }

이것은 배타적 잠금을 의미합니다. 노드가 헤드로 설정된 후 후속 노드가 SHARED 상태이면 계속해서 doReleaseShared 메소드를 전달합니다. 공유 상태의 역방향 전파를 달성하려면 나중에 노드 점을 올리세요.

공유 모드 릴리스 구현 프로세스

위에서는 릴리스 구현 프로세스를 살펴보겠습니다.

1 public final boolean releaseShared(int arg) {2     if (tryReleaseShared(arg)) {3         doReleaseShared();4         return true;5     }6     return false;7 }

tryReleaseShared 메서드 하위 클래스에 의해 구현됩니다. tryReleaseShared가 성공하면 doReleaseShared() 메서드를 실행합니다.

 1 private void doReleaseShared() { 2     /* 3      * Ensure that a release propagates, even if there are other 4      * in-progress acquires/releases.  This proceeds in the usual 5      * way of trying to unparkSuccessor of head if it needs 6      * signal. But if it does not, status is set to PROPAGATE to 7      * ensure that upon release, propagation continues. 8      * Additionally, we must loop in case a new node is added 9      * while we are doing this. Also, unlike other uses of10      * unparkSuccessor, we need to know if CAS to reset status11      * fails, if so rechecking.12      */13     for (;;) {14         Node h = head;15         if (h != null && h != tail) {16             int ws = h.waitStatus;17             if (ws == Node.SIGNAL) {18                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))19                     continue;            // loop to recheck cases20                 unparkSuccessor(h);21             }22             else if (ws == 0 &&23                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))24                 continue;                // loop on failed CAS25         }26         if (h == head)                   // loop if head changed27             break;28     }29 }

主要是两层逻辑:

  1. 头结点本身的waitStatus是SIGNAL且能通过CAS算法将头结点的waitStatus从SIGNAL设置为0,唤醒头结点的后继节点

  2. 头结点本身的waitStatus是0的话,尝试将其设置为PROPAGATE状态的,意味着共享状态可以向后传播

Condition的await()方法实现原理----构建等待队列

我们知道,Condition是用于实现通知/等待机制的,和Object的wait()/notify()一样,由于本文之前描述AbstractQueuedSynchronizer的共享模式的篇幅不是很长,加之Condition也是AbstractQueuedSynchronizer的一部分,因此将Condition也放在这里写了。

Condition分为await()和signal()两部分,前者用于等待、后者用于唤醒,首先看一下await()是如何实现的。Condition本身是一个接口,其在AbstractQueuedSynchronizer中的实现为ConditionObject:

1 public class ConditionObject implements Condition, java.io.Serializable {2         private static final long serialVersionUID = 1173984872572414699L;3         /** First node of condition queue. */4         private transient Node firstWaiter;5         /** Last node of condition queue. */6         private transient Node lastWaiter;7         8         ...9 }

这里贴了一些字段定义,后面都是方法就不贴了,会对重点方法进行分析的。从字段定义我们可以看到,ConditionObject全局性地记录了第一个等待的节点与最后一个等待的节点

像ReentrantLock每次要使用ConditionObject,直接new一个ConditionObject出来即可。我们关注一下await()方法的实现:

 1 public final void await() throws InterruptedException { 2     if (Thread.interrupted()) 3         throw new InterruptedException(); 4     Node node = addConditionWaiter(); 5     int savedState = fullyRelease(node); 6     int interruptMode = 0; 7     while (!isOnSyncQueue(node)) { 8         LockSupport.park(this); 9         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)10             break;11     }12     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)13         interruptMode = REINTERRUPT;14     if (node.nextWaiter != null) // clean up if cancelled15         unlinkCancelledWaiters();16     if (interruptMode != 0)17         reportInterruptAfterWait(interruptMode);18 }

第2行~第3行的代码用于处理中断,第4行代码比较关键,添加Condition的等待者,看一下实现:

 1 private Node addConditionWaiter() { 2     Node t = lastWaiter; 3     // If lastWaiter is cancelled, clean out. 4     if (t != null && t.waitStatus != Node.CONDITION) { 5         unlinkCancelledWaiters(); 6         t = lastWaiter; 7     } 8     Node node = new Node(Thread.currentThread(), Node.CONDITION); 9     if (t == null)10         firstWaiter = node;11     else12         t.nextWaiter = node;13     lastWaiter = node;14     return node;15 }

首先拿到队列(注意数据结构,Condition构建出来的也是一个队列)中最后一个等待者,紧接着第4行的的判断,判断最后一个等待者的waitStatus不是CONDITION的话,执行第5行的代码,解绑取消的等待者,因为通过第8行的代码,我们看到,new出来的Node的状态都是CONDITION的

那么unlinkCancelledWaiters做了什么?里面的流程就不看了,就是一些指针遍历并判断状态的操作,总结一下就是:从头到尾遍历每一个Node,遇到Node的waitStatus不是CONDITION的就从队列中踢掉,该节点的前后节点相连。

接着第8行的代码前面说过了,new出来了一个Node,存储了当前线程,waitStatus是CONDITION,接着第9行~第13行的操作很好理解:

  1. 如果lastWaiter是null,说明FIFO队列中没有任何Node,firstWaiter=Node

  2. 如果lastWaiter不是null,说明FIFO队列中有Node,原lastWaiter的next指向Node

  3. 无论如何,新加入的Node编程lastWaiter,即新加入的Node一定是在最后面

用一张图表示一下构建的数据结构就是:

对比学习,我们总结一下Condition构建出来的队列和AbstractQueuedSynchronizer构建出来的队列的差别,主要体现在2点上:

  1. AbstractQueuedSynchronizer构建出来的队列,头节点是一个没有Thread的空节点,其标识作用,而Condition构建出来的队列,头节点就是真正等待的节点

  2. AbstractQueuedSynchronizer构建出来的队列,节点之间有next与pred相互标识该节点的前一个节点与后一个节点的地址,而Condition构建出来的队列,只使用了nextWaiter标识下一个等待节点的地址

整个过程中,我们看到没有使用任何CAS操作,firstWaiter和lastWaiter也没有用volatile修饰,其实原因很简单:要await()必然要先lock(),既然lock()了就表示没有竞争,没有竞争自然也没必要使用volatile+CAS的机制去保证什么

Condition的await()方法实现原理----线程等待

前面我们看了Condition构建等待队列的过程,接下来我们看一下等待的过程,await()方法的代码比较短,再贴一下:

 1 public final void await() throws InterruptedException { 2     if (Thread.interrupted()) 3         throw new InterruptedException(); 4     Node node = addConditionWaiter(); 5     int savedState = fullyRelease(node); 6     int interruptMode = 0; 7     while (!isOnSyncQueue(node)) { 8         LockSupport.park(this); 9         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)10             break;11     }12     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)13         interruptMode = REINTERRUPT;14     if (node.nextWaiter != null) // clean up if cancelled15         unlinkCancelledWaiters();16     if (interruptMode != 0)17         reportInterruptAfterWait(interruptMode);18 }

构建完毕队列之后,执行第5行的fullyRelease方法,顾名思义:fullyRelease方法的作用是完全释放Node的状态。方法实现为:

 1 final int fullyRelease(Node node) { 2     boolean failed = true; 3     try { 4         int savedState = getState(); 5         if (release(savedState)) { 6             failed = false; 7             return savedState; 8         } else { 9             throw new IllegalMonitorStateException();10         }11     } finally {12         if (failed)13             node.waitStatus = Node.CANCELLED;14     }15 }

这里第4行获取state,第5行release的时候将整个state传过去,理由是某线程可能多次调用了lock()方法,比如调用了10次lock,那么此线程就将state加到了10,所以这里要将10传过去,将状态全部释放,这样后面的线程才能重新从state=0开始竞争锁,这也是方法被命名为fullyRelease的原因,因为要完全释放锁,释放锁之后,如果有竞争锁的线程,那么就唤醒第一个,这都是release方法的逻辑了,前面的文章详细讲解过。

接着看await()方法的第7行判断"while(!isOnSyncQueue(node))":

 1 final boolean isOnSyncQueue(Node node) { 2     if (node.waitStatus == Node.CONDITION || node.prev == null) 3         return false; 4     if (node.next != null) // If has successor, it must be on queue 5         return true; 6     /* 7      * node.prev can be non-null, but not yet on queue because 8      * the CAS to place it on queue can fail. So we have to 9      * traverse from tail to make sure it actually made it.  It10      * will always be near the tail in calls to this method, and11      * unless the CAS failed (which is unlikely), it will be12      * there, so we hardly ever traverse much.13      */14     return findNodeFromTail(node);15 }

注意这里的判断是Node是否在AbstractQueuedSynchronizer构建的队列中而不是Node是否在Condition构建的队列中,如果Node不在AbstractQueuedSynchronizer构建的队列中,那么调用LockSupport的park方法阻塞。

至此调用await()方法的线程构建Condition等待队列--释放锁--等待的过程已经全部分析完毕。

Condition的signal()实现原理

上面的代码分析了构建Condition等待队列--释放锁--等待的过程,接着看一下signal()方法通知是如何实现的:

1 public final void signal() {2     if (!isHeldExclusively())3         throw new IllegalMonitorStateException();4     Node first = firstWaiter;5     if (first != null)6         doSignal(first);7 }

首先从第2行的代码我们看到,要能signal(),当前线程必须持有独占锁,否则抛出异常IllegalMonitorStateException。

那么真正操作的时候,获取第一个waiter,如果有waiter,调用doSignal方法:

1 private void doSignal(Node first) {2     do {3         if ( (firstWaiter = first.nextWaiter) == null)4             lastWaiter = null;5         first.nextWaiter = null;6     } while (!transferForSignal(first) &&7              (first = firstWaiter) != null);8 }

第3行~第5行的代码很好理解:

  1. 重新设置firstWaiter,指向第一个waiter的nextWaiter

  2. 如果第一个waiter的nextWaiter为null,说明当前队列中只有一个waiter,lastWaiter置空

  3. 因为firstWaiter是要被signal的,因此它没什么用了,nextWaiter置空

接着执行第6行和第7行的代码,这里重点就是第6行的transferForSignal方法:

 1 final boolean transferForSignal(Node node) { 2     /* 3      * If cannot change waitStatus, the node has been cancelled. 4      */ 5     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) 6         return false; 7  8     /* 9      * Splice onto queue and try to set waitStatus of predecessor to10      * indicate that thread is (probably) waiting. If cancelled or11      * attempt to set waitStatus fails, wake up to resync (in which12      * case the waitStatus can be transiently and harmlessly wrong).13      */14     Node p = enq(node);15     int ws = p.waitStatus;16     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))17         LockSupport.unpark(node.thread);18     return true;19 }

方法本意是将一个节点从Condition队列转换为AbstractQueuedSynchronizer队列,总结一下方法的实现:

  1. 尝试将Node的waitStatus从CONDITION置为0,这一步失败直接返回false

  2. 当前节点进入调用enq方法进入AbstractQueuedSynchronizer队列

  3. 当前节点通过CAS机制将waitStatus置为SIGNAL

最后上面的步骤全部成功,返回true,返回true唤醒等待节点成功。从唤醒的代码我们可以得出一个重要结论:某个await()的节点被唤醒之后并不意味着它后面的代码会立即执行,它会被加入到AbstractQueuedSynchronizer队列的尾部,只有前面等待的节点获取锁全部完毕才能轮到它

代码分析到这里,我想类似的signalAll方法也没有必要再分析了,显然signalAll方法的作用就是将所有Condition队列中等待的节点逐一队列中从移除,由CONDITION状态变为SIGNAL状态并加入AbstractQueuedSynchronizer队列的尾部。

 

代码示例

可能大家看了我分析半天代码会有点迷糊,这里最后我贴一段我用于验证上面Condition结论的示例代码,首先建立一个Thread,我将之命名为ConditionThread:

 1 /** 2  * @author 五月的仓颉 3  */ 4 public class ConditionThread implements Runnable { 5  6     private Lock lock; 7      8     private Condition condition; 9     10     public ConditionThread(Lock lock, Condition condition) {11         this.lock = lock;12         this.condition = condition;13     }14     15     @Override16     public void run() {17         18         if ("线程0".equals(JdkUtil.getThreadName())) {19             thread0Process();20         } else if ("线程1".equals(JdkUtil.getThreadName())) {21             thread1Process();22         } else if ("线程2".equals(JdkUtil.getThreadName())) {23             thread2Process();24         }25         26     }27     28     private void thread0Process() {29         try {30             lock.lock();31             System.out.println("线程0休息5秒");32             JdkUtil.sleep(5000);33             condition.signal();34             System.out.println("线程0唤醒等待线程");35         } finally {36             lock.unlock();37         }38     }39     40     private void thread1Process() {41         try {42             lock.lock();43             System.out.println("线程1阻塞");44             condition.await();45             System.out.println("线程1被唤醒");46         } catch (InterruptedException e) {47             48         } finally {49             lock.unlock();50         }51     }52     53     private void thread2Process() {54         try {55             System.out.println("线程2想要获取锁");56             lock.lock();57             System.out.println("线程2获取锁成功");58         } finally {59             lock.unlock();60         }61     }62     63 }

这个类里面的方法就不解释了,反正就三个方法片段,根据线程名判断,每个线层执行的是其中的一个代码片段。写一段测试代码:

 1 /** 2  * @author 五月的仓颉 3  */ 4 @Test 5 public void testCondition() throws Exception { 6     Lock lock = new ReentrantLock(); 7     Condition condition = lock.newCondition(); 8          9     // 线程0的作用是signal10     Runnable runnable0 = new ConditionThread(lock, condition);11     Thread thread0 = new Thread(runnable0);12     thread0.setName("线程0");13     // 线程1的作用是await14     Runnable runnable1 = new ConditionThread(lock, condition);15     Thread thread1 = new Thread(runnable1);16     thread1.setName("线程1");17     // 线程2的作用是lock18     Runnable runnable2 = new ConditionThread(lock, condition);19     Thread thread2 = new Thread(runnable2);20     thread2.setName("线程2");21         22     thread1.start();23     Thread.sleep(1000);24     thread0.start();25     Thread.sleep(1000);26     thread2.start();27         28     thread1.join();29 }

测试代码的意思是:

  1. 线程1先启动,获取锁,调用await()方法等待

  2. 线程0后启动,获取锁,休眠5秒准备signal()

  3. 线程2最后启动,获取锁,由于线程0未使用完毕锁,因此线程2排队,可以此时由于线程0还未signal(),因此线程1在线程0执行signal()后,在AbstractQueuedSynchronizer队列中的顺序是在线程2后面的

代码执行结果为:

<span style="color: #008080"> 1</span> <span style="color: #000000">线程1阻塞</span><span style="color: #008080"> 2</span> <span style="color: #000000">线程0休息5秒</span><span style="color: #008080"> 3</span> <span style="color: #000000">线程2想要获取锁</span><span style="color: #008080"> 4</span> <span style="color: #000000">线程0唤醒等待线程</span><span style="color: #008080"> 5</span> <span style="color: #000000">线程2获取锁成功</span><span style="color: #008080"> 6</span> <span style="color: #000000">线程1被唤醒</span><span style="color: #008080"><br></span>

符合我们的结论:signal()并不意味着被唤醒的线程立即执行。由于线程2先于线程0排队,因此看到第5行打印的内容,线程2先获取锁。

위 내용은 공유 모드 획득 구현 프로세스의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.