>Java >java지도 시간 >Java의 CyclicBarrier 소스 코드 분석

Java의 CyclicBarrier 소스 코드 분석

WBOY
WBOY앞으로
2023-04-30 20:55:131385검색

    CyclicBarrier 소개

    CountDownLatch의 경우 다른 스레드는 League of Legends와 같은 게임 플레이어이고, 메인 스레드는 게임 시작을 제어하는 ​​스레드입니다. 모든 플레이어가 준비되기 전에는 메인 스레드가 대기 상태가 되어 게임을 시작할 수 없습니다. 모든 플레이어가 준비되면 다음 작업 실행자가 게임을 시작하는 기본 스레드가 됩니다.

    CyclicBarrier의 경우 회사에서 모든 직원이 팀 빌딩 활동을 수행하기를 원한다고 가정해 보겠습니다. 활동 내용은 사람마다 장애물을 극복하는 데 걸리는 시간이 다릅니다. 그러나 회사에서는 모든 사람이 다음 장애물을 넘어가기 전에 현재 장애물을 넘어야 한다고 요구합니다. 즉, 모든 사람이 첫 번째 장애물을 넘어간 후에는 두 번째 장애물을 넘어가기 시작하는 식입니다. 유추적으로 모든 직원은 "다른 스레드"입니다. 모두가 모든 장애물을 넘으면 프로그램이 종료됩니다. 메인 스레드는 오래 전에 종료되었을 수 있으므로 여기서는 메인 스레드에 대해 걱정할 필요가 없습니다.

    CyclicBarrier 소스 코드 분석

    클래스 상속 관계

    CyclicBarrier는 어떤 상위 클래스를 상속하는지, 어떤 상위 인터페이스를 구현하는지 표시하지 않습니다. 모든 AQS 및 재진입 잠금은 상속을 통해 구현되지 않고 조합을 통해 구현됩니다.

    public class CyclicBarrier {}
    ```  
    
    ### 类的内部类
    
    CyclicBarrier类存在一个内部类Generation,每一次使用的CycBarrier可以当成Generation的实例,其源代码如下
    
    ```java
    private static class Generation {
    boolean broken = false;
    }

    설명: Generation 클래스에는 broken 속성이 있으며, 이는 현재 장벽이 손상되었는지 여부를 나타내는 데 사용됩니다.

    class

    public class CyclicBarrier {
    
    /** The lock for guarding barrier entry */
    // 可重入锁
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    // 条件队列
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    // 参与的线程数量
    private final int parties;
    /* The command to run when tripped */
    // 由最后一个进入 barrier 的线程执行的操作
    private final Runnable barrierCommand;
    /** The current generation */
    // 当前代
    private Generation generation = new Generation();
    // 正在等待进入屏障的线程数量
    private int count;
    }

    속성: 이 속성에는 ReentrantLock 객체 하나와 Condition 객체 하나가 있으며, Condition 객체는 AQS를 기반으로 하므로 최종 분석에서 맨 아래 레이어는 여전히 AQS에서 지원됩니다.

    클래스

    CyclicBarrier(int, Runnable) 유형 생성자의 생성자

    public CyclicBarrier(int parties, Runnable barrierAction) {
    // 参与的线程数量小于等于0,抛出异常
    if (parties <= 0) throw new IllegalArgumentException();
    // 设置parties
    this.parties = parties;
    // 设置count
    this.count = parties;
    // 设置barrierCommand
    this.barrierCommand = barrierAction;
    }

    Description: 이 생성자는 CyclicBarrier와 연결된 스레드 수를 지정할 수 있으며 모든 스레드가 장벽에 들어간 후에 이를 지정할 수 있습니다. 실행 작업은 장벽을 수행하는 마지막 스레드에 의해 실행됩니다.

    CyclicBarrier(int) 유형 생성자

    public CyclicBarrier(int parties) {
    // 调用含有两个参数的构造函数
    this(parties, null);
    }

    설명: 이 생성자는 CyclicBarrier와 관련된 스레드 수만큼만 실행하고 실행 동작을 설정하지 않습니다.

    핵심 함수 - dowait 함수

    CyclicBarrier 클래스에서 제공하는 Wait 함수는 최하위 레이어에서 doawait 함수를 호출합니다.

    private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
    TimeoutException {
    // 保存当前锁
    final ReentrantLock lock = this.lock;
    // 锁定
    lock.lock();
    try {
    // 保存当前代
    final Generation g = generation;
    
    if (g.broken) // 屏障被破坏,抛出异常
    throw new BrokenBarrierException();
    
    if (Thread.interrupted()) { // 线程被中断
    // 损坏当前屏障,并且唤醒所有的线程,只有拥有锁的时候才会调用
    breakBarrier();
    // 抛出异常
    throw new InterruptedException();
    }
    
    // 减少正在等待进入屏障的线程数量
    int index = --count;
    if (index == 0) { // 正在等待进入屏障的线程数量为0,所有线程都已经进入
    // 运行的动作标识
    boolean ranAction = false;
    try {
    // 保存运行动作
    final Runnable command = barrierCommand;
    if (command != null) // 动作不为空
    // 运行
    command.run();
    // 设置ranAction状态
    ranAction = true;
    // 进入下一代
    nextGeneration();
    return 0;
    } finally {
    if (!ranAction) // 没有运行的动作
    // 损坏当前屏障
    breakBarrier();
    }
    }
    
    // loop until tripped, broken, interrupted, or timed out
    // 无限循环
    for (;;) {
    try {
    if (!timed) // 没有设置等待时间
    // 等待
    trip.await();
    else if (nanos > 0L) // 设置了等待时间,并且等待时间大于0
    // 等待指定时长
    nanos = trip.awaitNanos(nanos);
    } catch (InterruptedException ie) {
    if (g == generation && ! g.broken) { // 等于当前代并且屏障没有被损坏
    // 损坏当前屏障
    breakBarrier();
    // 抛出异常
    throw ie;
    } else { // 不等于当前带后者是屏障被损坏
    // We&#39;re about to finish waiting even if we had not
    // been interrupted, so this interrupt is deemed to
    // "belong" to subsequent execution.
    // 中断当前线程
    Thread.currentThread().interrupt();
    }
    }
    
    if (g.broken) // 屏障被损坏,抛出异常
    throw new BrokenBarrierException();
    
    if (g != generation) // 不等于当前代
    // 返回索引
    return index;
    
    if (timed && nanos <= 0L) { // 设置了等待时间,并且等待时间小于0
    // 损坏屏障
    breakBarrier();
    // 抛出异常
    throw new TimeoutException();
    }
    }
    } finally {
    // 释放锁
    lock.unlock();
    }
    }
    핵심 함수 - nextGeneration 함수

    이 함수는 모든 스레드가 장벽에 진입한 후 호출됩니다. 즉, 다음 버전이 생성되고 모든 스레드가 장벽에 다시 진입할 수 있습니다.

    소스 코드는 다음과 같습니다.

    private void nextGeneration() {
    // signal completion of last generation
    // 唤醒所有线程
    trip.signalAll();
    // set up next generation
    // 恢复正在等待进入屏障的线程数量
    count = parties;
    // 新生一代
    generation = new Generation();
    }
    이 함수에서는 AQS의 signalAll 메서드를 호출하여 대기 중인 모든 스레드를 깨웁니다. 모든 스레드가 이 조건을 기다리고 있으면 모든 스레드를 깨웁니다.

    소스코드는 다음과 같습니다.

    public final void signalAll() {
    if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
    throw new IllegalMonitorStateException();
    // 保存condition队列头节点
    Node first = firstWaiter;
    if (first != null) // 头节点不为空
    // 唤醒所有等待线程
    doSignalAll(first);
    }

    위 내용은 Java의 CyclicBarrier 소스 코드 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

    성명:
    이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제