>  기사  >  Java  >  Java 다중 스레드 프로그래밍을 위한 동기화 장치

Java 다중 스레드 프로그래밍을 위한 동기화 장치

黄舟
黄舟원래의
2017-02-20 10:23:081074검색

Synchronizer

각 특정 동기화 문제에 대한 솔루션을 제공합니다

Semaphore

Semaphore [Semaphore], 공유에 대한 액세스 제어; 자원.

테스트 클래스:

package Concurrent;

import Concurrent.thread.SemaphoreThread;

import java.util.concurrent.Semaphore;

/**
    * 拿客
    * www.coderknock.com
    * QQ群:213732117
    * 创建时间:2016年08月08日
    * 描述:
    */
    public class SemaphoreTest {
       public static void main(String[] args) {
           //在Thread里声明并不是同一个对象
           Semaphore semaphore = new Semaphore(3);
           SemaphoreThread testA = new SemaphoreThread("A", semaphore);
           SemaphoreThread testB = new SemaphoreThread("B", semaphore);
           SemaphoreThread testC = new SemaphoreThread("C", semaphore);
           SemaphoreThread testD = new SemaphoreThread("D", semaphore);
           SemaphoreThread testE = new SemaphoreThread("E", semaphore);
           SemaphoreThread testF = new SemaphoreThread("F", semaphore);
           SemaphoreThread testG = new SemaphoreThread("G", semaphore);
           testA.start();
           testB.start();
           testC.start();
           testD.start();
           testE.start();
           testF.start();
           testG.start();
       }
   }

스레드 작성:

package Concurrent.thread;

import org.apache.logging.log4j.LogManager;
import org.apache.logging. log4j.Logger;

import java.util.concurrent.Semaphore;

 /**
    * 拿客
    * www.coderknock.com
    * QQ群:213732117
    * 创建时间:2016年08月08日
    * 描述:
    */
   public class SemaphoreThread extends Thread {
       private static final Logger logger = LogManager.getLogger(SemaphoreThread.class);
       //创建有3个信号量的信号量计数器
       public Semaphore semaphore;
       public SemaphoreThread(String name, Semaphore semaphore) {
           setName(name);
           this.semaphore = semaphore;
       }
       @Override
       public void run() {
           try {
               logger.debug(getName() + " 取号等待... " + System.currentTimeMillis());
               //取出一个信号
               semaphore.acquire();
               logger.debug(getName() + " 提供服务... " + System.currentTimeMillis());
               sleep(1000);
               logger.debug(getName() + " 完成服务... " + System.currentTimeMillis());
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           logger.debug(getName() + " 释放... " + System.currentTimeMillis());
           //释放一个信号
           semaphore.release();
       }
   }

실행 결과 [다음 모든 출력 결과에서 []는 스레드 이름입니다. 후자는 출력입니다.]:

 [C] - C 取号等待... 1470642024037
    [F] - F 取号等待... 1470642024036
    [E] - E 取号等待... 1470642024036
    [B] - B 取号等待... 1470642024037
    [D] - D 取号等待... 1470642024037
    [A] - A 取号等待... 1470642023965
    [D] - D 提供服务... 1470642024039
    [C] - C 提供服务... 1470642024039
    [G] - G 取号等待... 1470642024036
    [F] - F 提供服务... 1470642024040
    [D] - D 完成服务... 1470642025039
    [C] - C 完成服务... 1470642025039
    [D] - D 释放... 1470642025040
    [F] - F 完成服务... 1470642025040
    [C] - C 释放... 1470642025041
    [B] - B 提供服务... 1470642025042
    [A] - A 提供服务... 1470642025042
    [F] - F 释放... 1470642025043
    [E] - E 提供服务... 1470642025043
    [A] - A 完成服务... 1470642026043
    [B] - B 完成服务... 1470642026043
    [B] - B 释放... 1470642026043
    [A] - A 释放... 1470642026043
    [G] - G 提供服务... 1470642026044
    [E] - E 完成服务... 1470642026045
    [E] - E 释放... 1470642026045
    [G] - G 完成服务... 1470642027045
    [G] - G 释放... 1470642027046

세 개의 세마포어가 수신된 후 후속 스레드가 신호가 수신된 위치에서 차단되고 해제되지 않는 것을 볼 수 있습니다. 세마포어가 해제될 때까지 계속 실행합니다.

CountDownLatch
CountDownLatch [Countdown Lock], 스레드에서 countDownLatch.await()를 호출하여 지정된 횟수에 도달하면(countDownLatch.countDown()을 통해) 프로세스를 차단 상태로 만듭니다. , 각 스레드 콘텐츠의 나머지 단계를 계속 실행합니다.

테스트 클래스:

package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CountDownLatch;
/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class package concurrent;
import concurrent.thread.CountDownLatchThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class CountDownLatchTest {
    private static final Logger logger = LogManager.getLogger(CountDownLatchTest.class);
    public static void main(String[] args) throws InterruptedException {
        //设定当达成三个计数时触发
        CountDownLatch countDownLatch = new CountDownLatch(3);
        new CountDownLatchThread("A", countDownLatch).start();
        new CountDownLatchThread("B", countDownLatch).start();
        new CountDownLatchThread("C", countDownLatch).start();
        new CountDownLatchThread("D", countDownLatch).start();
        new CountDownLatchThread("E", countDownLatch).start();
        for (int i = 3; i > 0; i--) {
            Thread.sleep(1000);
            logger.debug(i);
            countDownLatch.countDown();
        }
    }
}

스레드 클래스:

package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CountDownLatch;
/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class CountDownLatchThread extends Thread {
    private static final Logger logger = LogManager.getLogger(CountDownLatchThread.class);
    //计数器
    private CountDownLatch countDownLatch;
    public CountDownLatchThread(String name, CountDownLatch countDownLatch) {
        setName(name);
        this.countDownLatch = countDownLatch;
    }
    @Override
    public void run() {
        logger.debug("执行操作...");
        try {
            sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.debug("等待计数器达到标准...");
        try {
            //让线程进入阻塞状态,等待计数达成后释放
            countDownLatch.await();
            logger.debug("计数达成,继续执行...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

실행 결과:

[E] - 执行操作...
 [B] - 执行操作...
 [A] - 执行操作...
 [C] - 执行操作...
 [D] - 执行操作...
 [main] DEBUG concurrent.CountDownLatchTest - 3
 [B] - 等待计数器达到标准...
 [E] - 等待计数器达到标准...
 [C] - 等待计数器达到标准...
 [D] - 等待计数器达到标准...
 [A] - 等待计数器达到标准...
 [main] DEBUG concurrent.CountDownLatchTest - 2
 [main] DEBUG concurrent.CountDownLatchTest - 1
 [E] - 计数达成,继续执行...
 [C] - 计数达成,继续执行...
 [B] - 计数达成,继续执行...
 [D] - 计数达成,继续执行...
 [A] - 计数达成,继续执行...

CyclicBarrier
CyclicBarrier [Cyclic Cycle, Cyclic Barrier Barrier, Barrier]는 차단된 스레드 수가 지정된 수에 도달할 때까지 주기적으로 대기하여 카운트에 참여하는 스레드가 계속해서 실행되고 특정 스레드를 실행할 수 있도록 합니다( 다른 생성자를 사용하면 도착 후 실행을 설정하지 않을 수 있습니다. 다른 스레드는 지정된 숫자에 다시 도달하기를 기다리면서 여전히 차단됩니다.

테스트 클래스:

package concurrent;
import concurrent.thread.CyclicBarrierThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CyclicBarrier;
/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class CyclicBarrierTest {
    private static final Logger logger = LogManager.getLogger(CyclicBarrierTest.class);
    public static void main(String[] args) {
          //可以使用CyclicBarrier(int parties)不设定到达后执行的内容
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
            logger.debug("---计数到达后执行的内容----");
        });
        new CyclicBarrierThread("A", cyclicBarrier).start();
        new CyclicBarrierThread("B", cyclicBarrier).start();
        new CyclicBarrierThread("C", cyclicBarrier).start();
        new CyclicBarrierThread("D", cyclicBarrier).start();
        new CyclicBarrierThread("E", cyclicBarrier).start();
        new CyclicBarrierThread("A2", cyclicBarrier).start();
        new CyclicBarrierThread("B2", cyclicBarrier).start();
        new CyclicBarrierThread("C2", cyclicBarrier).start();
        new CyclicBarrierThread("D2", cyclicBarrier).start();
        new CyclicBarrierThread("E2", cyclicBarrier).start();
        //需要注意的是,如果线程数不是上面设置的等待数量的整数倍,比如这个程序中又加了个线程,
        // 那么当达到5个数量时,只会执行达到时的五个线程的内容,
        // 剩余一个线程会出于阻塞状态导致主线程无法退出,程序无法结束
        // new CyclicBarrierThread("F", cyclicBarrier).start();//将这行注释去掉程序无法自动结束
    }
}

스레드 클래스:

package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class CyclicBarrierThread extends Thread {
    private static final Logger logger = LogManager.getLogger(CyclicBarrierThread.class);
    private CyclicBarrier cyclicBarrier;
    public CyclicBarrierThread(String name, CyclicBarrier cyclicBarrier) {
        super(name);
        this.cyclicBarrier = cyclicBarrier;
    }
    @Override
    public void run() {
        logger.debug("执行操作...");
        try {
            int time = new Random().nextInt(10) * 1000;
            logger.debug("休眠" + time/1000 + "秒");
            sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.debug("等待计数器达到标准...");
        try {
            //让线程进入阻塞状态,等待计数达成后释放
            cyclicBarrier.await();
            logger.debug("计数达成,继续执行...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

실행 결과:

[A] - 执行操作...
 [A] - 休眠0秒
 [E2] - 执行操作...
 [E2] - 休眠5秒
 [D2] - 执行操作...
 [D2] - 休眠4秒
 [C2] - 执行操作...
 [C2] - 休眠4秒
 [B2] - 执行操作...
 [B2] - 休眠6秒
 [A2] - 执行操作...
 [A2] - 休眠8秒
 [E] - 执行操作...
 [E] - 休眠5秒
 [D] - 执行操作...
 [D] - 休眠0秒
 [C] - 执行操作...
 [C] - 休眠3秒
 [B] - 执行操作...
 [B] - 休眠7秒
 [A] - 等待计数器达到标准...
 [D] - 等待计数器达到标准...
 [C] - 等待计数器达到标准...
 [D2] - 等待计数器达到标准...
 [C2] - 等待计数器达到标准...
 [C2] DEBUG concurrent.CyclicBarrierTest - ---计数到达后执行的内容----
 [C2] - 计数达成,继续执行...
 [A] - 计数达成,继续执行...
 [C] - 计数达成,继续执行...
 [D2] - 计数达成,继续执行...
 [D] - 计数达成,继续执行...
 [E2] - 等待计数器达到标准...
 [E] - 等待计数器达到标准...
 [B2] - 等待计数器达到标准...
 [B] - 等待计数器达到标准...
 [A2] - 等待计数器达到标准...
 [A2] DEBUG concurrent.CyclicBarrierTest - ---计数到达后执行的内容----
 [E] - 计数达成,继续执行...
 [B2] - 计数达成,继续执行...
 [E2] - 计数达成,继续执行...
 [B] - 计数达成,继续执行...
 [A2] - 计数达成,继续执行...

과거 불규칙 장거리 버스 정류장의 패턴을 짐작할 수 있습니다.

비정기 장거리 버스 정류장은 좌석이 가득 찰 때까지 기다렸다가 목적지에 도착하여 출발합니다. 계속해서 기다렸다가 순환합니다. 모두가 Thread 이고, 버스 탑승 후 cycloBarrier.await();가 발동되며, 좌석이 꽉 찼을 때, 차량 출발 이후 균일하게 실행되는 내용입니다. 출발 후 차량에 탑승한 사람들과 채팅을 할 수 있습니다. [버스에 탑승한 후에는 이동할 수 없는 것으로 일시적으로 이해됩니다. O(∩_∩)O~].

CountDownLatch와 CyclicBarrier의 차이점:

CountDownLatch는 실행을 계속하기 위해 카운트에 도달하기를 기다리는 하나 이상의 스레드입니다. 계산 중.

CyclicBarrier는 N개의 스레드가 계속 실행되기 전에 서로가 0점까지 실행될 때까지 기다리는 곳입니다. wait() 호출도 계산에 참여하며, CyclicBarrier는 조건이 충족된 후 작업 실행을 지원합니다. 이 과정은 성적인 루프입니다.

Exchanger

스레드 간 데이터 교환에 사용되는 Exchanger

테스트 클래스:

package concurrent;
import concurrent.pojo.ExchangerPojo;
import concurrent.thread.ExchangerThread;
import java.util.HashMap;
import java.util.concurrent.Exchanger;
/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class ExchangerTest {
    public static void main(String[] args) {
        Exchanger<HashMap<String, ExchangerPojo>> exchanger = new Exchanger<>();
        new ExchangerThread("A", exchanger).start();
        new ExchangerThread("B", exchanger).start();
    }
}

엔티티 클래스:

package concurrent.pojo;
import com.alibaba.fastjson.JSON;
import java.util.Date;
import java.util.List;
/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class ExchangerPojo {
    private int intVal;
    private String strVal;
    private List<String> strList;
    private Date date;
    public ExchangerPojo(int intVal, String strVal, List<String> strList, Date date) {
        this.intVal = intVal;
        this.strVal = strVal;
        this.strList = strList;
        this.date = date;
    }
    public int getIntVal() {
        return intVal;
    }
    public void setIntVal(int intVal) {
        this.intVal = intVal;
    }
    public String getStrVal() {
        return strVal;
    }
    public void setStrVal(String strVal) {
        this.strVal = strVal;
    }
    public List<String> getStrList() {
        return strList;
    }
    public void setStrList(List<String> strList) {
        this.strList = strList;
    }
    public Date getDate() {
        return date;
    }
    public void setDate(Date date) {
        this.date = date;
    }
    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
}

스레드 클래스:

package concurrent.thread;
import concurrent.pojo.ExchangerPojo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.concurrent.Exchanger;
/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class ExchangerThread extends Thread {
    private Exchanger<HashMap<String, ExchangerPojo>> exchanger;
    private static final Logger logger = LogManager.getLogger(ExchangerThread.class);
    public ExchangerThread(String name, Exchanger<HashMap<String, ExchangerPojo>> exchanger) {
        super(name);
        this.exchanger = exchanger;
    }
    @Override
    public void run() {
        HashMap<String, ExchangerPojo> map = new HashMap<>();
        logger.debug(getName() + "提供者提供数据...");
        Random random = new Random();
        for (int i = 0; i < 3; i++) {
            int index = random.nextInt(10);
            List<String> list = new ArrayList<>();
            for (int j = 0; j < index; j++) {
                list.add("list ---> " + j);
            }
            ExchangerPojo pojo = new ExchangerPojo(index, getName() + "提供的数据", list, new Date());
            map.put("第" + i + "个数据", pojo);
        }
        try {
            int time = random.nextInt(10);
            logger.debug(getName() + "等待" + time + "秒....");
            for (int i = time; i > 0; i--) {
                sleep(1000);
                logger.debug(getName() + "---->" + i);
            }
              //等待exchange是会进入阻塞状态,可以在一个线程中与另一线程多次交互,此处就不写多次了
            HashMap<String, ExchangerPojo> getMap = exchanger.exchange(map);
            time = random.nextInt(10);
            logger.debug(getName() + "接受到数据等待" + time + "秒....");
            for (int i = time; i > 0; i--) {
                sleep(1000);
                logger.debug(getName() + "---->" + i);
            }
            getMap.forEach((x, y) -> {
                logger.debug(x + " -----> " + y.toString());
            });
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

실행 결과:

[B] - B提供者提供数据...
 [A] - A提供者提供数据...
 [A] - A等待2秒....
 [B] - B等待0秒....
 [A] - A---->2
 [A] - A---->1
 [B] - B接受到数据等待1秒....
 [A] - A接受到数据等待4秒....
 [B] - B---->1
 [A] - A---->4
 [B] - 第0个数据 -----> {"date":1470652252049,"intVal":5,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4"],"strVal":"A提供的数据"}
 [B] - 第1个数据 -----> {"date":1470652252049,"intVal":1,"strList":["list ---> 0"],"strVal":"A提供的数据"}
 [B] - 第2个数据 -----> {"date":1470652252049,"intVal":4,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3"],"strVal":"A提供的数据"}
 [A] - A---->3
 [A] - A---->2
 [A] - A---->1
 [A] - 第0个数据 -----> {"date":1470652252057,"intVal":1,"strList":["list ---> 0"],"strVal":"B提供的数据"}
 [A] - 第1个数据 -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list ---> 5"],
 "strVal":"B提供的数据"}
 [A] - 第2个数据 -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list --->

5 "],"strVal":"B에서 제공한 데이터"}

Phaser

Phaser는 개인적으로 CountDownLatch와 CyclicBarrier의 기능을 결합하여 단계별 기능을 제공한다고 느낍니다.

단계적 CyclicBarrier 기능 구현

테스트 코드:

package concurrent;
import concurrent.thread.PhaserThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.Phaser;
/**
 * 拿客
 * 网站:www.coderknock.com
 * QQ群:213732117
 * 三产 创建于 2016年08月08日 21:25:30。
 */
public class PhaserTest {
    private static final Logger logger = LogManager.getLogger(PhaserTest.class);
    public static void main(String[] args) {
        Phaser phaser = new Phaser() {
            /**此方法有2个作用:
             * 1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。
             * 2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。
             例如:若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止。
             * */
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                logger.debug("阶段--->" + phase);
                logger.debug("注册的线程数量--->" + registeredParties);
                return super.onAdvance(phase, registeredParties);
            }
        };
        for (int i = 3; i > 0; i--) {
            new PhaserThread("第" + i + "个", phaser).start();
        }
    }
}

스레드 코드: >

package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Random;
import java.util.concurrent.Phaser;
/**
 * 拿客
 * 网站:www.coderknock.com
 * QQ群:213732117
 * 三产 创建于 2016年08月08日 21:16:55。
 */
public class PhaserThread extends Thread {
    private Phaser phaser;
    private static final Logger logger = LogManager.getLogger(PhaserThread.class);
    public PhaserThread(String name, Phaser phaser) {
        super(name);
        this.phaser = phaser;
        //把当前线程注册到Phaser
        this.phaser.register();
        logger.debug("name为" + name + "的线程注册了" + this.phaser.getRegisteredParties() + "个线程");
    }
    @Override
    public void run() {
        logger.debug("进入...");
        phaser.arrive();
        for (int i = 6; i > 0; i--) {
            int time = new Random().nextInt(5);
            try {
                logger.debug("睡眠" + time + "秒");
                sleep(time * 1000);
                if (i == 1) {
                    logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties());
                    logger.debug("最后一次触发,并注销自身");
                    phaser.arriveAndDeregister();
                    logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties());
                } else {
                    logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties());
                    logger.debug(i + "--->触发并阻塞...");
                    phaser.arriveAndAwaitAdvance();//相当于CyclicBarrier.await();
                    logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        logger.debug("注销完成之后注册的线程数量--->" + phaser.getRegisteredParties());
    }
}

실행 결과:


[main] - name为第3个的线程注册了1个线程
 [main] - name为第2个的线程注册了2个线程
 [main] - name为第1个的线程注册了3个线程
 [第3个] - 进入...
 [第2个] - 进入...
 [第3个] - 睡眠2秒
 [第2个] - 睡眠1秒
 [第1个] - 进入...
 [第1个] - 阶段--->0
 [第1个] - 注册的线程数量--->3
 [第1个] - 睡眠4秒
 [第2个] - 未完成的线程数量:3
 [第2个] - 6--->触发并阻塞...
 [第3个] - 未完成的线程数量:2
 [第3个] - 6--->触发并阻塞...
 [第1个] - 未完成的线程数量:1
 [第1个] - 6--->触发并阻塞...
 [第1个] - 阶段--->1
 [第1个] - 注册的线程数量--->3
 [第1个] - 未完成的线程数量:3
 [第3个] - 未完成的线程数量:3
 [第2个] - 未完成的线程数量:3
 [第1个] - 睡眠1秒
 [第3个] - 睡眠0秒
 [第2个] - 睡眠4秒
 [第3个] - 未完成的线程数量:3
 [第3个] - 5--->触发并阻塞...
 [第1个] - 未完成的线程数量:2
 [第1个] - 5--->触发并阻塞...
 [第2个] - 未完成的线程数量:1
 [第2个] - 5--->触发并阻塞...
 [第2个] - 阶段--->2
 [第2个] - 注册的线程数量--->3
 [第2个] - 未完成的线程数量:3
 [第3个] - 未完成的线程数量:3
 [第1个] - 未完成的线程数量:3
 [第2个] - 睡眠0秒
 [第3个] - 睡眠2秒
 [第2个] - 未完成的线程数量:3
 [第1个] - 睡眠2秒
 [第2个] - 4--->触发并阻塞...
 [第3个] - 未完成的线程数量:2
 [第1个] - 未完成的线程数量:2
 [第3个] - 4--->触发并阻塞...
 [第1个] - 4--->触发并阻塞...
 [第1个] - 阶段--->3
 [第1个] - 注册的线程数量--->3
 [第1个] - 未完成的线程数量:3
 [第3个] - 未完成的线程数量:3
 [第2个] - 未完成的线程数量:3
 [第1个] - 睡眠2秒
 [第3个] - 睡眠1秒
 [第2个] - 睡眠4秒
 [第3个] - 未完成的线程数量:3
 [第3个] - 3--->触发并阻塞...
 [第1个] - 未完成的线程数量:2
 [第1个] - 3--->触发并阻塞...
 [第2个] - 未完成的线程数量:1
 [第2个] - 3--->触发并阻塞...
 [第2个] - 阶段--->4
 [第2个] - 注册的线程数量--->3
 [第2个] - 未完成的线程数量:3
 [第3个] - 未完成的线程数量:3
 [第1个] - 未完成的线程数量:3
 [第2个] - 睡眠2秒
 [第1个] - 睡眠2秒
 [第3个] - 睡眠4秒
 [第2个] - 未完成的线程数量:3
 [第1个] - 未完成的线程数量:3
 [第2个] - 2--->触发并阻塞...
 [第1个] - 2--->触发并阻塞...
 [第3个] - 未完成的线程数量:1
 [第3个] - 2--->触发并阻塞...
 [第3个] - 阶段--->5
 [第3个] - 注册的线程数量--->3
 [第3个] - 未完成的线程数量:3
 [第1个] - 未完成的线程数量:3
 [第2个] - 未完成的线程数量:3
 [第3个] - 睡眠2秒
 [第1个] - 睡眠3秒
 [第2个] - 睡眠0秒
 [第2个] - 未完成的线程数量:3
 [第2个] - 最后一次触发,并注销自身
 [第2个] - 未完成的线程数量:2
 [第2个] - 注销完成之后注册的线程数量--->2
 [第3个] - 未完成的线程数量:2
 [第3个] - 最后一次触发,并注销自身
 [第3个] - 未完成的线程数量:1
 [第3个] - 注销完成之后注册的线程数量--->1
 [第1个] - 未完成的线程数量:1
 [第1个] - 最后一次触发,并注销自身
 [第1个] - 阶段--->6
 [第1个] - 注册的线程数量--->0
 [第1个] - 未完成的线程数量:0
 [第1个] - 注销完成之后注册的线程数量--->0

위 코드에서 모든 스레드가 ArrivalAndAwaitAdvance()에 도달하면 카운팅이 시작되고 카운트 수가 등록과 같아질 때까지 스레드가 차단됩니다. 스레드 수 [즉, 모든 스레드가 합의된 위치에서 실행되면 해제되므로 모든 스레드가 계속해서 onAction 이벤트를 실행하고 트리거할 수 있습니다.]

단계별 CountDownLatch 기능을 구현하려면

위 테스트 클래스를 다음과 같이 변경하면 됩니다.

package concurrent;
import concurrent.thread.PhaserThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.Phaser;
import static jodd.util.ThreadUtil.sleep;
/**
 * 拿客
 * 网站:www.coderknock.com
 * QQ群:213732117
 * 三产 创建于 2016年08月08日 21:25:30。
 */
public class PhaserTest {
    private static final Logger logger = LogManager.getLogger(PhaserTest.class);
    public static void main(String[] args) {
        //这里其实相当于已经注册了3个线程,但是并没有实际的线程
        int coutNum=3;
        Phaser phaser = new Phaser(coutNum) {
            /**此方法有2个作用:
             * 1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。
             * 2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。
             例如:若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止。
             * */
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                logger.debug("阶段--->" + phase);
                logger.debug("注册的线程数量--->" + registeredParties);
                return registeredParties==coutNum;//当后只剩下coutNum个线程时说明所有真实的注册的线程已经运行完成,测试可以终止Phaser
            }
        };
        for (int i = 3; i > 0; i--) {
            new PhaserThread("第" + i + "个", phaser).start();
        }
        //当phaser未终止时循环注册这块儿可以使用实际的业务处理
        while (!phaser.isTerminated()) {
            sleep(1000);
            logger.debug("触发一次");
            phaser.arrive(); //相当于countDownLatch.countDown();
        }
    }
}

위 내용은 Java, 멀티스레딩, 싱크로나이저 등의 내용입니다. 관련 내용은 PHP 중국어 홈페이지(www.php.cn)를 참고하세요!

-->

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.