ホームページ  >  記事  >  Java  >  JavaでCyclicBarrierサイクルバリアを適用する方法

JavaでCyclicBarrierサイクルバリアを適用する方法

WBOY
WBOY転載
2023-05-12 14:19:181086ブラウズ

1. はじめに

CyclicBarrier は文字通りループ バリア (循環バリア) を意味し、スレッドのグループを特定の状態 (バリア ポイント) で待機させ、すべてを同時に実行できます。これは、待機中のスレッドがすべて解放された後に CyclicBarrier を再利用できるため、ループバックと呼ばれます。

JavaでCyclicBarrierサイクルバリアを適用する方法

CyclicBarrier の機能は、スレッドのグループを相互に待機させることです。共通点に達すると、それまで待機していたすべてのスレッドが実行を継続し、CyclicBarrier関数は再利用できます。

#2. CyclicBarrier の使用

構築方法:

 // parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
 public CyclicBarrier(int parties)
 // 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)

重要な方法:

//屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞
// BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
//循环  通过reset()方法可以进行重置

CyclicBarrier アプリケーション シナリオ

  • CyclicBarrier の使用は、マルチスレッド データが計算され、計算結果が最終的にマージされるシナリオで使用できます。

  • #CyclicBarrier のカウンターリセットとバリア再利用の特徴を利用し、「完全出発」に近いシナリオにも対応可能です。 ##シミュレーション マージ計算シナリオ
  • CyclicBarrier を使用すると、複数のスレッドでデータを計算し、最終的に計算結果をマージすることができます。

    public class CyclicBarrierTest2 {
        //保存每个学生的平均成绩
        private Conc urrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>();
        private ExecutorService threadPool= Executors.newFixedThreadPool(3);
        private CyclicBarrier cb=new CyclicBarrier(3,()->{
            int result=0;
            Set<String> set = map.keySet();
            for(String s:set){
                result+=map.get(s);
            }
            System.out.println("三人平均成绩为:"+(result/3)+"分");
        });
        public void count(){
            for(int i=0;i<3;i++){
                threadPool.execute(new Runnable(){
    
                    @Override
                    public void run() {
                        //获取学生平均成绩
                        int score=(int)(Math.random()*40+60);
                        map.put(Thread.currentThread().getName(), score);
                        System.out.println(Thread.currentThread().getName()
                                +"同学的平均成绩为:"+score);
                        try {
                            //执行完运行await(),等待所有学生平均成绩都计算完毕
                            cb.await();
                        } catch (InterruptedException | BrokenBarrierException e) {
                            e.printStackTrace();
                        }
                    }
    
                });
            }
        }
        public static void main(String[] args) {
            CyclicBarrierTest2 cb=new CyclicBarrierTest2();
            cb.count();
        }
    }
  • 「バスが満員」のシーンをシミュレート

CyclicBarrierのカウンターのリセットとバリアの再利用ができる特性を利用し、「バスが満員」のようなシーンに対応できます。人がいっぱいです"

public class CyclicBarrierTest3 {
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5, 5, 1000, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                (r) -> new Thread(r, counter.addAndGet(1) + " 号 "),
                new ThreadPoolExecutor.AbortPolicy());

        CyclicBarrier cyclicBarrier = new CyclicBarrier(5,
                () -> System.out.println("裁判:比赛开始~~"));

        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.submit(new Runner(cyclicBarrier));
        }

    }
    static class Runner extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Runner (CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            try {
                int sleepMills = ThreadLocalRandom.current().nextInt(1000);
                Thread.sleep(sleepMills);
                System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());
                cyclicBarrier.await();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
        }
    }

}

出力結果:

プレーヤー No. 3 が配置され、共有する準備ができました: 78ms0

プレーヤー No. 1配置され、共有する準備ができています: 395ms1## プレーヤー No. 5 が配置され、共有する準備ができています: 733ms2 プレーヤー No. 2 が配置され、共有する準備ができています: 776ms プレーヤー No. 3 が配置され、共有する準備ができています共有: 807ms4 主審: ゲームが開始されました ~~

4 人のプレーヤーが配置され、共有する準備ができました: 131ms0
3 人のプレーヤーが配置され、共有する準備ができました: 256ms1

2 人のプレーヤーが配置されます、共有準備完了: 291ms2
プレイヤー No. 1 が配置され、共有準備完了: 588ms3
プレイヤー No. 5 が配置され、共有準備完了: 763ms4
主審: ゲーム開始~~


3. CyclicBarrier のソース コード分析

CyclicBarrier プロセス


#メイン プロセスは次のとおりです:

ロックを取得し、カウント != 0 の場合はブロッキングに入ります;

ブロッキングに入る前に、まず条件キューに入り、次にロックを解放し、最後にブロックする必要があります;

  • カウント != 0 の場合、ウェイクアップが実行されます。条件キュー内のすべてのノードがブロッキング キューに変換されます。

  • ## ウェイクアップ後、ロックが取得されます。ロックの取得が失敗した場合は、ロック ブロック キューに入ります。

  • ロックの取得が成功した場合、ロックは解放され、スレッドは同期状態になります。キューが目覚めます。

  • 以下は簡単なフローチャートです:

以下はいくつかの具体的なコードです電話をかけるプロセス:

JavaでCyclicBarrierサイクルバリアを適用する方法#よくある質問がいくつかありますか?

1. スレッドのグループは、バリアをトリガーする前に互いに待機します。最後のスレッドがバリアに到達した後、ウェイクアップ ロジックはどのように実装されますか。ウェイクアップ プロセスは次のとおりです。呼び出し

java.util.concurrent.locks.Condition#signalAll

条件キュー上のすべてのノードを起動します。 JavaでCyclicBarrierサイクルバリアを適用する方法

2. 列削除サイクルはどのように実装されますか? 実際には、ミューテックス ReentrantLock の条件キューとブロッキング キューが変換されます。
  • 3. 条件キューから同期キューへの変換の実装ロジック? 変換プロセス中、まず条件キュー内のブロックされているすべてのスレッドが起動され、次にロックが取得されます。取得に失敗した場合は、同期キューに入ります。

  • CyclicBarrier と CountDownLatch の違い

  • CountDownLatch のカウンターは 1 回しか使用できませんが、CyclicBarrier のカウンターはリセット()メソッド。したがって、CyclicBarrier は、より複雑なビジネス シナリオを処理できます。たとえば、計算エラーが発生した場合、カウンターをリセットしてスレッドを再実行できます。

CyclicBarrier は getNumberWaiting (これは、スレッドの数をブロックする CyclicBarrier を取得できます)、isBroken (ブロックされたスレッドが中断されたかどうかを知るために使用されます) およびその他のメソッドを取得できます。

  • CountDownLatch はメイン スレッドをブロックしますが、CyclicBarrier はメイン スレッドをブロックせず、子スレッドのみをブロックします。

  • CountDownLatch と CyclicBarrier はどちらもスレッド間の待機を実現できますが、焦点が異なります。 CountDownLatch は通常、1 つ以上のスレッドが他のスレッドがタスクを実行する前に完了するのを待つために使用されます。 CyclicBarrier は通常、スレッドのグループが互いに特定の状態に達するのを待機し、スレッドのグループが同時に実行されるようにするために使用されます。

  • CyclicBarrier は、マルチスレッドの計算結果をマージするバリアアクションも提供できます。

  • #CyclicBarrier は、ReentrantLock の「排他ロック」と Conditon を介してスレッドのグループのブロックとウェイクアップを実装します。一方、CountDownLatch は、AQS

  • の「共有ロック」を介して実装されます。

以上がJavaでCyclicBarrierサイクルバリアを適用する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。