首頁  >  文章  >  Java  >  Java中CyclicBarrier循環屏障怎麼應用

Java中CyclicBarrier循環屏障怎麼應用

WBOY
WBOY轉載
2023-05-12 14:19:181145瀏覽

一、簡介

CyclicBarrier 字面意思回環柵欄(循環屏障),它可以實現讓一組執行緒等待至某個狀態(屏障點)之後再全部同時執行。叫做回環是因為當所有等待線程都被釋放以後,CyclicBarrier可以被重複使用。 

Java中CyclicBarrier循環屏障怎麼應用

CyclicBarrier 作用是讓一組執行緒互相等待,當達到一個共同點時,所有先前等待的執行緒再繼續執行,且 CyclicBarrier 功能可重複使用。

二、CyclicBarrier的使用

#建構方法:

 // parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
 public CyclicBarrier(int parties)
 // 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)

重要方法:

//屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞
// BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
//循环  通过reset()方法可以进行重置

CyclicBarrier 應用場景

  • 利用CyclicBarrier 可以用於多執行緒計算數據,最後合併計算結果的場景。

  • 利用CyclicBarrier的計數器能夠重置,屏障可以重複使用的特性,可以支援類似「人滿發車」的場景

##模擬合併計算場景

利用CyclicBarrier 可以用於多執行緒計算數據,最後合併計算結果的場景。

public class CyclicBarrierTest2 {
    //保存每个学生的平均成绩
    private Conc urrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>();
    private ExecutorService threadPool= Executors.newFixedThreadPool(3);
    private CyclicBarrier cb=new CyclicBarrier(3,()->{
        int result=0;
        Set<String> set = map.keySet();
        for(String s:set){
            result+=map.get(s);
        }
        System.out.println("三人平均成绩为:"+(result/3)+"分");
    });
    public void count(){
        for(int i=0;i<3;i++){
            threadPool.execute(new Runnable(){

                @Override
                public void run() {
                    //获取学生平均成绩
                    int score=(int)(Math.random()*40+60);
                    map.put(Thread.currentThread().getName(), score);
                    System.out.println(Thread.currentThread().getName()
                            +"同学的平均成绩为:"+score);
                    try {
                        //执行完运行await(),等待所有学生平均成绩都计算完毕
                        cb.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }

            });
        }
    }
    public static void main(String[] args) {
        CyclicBarrierTest2 cb=new CyclicBarrierTest2();
        cb.count();
    }
}

模擬「人滿發車」的場景

利用CyclicBarrier的計數器能夠重置,屏障可以重複使用的特性,可以支援類似「人滿發車」的場景

public class CyclicBarrierTest3 {
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5, 5, 1000, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                (r) -> new Thread(r, counter.addAndGet(1) + " 号 "),
                new ThreadPoolExecutor.AbortPolicy());

        CyclicBarrier cyclicBarrier = new CyclicBarrier(5,
                () -> System.out.println("裁判:比赛开始~~"));

        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.submit(new Runner(cyclicBarrier));
        }

    }
    static class Runner extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Runner (CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            try {
                int sleepMills = ThreadLocalRandom.current().nextInt(1000);
                Thread.sleep(sleepMills);
                System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());
                cyclicBarrier.await();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
        }
    }

}

輸出結果:

3 號 選手已就位, 準備共用時: 78ms0

1 號 選手已就位, 準備共用時: 395ms1
5 號 選手已就位, 準備共用時: 733ms2
2 號 選手已就位, 準備共用時: 776ms3
4 號 選手已就位, 準備共用時: 807ms4
裁判:比賽開始~~
4 號 選手已就位, 準備共用時: 131ms0
3 號 選手已就位, 準備共用時: 256ms1
2 號 選手已就位, 準備共用時: 291ms2
# 1 號 選手已就位, 準備共用時: 588ms3
5 號 選手已就位, 準備共用時: 763ms4
裁判:比賽開始~~

#三、CyclicBarrier 原始碼分析

CyclicBarrier 流程​​

主要是的流程:

  • 取得鎖定如果count != 0 就進入阻塞;

  • 進入阻塞之前,首先需要進入條件佇列,然後釋放鎖,最後阻塞;

  • 如果count != 0 會進行一個喚醒,將所有的條件佇列中的節點轉換為阻塞佇列;

  • 被喚醒後會進行鎖定的獲取,如果鎖定獲取失敗,會進入lock 的阻塞佇列;

  • 如果鎖定取得成功,進行鎖定的釋放,以及喚醒,同步佇列中的執行緒。

下面是一個簡單的流程圖:

Java中CyclicBarrier循環屏障怎麼應用

下面是一些具體的程式碼呼叫的流程:

Java中CyclicBarrier循環屏障怎麼應用

幾個常見的問題?

  • 1.一組執行緒在觸發屏障之前互相等待,最後一個執行緒到達屏障後喚醒邏輯是如何實現的. 喚醒的過程是透過呼叫 

    java.util. concurrent.locks.Condition#signalAll喚醒條件佇列上的所有節點。

  • 2.刪欄循環使用如何實現的? 實際上一個互斥鎖 ReentrantLock 的條件佇列和阻塞佇列的轉換。

  • 3.條件佇列到同步佇列的轉換實作邏輯? 轉換過程中,首先會先將條件佇列中所有的阻塞執行緒喚醒,然後會去取得lock 如果取得失敗,就進入同步隊列。

CyclicBarrier 與 CountDownLatch的差異

  • CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset() 方法重設。所以CyclicBarrier能處理更複雜的業務場景,例如如果計算發生錯誤,可以重置計數器,並讓線程們重新執行一次

  • CyclicBarrier還提供getNumberWaiting(可以獲得CyclicBarrier阻塞的執行緒數量)、isBroken(用來知道阻塞的執行緒是否被中斷)等方法。

  • CountDownLatch會阻塞主線程,CyclicBarrier不會阻塞主線程,只會阻塞子執行緒。

  • CountDownLatch和CyclicBarrier都能夠實現執行緒之間的等待,只不過它們重點不同。 CountDownLatch一般用於一個或多個線程,等待其他執行緒執行完任務後,再執行。 CyclicBarrier一般用於一組執行緒互相等待至某個狀態,然後這一組執行緒再同時執行。

  • CyclicBarrier 也可以提供一個 barrierAction,合併多執行緒計算結果。

  • CyclicBarrier是透過ReentrantLock的"獨佔鎖定"和Conditon來實現一組執行緒的阻塞喚醒的,而CountDownLatch則是透過AQS的「共享鎖定」實作

#

以上是Java中CyclicBarrier循環屏障怎麼應用的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除