CyclicBarrier는 말 그대로 루프 장벽(순환 장벽)을 의미합니다. 스레드 그룹이 특정 상태(장벽 지점)를 기다리게 한 다음 동시에 실행하도록 할 수 있습니다. 모든 대기 스레드가 해제된 후에 CyclicBarrier를 재사용할 수 있기 때문에 이를 루프백이라고 합니다.
CyclicBarrier의 기능은 스레드 그룹이 공통 지점에 도달하면 이전에 대기 중인 모든 스레드가 계속 실행되며 CyclicBarrier 기능을 재사용할 수 있습니다.
구성 방법:
// parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。 public CyclicBarrier(int parties) // 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)
중요 방법:
//屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞 // BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时 public int await() throws InterruptedException, BrokenBarrierException public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException //循环 通过reset()方法可以进行重置
CyclicBarrier 사용 가능 데이터의 멀티스레드 계산을 위해 최종적으로 병합 계산 결과.
카운터를 재설정하고 장벽을 재사용할 수 있는 CyclicBarrier의 기능을 활용하여 "기차가 사람으로 가득 찼습니다"와 유사한 시나리오를 지원할 수 있습니다.
CyclicBarrier를 사용하여 계산 여러 스레드의 데이터, 마지막으로 계산 결과 병합 시나리오입니다.
public class CyclicBarrierTest2 { //保存每个学生的平均成绩 private Conc urrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>(); private ExecutorService threadPool= Executors.newFixedThreadPool(3); private CyclicBarrier cb=new CyclicBarrier(3,()->{ int result=0; Set<String> set = map.keySet(); for(String s:set){ result+=map.get(s); } System.out.println("三人平均成绩为:"+(result/3)+"分"); }); public void count(){ for(int i=0;i<3;i++){ threadPool.execute(new Runnable(){ @Override public void run() { //获取学生平均成绩 int score=(int)(Math.random()*40+60); map.put(Thread.currentThread().getName(), score); System.out.println(Thread.currentThread().getName() +"同学的平均成绩为:"+score); try { //执行完运行await(),等待所有学生平均成绩都计算完毕 cb.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); } } public static void main(String[] args) { CyclicBarrierTest2 cb=new CyclicBarrierTest2(); cb.count(); } }
카운터를 재설정하고 장벽을 재사용할 수 있는 CyclicBarrier의 기능을 사용하여 "기차는 사람으로 가득하다" 장면을 지원할 수 있습니다
public class CyclicBarrierTest3 { public static void main(String[] args) { AtomicInteger counter = new AtomicInteger(); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 5, 5, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), (r) -> new Thread(r, counter.addAndGet(1) + " 号 "), new ThreadPoolExecutor.AbortPolicy()); CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("裁判:比赛开始~~")); for (int i = 0; i < 10; i++) { threadPoolExecutor.submit(new Runner(cyclicBarrier)); } } static class Runner extends Thread{ private CyclicBarrier cyclicBarrier; public Runner (CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { try { int sleepMills = ThreadLocalRandom.current().nextInt(1000); Thread.sleep(sleepMills); System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting()); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } } } }
출력 결과:
3 1번 선수가 위치에 있고, 공유할 준비가 되었습니다: 78ms 0
1명의 선수가 위치에 있고, 공유할 준비가 되었습니다: 395ms 1
5명의 선수가 위치에 있고, 공유할 준비가 되었습니다: 733ms 2
2명의 선수가 위치에 있고, 공유할 준비가 되었습니다: 776ms3
4번 선수가 위치에 있고, 공유할 준비가 되었습니다: 807ms4
심판: 게임이 시작되었습니다~~
4번 선수가 위치에 있고, 공유할 준비가 되었습니다: 131ms0
3번 선수가 위치에 있고, 공유할 준비가 되어 있음: 256ms1
2번 선수가 이미 자리에 있고, 공유할 준비가 되어 있음: 291ms2
1번 선수가 자리에 있고, 공유할 준비가 되어 있음: 588ms3
5번 선수가 자리에 있음 , 공유 준비 완료: 763ms4
심판자: 게임이 시작되었습니다~~
주요 프로세스는 다음과 같습니다.
잠금을 설정하고 카운트되면 차단에 들어갑니다! = 0;
차단에 들어가기 전에 먼저 조건 대기열에 들어간 다음 잠금을 해제하고 마지막으로 차단해야 합니다.
count != 0이면 깨우기가 수행되고 모든 노드가 조건 대기열은 차단 대기열로 변환됩니다.
잠금 획득에 실패하면 잠금이 성공적으로 획득되면 잠금을 획득합니다. 잠금이 해제되고 동기화 큐의 스레드가 깨어납니다.
다음은 일부 코드 호출의 구체적인 프로세스입니다.
몇 가지 일반적인 질문이 있으신가요?
2. 열 삭제 주기는 어떻게 구현되나요? 실제로 뮤텍스 ReentrantLock의 조건 대기열과 차단 대기열이 변환됩니다. java.util.concurrent.locks.Condition#signalAll
CyclicBarrier는 ReentrantLock의 "독점 잠금" 및 Conditon을 통해 스레드 그룹의 차단 및 깨우기를 구현하는 반면, CountDownLatch는 AQS의 "공유 잠금"
위 내용은 Java에서 CyclicBarrier 사이클 장벽을 적용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!