Heim >Java >javaLernprogramm >Implementierungsprozess für den Shared-Mode-Akquise
Acquire-Implementierungsprozess im Shared-Modus
Oben haben wir den Acquire-Implementierungsprozess des exklusiven AbstractQueuedSynchronizer-Modus erläutert Betrachten Sie weiterhin den gemeinsam genutzten AbstraktQueuedSynchronizer-Implementierungsprozess. Nachdem Sie zwei aufeinanderfolgende Artikel studiert haben, können Sie auch den Unterschied zwischen der Erfassung im exklusiven Modus und der Erfassung im gemeinsam genutzten Modus vergleichen, um Ihr Verständnis von AbstractQueuedSynchronizer zu vertiefen.
Sehen wir uns zunächst die Implementierung des Shared-Modus-Acquire an. Es gibt keinen großen Unterschied zwischen den beiden acquireShared als Beispiel:
1 public final void acquireShared(int arg) { 2 if (tryAcquireShared(arg) < 0) 3 doAcquireShared(arg); 4 }
Hier sieht man den ersten Unterschied: Beim Erwerb im exklusiven Modus wird die Methode tryAcquire überschrieben von der Unterklasse zurückgegeben Es ist ein boolescher Wert, das heißt, ob tryAcquire beim Erwerb im gemeinsam genutzten Modus erfolgreich ist, wird eine int-Variable zurückgegeben, um zu bestimmen, ob sie <0 ist. Die Implementierung der doAcquireShared-Methode lautet:
1 private void doAcquireShared(int arg) { 2 final Node node = addWaiter(Node.SHARED); 3 boolean failed = true; 4 try { 5 boolean interrupted = false; 6 for (;;) { 7 final Node p = node.predecessor(); 8 if (p == head) { 9 int r = tryAcquireShared(arg);10 if (r >= 0) {11 setHeadAndPropagate(node, r);12 p.next = null; // help GC13 if (interrupted)14 selfInterrupt();15 failed = false;16 return;17 }18 }19 if (shouldParkAfterFailedAcquire(p, node) &&20 parkAndCheckInterrupt())21 interrupted = true;22 }23 } finally {24 if (failed)25 cancelAcquire(node);26 }27 }
Lassen Sie uns analysieren, was dieser Code bewirkt:
addWaiter, addWaiter Alle tryAcquireShared
Ruft den Vorgängerknoten des aktuellen Knotens ab, nur der Vorgängerknoten ist der Kopf Der Knoten kann tryAcquireShared verwenden, was mit der exklusiven Sperre identisch ist
Der Vorgängerknoten ist nicht der Kopf, führen Sie die for(;;)-Schleife „shouldParkAfterFailedAcquire() && parkAndCheckInterrupt()“ aus , „shouldParkAfterFailedAcquire( )“-Methode wird zweimal ausgeführt und der aktuelle Thread wird blockiert. Dies ist dasselbe wie eine exklusive Sperre
In der Tat ist der Großteil der Logik des Erwerbs im gemeinsam genutzten Modus und des Erwerbs Im exklusiven Modus sind sie ähnlich. Der größte Unterschied besteht darin, dass nach erfolgreichem tryAcquireShared der aktuelle Knoten direkt als Hauptknoten festgelegt werden kann, während im gemeinsam genutzten Modus die setHeadAndPropagate-Methode ausgeführt wird führt nach dem Einstellen des Kopfes einen weiteren Ausbreitungsvorgang aus. Der Quellcode der setHeadAndPropagate-Methode lautet:
1 private void setHeadAndPropagate(Node node, int propagate) { 2 Node h = head; // Record old head for check below 3 setHead(node); 4 /* 5 * Try to signal next queued node if: 6 * Propagation was indicated by caller, 7 * or was recorded (as h.waitStatus) by a previous operation 8 * (note: this uses sign-check of waitStatus because 9 * PROPAGATE status may transition to SIGNAL.)10 * and11 * The next node is waiting in shared mode,12 * or we don't know, because it appears null13 *14 * The conservatism in both of these checks may cause15 * unnecessary wake-ups, but only when there are multiple16 * racing acquires/releases, so most need signals now or soon17 * anyway.18 */19 if (propagate > 0 || h == null || h.waitStatus < 0) {20 Node s = node.next;21 if (s == null || s.isShared())22 doReleaseShared();23 }24 }
Der Code in der 3. Zeile setzt den Kopf zurück, und der Code in der 2. Zeile setzt den Kopf zurück, weil der Code In der dritten Zeile wird der Kopf zurückgesetzt. Definieren Sie zunächst eine Knotenvariable h, um die Adresse des ursprünglichen Kopfes zu erhalten.
Der Code in den Zeilen 19 bis 23 ist der stärkste Unterschied zwischen exklusiver Sperre und gemeinsamer Sperre. Schauen wir uns den Code der exklusiven Sperre „acquireQueued“ an:
1 final boolean acquireQueued(final Node node, int arg) { 2 boolean failed = true; 3 try { 4 boolean interrupted = false; 5 for (;;) { 6 final Node p = node.predecessor(); 7 if (p == head && tryAcquire(arg)) { 8 setHead(node); 9 p.next = null; // help GC10 failed = false;11 return interrupted;12 }13 if (shouldParkAfterFailedAcquire(p, node) &&14 parkAndCheckInterrupt())15 interrupted = true;16 }17 } finally {18 if (failed)19 cancelAcquire(node);20 }21 }
Dies bedeutet, dass nachdem ein exklusiver Sperrknoten aktiviert wurde, dieser Knoten nur noch als Kopf festgelegt werden muss und fertig ist, geteilte Sperren jedoch anders sind Ein Knoten wird als Kopf festgelegt. Wenn sich sein Nachfolgerknoten im SHARED-Zustand befindet, versucht er weiterhin, den Knoten über die doReleaseShared-Methode aufzuwecken und realisiert so die Rückwärtsausbreitung des gemeinsam genutzten Zustands .
Implementierungsprozess der Shared-Mode-Release
Oben wird beschrieben, wie im Shared-Modus erfasst wird Schauen wir uns den Release-Implementierungsprozess an:
1 public final boolean releaseShared(int arg) {2 if (tryReleaseShared(arg)) {3 doReleaseShared();4 return true;5 }6 return false;7 }
1 private void doReleaseShared() { 2 /* 3 * Ensure that a release propagates, even if there are other 4 * in-progress acquires/releases. This proceeds in the usual 5 * way of trying to unparkSuccessor of head if it needs 6 * signal. But if it does not, status is set to PROPAGATE to 7 * ensure that upon release, propagation continues. 8 * Additionally, we must loop in case a new node is added 9 * while we are doing this. Also, unlike other uses of10 * unparkSuccessor, we need to know if CAS to reset status11 * fails, if so rechecking.12 */13 for (;;) {14 Node h = head;15 if (h != null && h != tail) {16 int ws = h.waitStatus;17 if (ws == Node.SIGNAL) {18 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))19 continue; // loop to recheck cases20 unparkSuccessor(h);21 }22 else if (ws == 0 &&23 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))24 continue; // loop on failed CAS25 }26 if (h == head) // loop if head changed27 break;28 }29 }
主要是两层逻辑:
头结点本身的waitStatus是SIGNAL且能通过CAS算法将头结点的waitStatus从SIGNAL设置为0,唤醒头结点的后继节点
头结点本身的waitStatus是0的话,尝试将其设置为PROPAGATE状态的,意味着共享状态可以向后传播
Condition的await()方法实现原理----构建等待队列
我们知道,Condition是用于实现通知/等待机制的,和Object的wait()/notify()一样,由于本文之前描述AbstractQueuedSynchronizer的共享模式的篇幅不是很长,加之Condition也是AbstractQueuedSynchronizer的一部分,因此将Condition也放在这里写了。
Condition分为await()和signal()两部分,前者用于等待、后者用于唤醒,首先看一下await()是如何实现的。Condition本身是一个接口,其在AbstractQueuedSynchronizer中的实现为ConditionObject:
1 public class ConditionObject implements Condition, java.io.Serializable {2 private static final long serialVersionUID = 1173984872572414699L;3 /** First node of condition queue. */4 private transient Node firstWaiter;5 /** Last node of condition queue. */6 private transient Node lastWaiter;7 8 ...9 }
这里贴了一些字段定义,后面都是方法就不贴了,会对重点方法进行分析的。从字段定义我们可以看到,ConditionObject全局性地记录了第一个等待的节点与最后一个等待的节点。
像ReentrantLock每次要使用ConditionObject,直接new一个ConditionObject出来即可。我们关注一下await()方法的实现:
1 public final void await() throws InterruptedException { 2 if (Thread.interrupted()) 3 throw new InterruptedException(); 4 Node node = addConditionWaiter(); 5 int savedState = fullyRelease(node); 6 int interruptMode = 0; 7 while (!isOnSyncQueue(node)) { 8 LockSupport.park(this); 9 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)10 break;11 }12 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)13 interruptMode = REINTERRUPT;14 if (node.nextWaiter != null) // clean up if cancelled15 unlinkCancelledWaiters();16 if (interruptMode != 0)17 reportInterruptAfterWait(interruptMode);18 }
第2行~第3行的代码用于处理中断,第4行代码比较关键,添加Condition的等待者,看一下实现:
1 private Node addConditionWaiter() { 2 Node t = lastWaiter; 3 // If lastWaiter is cancelled, clean out. 4 if (t != null && t.waitStatus != Node.CONDITION) { 5 unlinkCancelledWaiters(); 6 t = lastWaiter; 7 } 8 Node node = new Node(Thread.currentThread(), Node.CONDITION); 9 if (t == null)10 firstWaiter = node;11 else12 t.nextWaiter = node;13 lastWaiter = node;14 return node;15 }
首先拿到队列(注意数据结构,Condition构建出来的也是一个队列)中最后一个等待者,紧接着第4行的的判断,判断最后一个等待者的waitStatus不是CONDITION的话,执行第5行的代码,解绑取消的等待者,因为通过第8行的代码,我们看到,new出来的Node的状态都是CONDITION的。
那么unlinkCancelledWaiters做了什么?里面的流程就不看了,就是一些指针遍历并判断状态的操作,总结一下就是:从头到尾遍历每一个Node,遇到Node的waitStatus不是CONDITION的就从队列中踢掉,该节点的前后节点相连。
接着第8行的代码前面说过了,new出来了一个Node,存储了当前线程,waitStatus是CONDITION,接着第9行~第13行的操作很好理解:
如果lastWaiter是null,说明FIFO队列中没有任何Node,firstWaiter=Node
如果lastWaiter不是null,说明FIFO队列中有Node,原lastWaiter的next指向Node
无论如何,新加入的Node编程lastWaiter,即新加入的Node一定是在最后面
用一张图表示一下构建的数据结构就是:
对比学习,我们总结一下Condition构建出来的队列和AbstractQueuedSynchronizer构建出来的队列的差别,主要体现在2点上:
AbstractQueuedSynchronizer构建出来的队列,头节点是一个没有Thread的空节点,其标识作用,而Condition构建出来的队列,头节点就是真正等待的节点
AbstractQueuedSynchronizer构建出来的队列,节点之间有next与pred相互标识该节点的前一个节点与后一个节点的地址,而Condition构建出来的队列,只使用了nextWaiter标识下一个等待节点的地址
整个过程中,我们看到没有使用任何CAS操作,firstWaiter和lastWaiter也没有用volatile修饰,其实原因很简单:要await()必然要先lock(),既然lock()了就表示没有竞争,没有竞争自然也没必要使用volatile+CAS的机制去保证什么。
Condition的await()方法实现原理----线程等待
前面我们看了Condition构建等待队列的过程,接下来我们看一下等待的过程,await()方法的代码比较短,再贴一下:
1 public final void await() throws InterruptedException { 2 if (Thread.interrupted()) 3 throw new InterruptedException(); 4 Node node = addConditionWaiter(); 5 int savedState = fullyRelease(node); 6 int interruptMode = 0; 7 while (!isOnSyncQueue(node)) { 8 LockSupport.park(this); 9 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)10 break;11 }12 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)13 interruptMode = REINTERRUPT;14 if (node.nextWaiter != null) // clean up if cancelled15 unlinkCancelledWaiters();16 if (interruptMode != 0)17 reportInterruptAfterWait(interruptMode);18 }
构建完毕队列之后,执行第5行的fullyRelease方法,顾名思义:fullyRelease方法的作用是完全释放Node的状态。方法实现为:
1 final int fullyRelease(Node node) { 2 boolean failed = true; 3 try { 4 int savedState = getState(); 5 if (release(savedState)) { 6 failed = false; 7 return savedState; 8 } else { 9 throw new IllegalMonitorStateException();10 }11 } finally {12 if (failed)13 node.waitStatus = Node.CANCELLED;14 }15 }
这里第4行获取state,第5行release的时候将整个state传过去,理由是某线程可能多次调用了lock()方法,比如调用了10次lock,那么此线程就将state加到了10,所以这里要将10传过去,将状态全部释放,这样后面的线程才能重新从state=0开始竞争锁,这也是方法被命名为fullyRelease的原因,因为要完全释放锁,释放锁之后,如果有竞争锁的线程,那么就唤醒第一个,这都是release方法的逻辑了,前面的文章详细讲解过。
接着看await()方法的第7行判断"while(!isOnSyncQueue(node))":
1 final boolean isOnSyncQueue(Node node) { 2 if (node.waitStatus == Node.CONDITION || node.prev == null) 3 return false; 4 if (node.next != null) // If has successor, it must be on queue 5 return true; 6 /* 7 * node.prev can be non-null, but not yet on queue because 8 * the CAS to place it on queue can fail. So we have to 9 * traverse from tail to make sure it actually made it. It10 * will always be near the tail in calls to this method, and11 * unless the CAS failed (which is unlikely), it will be12 * there, so we hardly ever traverse much.13 */14 return findNodeFromTail(node);15 }
注意这里的判断是Node是否在AbstractQueuedSynchronizer构建的队列中而不是Node是否在Condition构建的队列中,如果Node不在AbstractQueuedSynchronizer构建的队列中,那么调用LockSupport的park方法阻塞。
至此调用await()方法的线程构建Condition等待队列--释放锁--等待的过程已经全部分析完毕。
Condition的signal()实现原理
上面的代码分析了构建Condition等待队列--释放锁--等待的过程,接着看一下signal()方法通知是如何实现的:
1 public final void signal() {2 if (!isHeldExclusively())3 throw new IllegalMonitorStateException();4 Node first = firstWaiter;5 if (first != null)6 doSignal(first);7 }
首先从第2行的代码我们看到,要能signal(),当前线程必须持有独占锁,否则抛出异常IllegalMonitorStateException。
那么真正操作的时候,获取第一个waiter,如果有waiter,调用doSignal方法:
1 private void doSignal(Node first) {2 do {3 if ( (firstWaiter = first.nextWaiter) == null)4 lastWaiter = null;5 first.nextWaiter = null;6 } while (!transferForSignal(first) &&7 (first = firstWaiter) != null);8 }
第3行~第5行的代码很好理解:
重新设置firstWaiter,指向第一个waiter的nextWaiter
如果第一个waiter的nextWaiter为null,说明当前队列中只有一个waiter,lastWaiter置空
因为firstWaiter是要被signal的,因此它没什么用了,nextWaiter置空
接着执行第6行和第7行的代码,这里重点就是第6行的transferForSignal方法:
1 final boolean transferForSignal(Node node) { 2 /* 3 * If cannot change waitStatus, the node has been cancelled. 4 */ 5 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) 6 return false; 7 8 /* 9 * Splice onto queue and try to set waitStatus of predecessor to10 * indicate that thread is (probably) waiting. If cancelled or11 * attempt to set waitStatus fails, wake up to resync (in which12 * case the waitStatus can be transiently and harmlessly wrong).13 */14 Node p = enq(node);15 int ws = p.waitStatus;16 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))17 LockSupport.unpark(node.thread);18 return true;19 }
方法本意是将一个节点从Condition队列转换为AbstractQueuedSynchronizer队列,总结一下方法的实现:
尝试将Node的waitStatus从CONDITION置为0,这一步失败直接返回false
当前节点进入调用enq方法进入AbstractQueuedSynchronizer队列
当前节点通过CAS机制将waitStatus置为SIGNAL
最后上面的步骤全部成功,返回true,返回true唤醒等待节点成功。从唤醒的代码我们可以得出一个重要结论:某个await()的节点被唤醒之后并不意味着它后面的代码会立即执行,它会被加入到AbstractQueuedSynchronizer队列的尾部,只有前面等待的节点获取锁全部完毕才能轮到它。
代码分析到这里,我想类似的signalAll方法也没有必要再分析了,显然signalAll方法的作用就是将所有Condition队列中等待的节点逐一队列中从移除,由CONDITION状态变为SIGNAL状态并加入AbstractQueuedSynchronizer队列的尾部。
代码示例
可能大家看了我分析半天代码会有点迷糊,这里最后我贴一段我用于验证上面Condition结论的示例代码,首先建立一个Thread,我将之命名为ConditionThread:
1 /** 2 * @author 五月的仓颉 3 */ 4 public class ConditionThread implements Runnable { 5 6 private Lock lock; 7 8 private Condition condition; 9 10 public ConditionThread(Lock lock, Condition condition) {11 this.lock = lock;12 this.condition = condition;13 }14 15 @Override16 public void run() {17 18 if ("线程0".equals(JdkUtil.getThreadName())) {19 thread0Process();20 } else if ("线程1".equals(JdkUtil.getThreadName())) {21 thread1Process();22 } else if ("线程2".equals(JdkUtil.getThreadName())) {23 thread2Process();24 }25 26 }27 28 private void thread0Process() {29 try {30 lock.lock();31 System.out.println("线程0休息5秒");32 JdkUtil.sleep(5000);33 condition.signal();34 System.out.println("线程0唤醒等待线程");35 } finally {36 lock.unlock();37 }38 }39 40 private void thread1Process() {41 try {42 lock.lock();43 System.out.println("线程1阻塞");44 condition.await();45 System.out.println("线程1被唤醒");46 } catch (InterruptedException e) {47 48 } finally {49 lock.unlock();50 }51 }52 53 private void thread2Process() {54 try {55 System.out.println("线程2想要获取锁");56 lock.lock();57 System.out.println("线程2获取锁成功");58 } finally {59 lock.unlock();60 }61 }62 63 }
这个类里面的方法就不解释了,反正就三个方法片段,根据线程名判断,每个线层执行的是其中的一个代码片段。写一段测试代码:
1 /** 2 * @author 五月的仓颉 3 */ 4 @Test 5 public void testCondition() throws Exception { 6 Lock lock = new ReentrantLock(); 7 Condition condition = lock.newCondition(); 8 9 // 线程0的作用是signal10 Runnable runnable0 = new ConditionThread(lock, condition);11 Thread thread0 = new Thread(runnable0);12 thread0.setName("线程0");13 // 线程1的作用是await14 Runnable runnable1 = new ConditionThread(lock, condition);15 Thread thread1 = new Thread(runnable1);16 thread1.setName("线程1");17 // 线程2的作用是lock18 Runnable runnable2 = new ConditionThread(lock, condition);19 Thread thread2 = new Thread(runnable2);20 thread2.setName("线程2");21 22 thread1.start();23 Thread.sleep(1000);24 thread0.start();25 Thread.sleep(1000);26 thread2.start();27 28 thread1.join();29 }
测试代码的意思是:
线程1先启动,获取锁,调用await()方法等待
线程0后启动,获取锁,休眠5秒准备signal()
线程2最后启动,获取锁,由于线程0未使用完毕锁,因此线程2排队,可以此时由于线程0还未signal(),因此线程1在线程0执行signal()后,在AbstractQueuedSynchronizer队列中的顺序是在线程2后面的
代码执行结果为:
<span style="color: #008080"> 1</span> <span style="color: #000000">线程1阻塞</span><span style="color: #008080"> 2</span> <span style="color: #000000">线程0休息5秒</span><span style="color: #008080"> 3</span> <span style="color: #000000">线程2想要获取锁</span><span style="color: #008080"> 4</span> <span style="color: #000000">线程0唤醒等待线程</span><span style="color: #008080"> 5</span> <span style="color: #000000">线程2获取锁成功</span><span style="color: #008080"> 6</span> <span style="color: #000000">线程1被唤醒</span><span style="color: #008080"><br></span>
符合我们的结论:signal()并不意味着被唤醒的线程立即执行。由于线程2先于线程0排队,因此看到第5行打印的内容,线程2先获取锁。
Das obige ist der detaillierte Inhalt vonImplementierungsprozess für den Shared-Mode-Akquise. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!