CyclicBarrier source code analysis in Java
For CountDownLatch, think of the other threads as players in a game such as League of Legends, and of the main thread as the thread that controls the start of the game. Until all players are ready, the main thread waits, which means the game cannot start. Once all players are ready, the next thread to act is the main thread, which starts the game.
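The game analogy can be sketched with a CountDownLatch as follows. This is only an illustrative sketch: the class name `GameStartDemo`, the helper `startGame`, and the log strings are made up for the example and are not part of the JDK.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

class GameStartDemo {
    // Hypothetical helper: returns the event log of one simulated game start.
    static List<String> startGame(int players) {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch ready = new CountDownLatch(players);
        for (int i = 0; i < players; i++) {
            new Thread(() -> {
                log.add("player ready");
                ready.countDown();   // report readiness; the player itself does not wait
            }).start();
        }
        try {
            ready.await();           // the main thread waits until every player is ready
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        log.add("game started");     // the next step is performed by the main thread
        return log;
    }

    public static void main(String[] args) {
        System.out.println(startGame(3));
    }
}
```

Note that only the main thread blocks here; the player threads simply count down and carry on.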
For CyclicBarrier, suppose a company takes all its employees on a team-building activity that consists of climbing over three obstacles. Each person needs a different amount of time per obstacle, but the company requires everyone to clear the current obstacle before anyone starts on the next one. That is, once everyone has cleared the first obstacle, they all start on the second, and so on. By analogy, every employee is one of the "other threads". The program ends when everyone has cleared all obstacles. The main thread may have finished long before that, so it plays no part here.
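A minimal sketch of the obstacle-course analogy, assuming four employees and three obstacles. The class `ObstacleCourseDemo`, the helper `runCourse`, and the simulated climbing times are hypothetical and exist only for illustration.

```java
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class ObstacleCourseDemo {
    // Hypothetical helper: every employee clears every obstacle; returns the barrier log.
    static List<String> runCourse(int employees, int obstacles) {
        List<String> log = new CopyOnWriteArrayList<>();
        AtomicInteger cleared = new AtomicInteger();
        // The barrier action runs once per obstacle, after the last employee arrives.
        CyclicBarrier barrier = new CyclicBarrier(employees,
                () -> log.add("everyone cleared obstacle " + cleared.incrementAndGet()));
        Thread[] team = new Thread[employees];
        for (int i = 0; i < employees; i++) {
            final long climbMillis = 5L * (i + 1);   // each employee climbs at a different pace
            team[i] = new Thread(() -> {
                try {
                    for (int k = 0; k < obstacles; k++) {
                        Thread.sleep(climbMillis);   // climb the current obstacle
                        barrier.await();             // wait for the rest of the team
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            team[i].start();
        }
        for (Thread t : team) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        runCourse(4, 3).forEach(System.out::println);
    }
}
```

The same barrier object is reused for all three obstacles, which is the "cyclic" part of the name.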
CyclicBarrier does not explicitly extend any parent class or implement any interface. It does not inherit from AQS or from ReentrantLock; it uses them through composition.
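```java
public class CyclicBarrier {}
```

### Inner class

The CyclicBarrier class has an inner class, Generation. Each use of the CyclicBarrier can be regarded as one Generation instance. Its source code is as follows:

```java
private static class Generation {
    boolean broken = false;
}
```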
Description: The Generation class has a single field, broken, which indicates whether the current barrier has been broken.
```java
public class CyclicBarrier {
    /** The lock for guarding barrier entry */
    // Reentrant lock
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    // Condition queue
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    // Number of participating threads
    private final int parties;
    /* The command to run when tripped */
    // Action executed by the last thread to enter the barrier
    private final Runnable barrierCommand;
    /** The current generation */
    // Current generation
    private Generation generation = new Generation();
    // Number of threads that still have to enter the barrier
    private int count;
}
```
Description: The fields include a ReentrantLock object and a Condition object created from it. The Condition is implemented on top of AQS, so in the final analysis CyclicBarrier is still backed by AQS.
The CyclicBarrier(int, Runnable) constructor
```java
public CyclicBarrier(int parties, Runnable barrierAction) {
    // Throw if the number of participating threads is less than or equal to 0
    if (parties <= 0) throw new IllegalArgumentException();
    // Set parties
    this.parties = parties;
    // Set count
    this.count = parties;
    // Set barrierCommand
    this.barrierCommand = barrierAction;
}
```
Description: This constructor specifies the number of threads associated with the CyclicBarrier and the action to run once all threads have entered the barrier. The action is executed by the last thread to arrive at the barrier.
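The claim that the action runs on the last arriving thread can be checked with a small sketch. It relies on the documented behavior that `await()` returns 0 for the last thread to arrive; the class `BarrierActionDemo`, the helper `actionRanOnLastArrival`, and the thread names are hypothetical.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

class BarrierActionDemo {
    // Hypothetical helper: true if the barrier action ran on the thread that arrived last.
    static boolean actionRanOnLastArrival(int parties) {
        AtomicReference<String> actionThread = new AtomicReference<>();
        AtomicReference<String> lastArrival = new AtomicReference<>();
        CyclicBarrier barrier = new CyclicBarrier(parties,
                () -> actionThread.set(Thread.currentThread().getName()));
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    // await() returns the arrival index; 0 means "last to arrive"
                    if (barrier.await() == 0) {
                        lastArrival.set(Thread.currentThread().getName());
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            }, "party-" + i);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return actionThread.get() != null && actionThread.get().equals(lastArrival.get());
    }

    public static void main(String[] args) {
        System.out.println(actionRanOnLastArrival(3));
    }
}
```

Which thread happens to arrive last varies from run to run, but the action always runs on that thread, not on a separate one.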
The CyclicBarrier(int) constructor
```java
public CyclicBarrier(int parties) {
    // Delegate to the two-argument constructor
    this(parties, null);
}
```
Description: This constructor only specifies the number of threads associated with the CyclicBarrier; no barrier action is set.
The dowait function is the core function of the CyclicBarrier class. The await methods that CyclicBarrier exposes delegate to dowait internally.
The source code is as follows:
```java
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException, TimeoutException {
    // Keep a local reference to the lock
    final ReentrantLock lock = this.lock;
    // Acquire the lock
    lock.lock();
    try {
        // Remember the current generation
        final Generation g = generation;

        if (g.broken) // The barrier is broken: throw
            throw new BrokenBarrierException();

        if (Thread.interrupted()) { // The thread was interrupted
            // Break the current barrier and wake up all threads;
            // only called while holding the lock
            breakBarrier();
            throw new InterruptedException();
        }

        // Decrement the number of threads that still have to enter the barrier
        int index = --count;
        if (index == 0) { // No threads left to wait for: all have entered
            // Flag recording whether the action completed
            boolean ranAction = false;
            try {
                // Read the barrier action
                final Runnable command = barrierCommand;
                if (command != null) // An action was set: run it
                    command.run();
                ranAction = true;
                // Move on to the next generation
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction) // The action did not complete normally
                    // Break the current barrier
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed) // No timeout was set: wait indefinitely
                    trip.await();
                else if (nanos > 0L) // A timeout was set and time remains: wait for that long
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    // Still the current generation and the barrier is intact:
                    // break the barrier and rethrow
                    breakBarrier();
                    throw ie;
                } else {
                    // Either the generation changed or the barrier is already broken.
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    // Restore the interrupt status of the current thread
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken) // The barrier is broken: throw
                throw new BrokenBarrierException();

            if (g != generation) // The generation changed: return the arrival index
                return index;

            if (timed && nanos <= 0L) { // A timeout was set and it has elapsed
                // Break the barrier
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // Release the lock
        lock.unlock();
    }
}
```
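Two of the exit paths in dowait can be observed from a single thread: a timed wait that expires breaks the barrier, and any later await on a broken barrier fails immediately. The sketch below uses only public CyclicBarrier methods; the class `BrokenBarrierDemo`, its helpers, and the 50 ms timeout are assumptions made for the example.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BrokenBarrierDemo {
    // Hypothetical helper: calls await and reports how the call ended.
    static String outcome(CyclicBarrier barrier, boolean timed) {
        try {
            if (timed) barrier.await(50, TimeUnit.MILLISECONDS);
            else barrier.await();
            return "tripped";
        } catch (TimeoutException e) {
            return "timeout";
        } catch (BrokenBarrierException e) {
            return "broken";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    static String observe() {
        CyclicBarrier barrier = new CyclicBarrier(2);   // the second party never arrives
        String first = outcome(barrier, true);          // times out and breaks the barrier
        boolean brokenAfterTimeout = barrier.isBroken();
        String second = outcome(barrier, false);        // fails fast on the g.broken check
        barrier.reset();                                // starts a fresh, unbroken generation
        return first + "," + brokenAfterTimeout + "," + second + "," + barrier.isBroken();
    }

    public static void main(String[] args) {
        System.out.println(observe());   // prints "timeout,true,broken,false"
    }
}
```

The second call returns at once because the broken check at the top of dowait runs before the thread ever waits on the condition.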
The nextGeneration function is called once all threads have entered the barrier. It creates the next generation, after which all threads can enter the barrier again.
The source code is as follows:
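```java
private void nextGeneration() {
    // signal completion of last generation
    // Wake up all waiting threads
    trip.signalAll();
    // set up next generation
    // Restore the number of threads that still have to enter the barrier
    count = parties;
    // Create a new generation
    generation = new Generation();
}
```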
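To see how the lock, the condition, the counter, and the generation object fit together, here is a deliberately simplified barrier. It is a sketch, not the JDK implementation: `MiniBarrier` and `roundsCompleted` are hypothetical names, and broken-barrier handling, timeouts, interruption, and the barrier action are all left out.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class MiniBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;
    private int count;
    private Object generation = new Object();   // stands in for the Generation class

    MiniBarrier(int parties) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
    }

    int await() {
        lock.lock();
        try {
            final Object g = generation;
            int index = --count;
            if (index == 0) {
                // Same three steps as nextGeneration: wake waiters, reset count, new generation
                trip.signalAll();
                count = parties;
                generation = new Object();
                return 0;
            }
            // Wait until the generation changes; the loop guards against spurious wakeups
            while (g == generation) trip.awaitUninterruptibly();
            return index;
        } finally {
            lock.unlock();
        }
    }

    // Hypothetical helper: counts how many times the barrier tripped.
    static int roundsCompleted(int parties, int rounds) {
        MiniBarrier barrier = new MiniBarrier(parties);
        AtomicInteger trips = new AtomicInteger();
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                for (int r = 0; r < rounds; r++) {
                    if (barrier.await() == 0) trips.incrementAndGet();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println(roundsCompleted(3, 5));   // prints 5
    }
}
```

Comparing the captured generation with the current one is what lets a woken thread tell that its own round has finished, even if faster threads have already started arriving for the next round.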
nextGeneration calls the signalAll method of the Condition, which is implemented by the ConditionObject inner class of AQS. It wakes up every thread waiting on that condition, that is, every thread currently waiting at the barrier.
The source code of signalAll is as follows:
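```java
public final void signalAll() {
    if (!isHeldExclusively()) // The lock is not held exclusively by the current thread: throw
        throw new IllegalMonitorStateException();
    // Read the head node of the condition queue
    Node first = firstWaiter;
    if (first != null) // The head node is not null
        // Wake up all waiting threads
        doSignalAll(first);
}
```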
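The isHeldExclusively check explains why CyclicBarrier only calls signalAll while holding its lock. A short sketch of that precondition, using a plain ReentrantLock and its Condition; the class `SignalAllDemo` and the helper `trySignalAll` are hypothetical.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalAllDemo {
    // Hypothetical helper: calls signalAll with or without holding the lock.
    static String trySignalAll(boolean holdLock) {
        ReentrantLock lock = new ReentrantLock();
        Condition trip = lock.newCondition();
        if (holdLock) lock.lock();
        try {
            trip.signalAll();   // with no waiters this is a no-op, provided the lock is held
            return "ok";
        } catch (IllegalMonitorStateException e) {
            return "IllegalMonitorStateException";
        } finally {
            if (holdLock) lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(trySignalAll(true));    // prints "ok"
        System.out.println(trySignalAll(false));   // prints "IllegalMonitorStateException"
    }
}
```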