Home >Java >javaTutorial >Synchronizer for Java multi-thread programming
Provides solutions for each specific synchronization problem
Semaphore [Semaphore; semaphore], through Counters control access to shared resources.
Test class:
package concurrent;
import concurrent.thread.SemaphoreThread;
import java.util.concurrent.Semaphore;
/** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class SemaphoreTest { public static void main(String[] args) { //在Thread里声明并不是同一个对象 Semaphore semaphore = new Semaphore(3); SemaphoreThread testA = new SemaphoreThread("A", semaphore); SemaphoreThread testB = new SemaphoreThread("B", semaphore); SemaphoreThread testC = new SemaphoreThread("C", semaphore); SemaphoreThread testD = new SemaphoreThread("D", semaphore); SemaphoreThread testE = new SemaphoreThread("E", semaphore); SemaphoreThread testF = new SemaphoreThread("F", semaphore); SemaphoreThread testG = new SemaphoreThread("G", semaphore); testA.start(); testB.start(); testC.start(); testD.start(); testE.start(); testF.start(); testG.start(); } }
Thread writing method:
package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging. log4j.Logger;
import java.util.concurrent.Semaphore;
/** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class SemaphoreThread extends Thread { private static final Logger logger = LogManager.getLogger(SemaphoreThread.class); //创建有3个信号量的信号量计数器 public Semaphore semaphore; public SemaphoreThread(String name, Semaphore semaphore) { setName(name); this.semaphore = semaphore; } @Override public void run() { try { logger.debug(getName() + " 取号等待... " + System.currentTimeMillis()); //取出一个信号 semaphore.acquire(); logger.debug(getName() + " 提供服务... " + System.currentTimeMillis()); sleep(1000); logger.debug(getName() + " 完成服务... " + System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } logger.debug(getName() + " 释放... " + System.currentTimeMillis()); //释放一个信号 semaphore.release(); } }
Execution results [In all the following output results, [] is the thread name - the latter is the output Content]:
[C] - C 取号等待... 1470642024037 [F] - F 取号等待... 1470642024036 [E] - E 取号等待... 1470642024036 [B] - B 取号等待... 1470642024037 [D] - D 取号等待... 1470642024037 [A] - A 取号等待... 1470642023965 [D] - D 提供服务... 1470642024039 [C] - C 提供服务... 1470642024039 [G] - G 取号等待... 1470642024036 [F] - F 提供服务... 1470642024040 [D] - D 完成服务... 1470642025039 [C] - C 完成服务... 1470642025039 [D] - D 释放... 1470642025040 [F] - F 完成服务... 1470642025040 [C] - C 释放... 1470642025041 [B] - B 提供服务... 1470642025042 [A] - A 提供服务... 1470642025042 [F] - F 释放... 1470642025043 [E] - E 提供服务... 1470642025043 [A] - A 完成服务... 1470642026043 [B] - B 完成服务... 1470642026043 [B] - B 释放... 1470642026043 [A] - A 释放... 1470642026043 [G] - G 提供服务... 1470642026044 [E] - E 完成服务... 1470642026045 [E] - E 释放... 1470642026045 [G] - G 完成服务... 1470642027045 [G] - G 释放... 1470642027046
You can see that after the three semaphores are received, subsequent threads will be blocked at the position where the signals are received, and will not be released until the semaphore is released. Continue execution.
CountDownLatch
CountDownLatch [Countdown Lock], call countDownLatch.await() in the thread to make the process enter the blocking state. When the specified number of times is reached (through countDownLatch.countDown()), continue to execute the remaining steps in each thread Content.
Test class:
package concurrent.thread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.CountDownLatch; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class package concurrent; import concurrent.thread.CountDownLatchThread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class CountDownLatchTest { private static final Logger logger = LogManager.getLogger(CountDownLatchTest.class); public static void main(String[] args) throws InterruptedException { //设定当达成三个计数时触发 CountDownLatch countDownLatch = new CountDownLatch(3); new CountDownLatchThread("A", countDownLatch).start(); new CountDownLatchThread("B", countDownLatch).start(); new CountDownLatchThread("C", countDownLatch).start(); new CountDownLatchThread("D", countDownLatch).start(); new CountDownLatchThread("E", countDownLatch).start(); for (int i = 3; i > 0; i--) { Thread.sleep(1000); logger.debug(i); countDownLatch.countDown(); } } }
Thread class:
package concurrent.thread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.CountDownLatch; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class CountDownLatchThread extends Thread { private static final Logger logger = LogManager.getLogger(CountDownLatchThread.class); //计数器 private CountDownLatch countDownLatch; public CountDownLatchThread(String name, CountDownLatch countDownLatch) { setName(name); this.countDownLatch = countDownLatch; } @Override public void run() { logger.debug("执行操作..."); try { sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } logger.debug("等待计数器达到标准..."); try { //让线程进入阻塞状态,等待计数达成后释放 countDownLatch.await(); logger.debug("计数达成,继续执行..."); } catch (InterruptedException e) { e.printStackTrace(); } } }
Execution result:
[E] - 执行操作... [B] - 执行操作... [A] - 执行操作... [C] - 执行操作... [D] - 执行操作... [main] DEBUG concurrent.CountDownLatchTest - 3 [B] - 等待计数器达到标准... [E] - 等待计数器达到标准... [C] - 等待计数器达到标准... [D] - 等待计数器达到标准... [A] - 等待计数器达到标准... [main] DEBUG concurrent.CountDownLatchTest - 2 [main] DEBUG concurrent.CountDownLatchTest - 1 [E] - 计数达成,继续执行... [C] - 计数达成,继续执行... [B] - 计数达成,继续执行... [D] - 计数达成,继续执行... [A] - 计数达成,继续执行...
CyclicBarrier
CyclicBarrier [Cyclic cycle, cyclic Barrier barrier, barrier] cyclically waits for the number of blocked threads to reach the specified number, so that the threads participating in the count can continue to execute and can execute specific threads (different constructors can be used Do not set the execution after arrival), other threads are still blocked waiting to reach the specified number again.
Test class:
package concurrent; import concurrent.thread.CyclicBarrierThread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.CyclicBarrier; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class CyclicBarrierTest { private static final Logger logger = LogManager.getLogger(CyclicBarrierTest.class); public static void main(String[] args) { //可以使用CyclicBarrier(int parties)不设定到达后执行的内容 CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { logger.debug("---计数到达后执行的内容----"); }); new CyclicBarrierThread("A", cyclicBarrier).start(); new CyclicBarrierThread("B", cyclicBarrier).start(); new CyclicBarrierThread("C", cyclicBarrier).start(); new CyclicBarrierThread("D", cyclicBarrier).start(); new CyclicBarrierThread("E", cyclicBarrier).start(); new CyclicBarrierThread("A2", cyclicBarrier).start(); new CyclicBarrierThread("B2", cyclicBarrier).start(); new CyclicBarrierThread("C2", cyclicBarrier).start(); new CyclicBarrierThread("D2", cyclicBarrier).start(); new CyclicBarrierThread("E2", cyclicBarrier).start(); //需要注意的是,如果线程数不是上面设置的等待数量的整数倍,比如这个程序中又加了个线程, // 那么当达到5个数量时,只会执行达到时的五个线程的内容, // 剩余一个线程会出于阻塞状态导致主线程无法退出,程序无法结束 // new CyclicBarrierThread("F", cyclicBarrier).start();//将这行注释去掉程序无法自动结束 } }
Thread class:
package concurrent.thread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class CyclicBarrierThread extends Thread { private static final Logger logger = LogManager.getLogger(CyclicBarrierThread.class); private CyclicBarrier cyclicBarrier; public CyclicBarrierThread(String name, CyclicBarrier cyclicBarrier) { super(name); this.cyclicBarrier = cyclicBarrier; } @Override public void run() { logger.debug("执行操作..."); try { int time = new Random().nextInt(10) * 1000; logger.debug("休眠" + time/1000 + "秒"); sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } logger.debug("等待计数器达到标准..."); try { //让线程进入阻塞状态,等待计数达成后释放 cyclicBarrier.await(); logger.debug("计数达成,继续执行..."); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
Execution result:
[A] - 执行操作... [A] - 休眠0秒 [E2] - 执行操作... [E2] - 休眠5秒 [D2] - 执行操作... [D2] - 休眠4秒 [C2] - 执行操作... [C2] - 休眠4秒 [B2] - 执行操作... [B2] - 休眠6秒 [A2] - 执行操作... [A2] - 休眠8秒 [E] - 执行操作... [E] - 休眠5秒 [D] - 执行操作... [D] - 休眠0秒 [C] - 执行操作... [C] - 休眠3秒 [B] - 执行操作... [B] - 休眠7秒 [A] - 等待计数器达到标准... [D] - 等待计数器达到标准... [C] - 等待计数器达到标准... [D2] - 等待计数器达到标准... [C2] - 等待计数器达到标准... [C2] DEBUG concurrent.CyclicBarrierTest - ---计数到达后执行的内容---- [C2] - 计数达成,继续执行... [A] - 计数达成,继续执行... [C] - 计数达成,继续执行... [D2] - 计数达成,继续执行... [D] - 计数达成,继续执行... [E2] - 等待计数器达到标准... [E] - 等待计数器达到标准... [B2] - 等待计数器达到标准... [B] - 等待计数器达到标准... [A2] - 等待计数器达到标准... [A2] DEBUG concurrent.CyclicBarrierTest - ---计数到达后执行的内容---- [E] - 计数达成,继续执行... [B2] - 计数达成,继续执行... [E2] - 计数达成,继续执行... [B] - 计数达成,继续执行... [A2] - 计数达成,继续执行...
can be imagined as the pattern of irregular long-distance bus stations in the past:
Irregular long-distance bus stations will wait until the seats are full before departing. After arriving at the destination, they will continue to wait and then cycle. Everyone is a Thread, and cyclicBarrier.await(); is triggered after getting on the bus. When the seats are full, it is when the specified number of achievements is reached. The vehicle departure is the content that is uniformly executed after the vehicle is reached. After the vehicle starts, people on the vehicle can chat. [We temporarily understand that people cannot move after getting on the bus O(∩_∩)O~].
CountDownLatch is one or more threads waiting for the count to be reached to continue execution. The await() call does not participate in the counting.
CyclicBarrier is where N threads wait for each other to execute to the zero point before continuing to execute. The await() call also participates in the counting, and CyclicBarrier supports executing an action after the condition is met, and this process is a cycle. sexual.
Exchanger8742468051c85b06f0a0af9e3e506b5c is used for data exchange between threads
Test class:
package concurrent; import concurrent.pojo.ExchangerPojo; import concurrent.thread.ExchangerThread; import java.util.HashMap; import java.util.concurrent.Exchanger; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class ExchangerTest { public static void main(String[] args) { Exchanger<HashMap<String, ExchangerPojo>> exchanger = new Exchanger<>(); new ExchangerThread("A", exchanger).start(); new ExchangerThread("B", exchanger).start(); } }
Entity class:
package concurrent.pojo; import com.alibaba.fastjson.JSON; import java.util.Date; import java.util.List; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class ExchangerPojo { private int intVal; private String strVal; private List<String> strList; private Date date; public ExchangerPojo(int intVal, String strVal, List<String> strList, Date date) { this.intVal = intVal; this.strVal = strVal; this.strList = strList; this.date = date; } public int getIntVal() { return intVal; } public void setIntVal(int intVal) { this.intVal = intVal; } public String getStrVal() { return strVal; } public void setStrVal(String strVal) { this.strVal = strVal; } public List<String> getStrList() { return strList; } public void setStrList(List<String> strList) { this.strList = strList; } public Date getDate() { return date; } public void setDate(Date date) { this.date = date; } @Override public String toString() { return JSON.toJSONString(this); } }
Thread class:
package concurrent.thread; import concurrent.pojo.ExchangerPojo; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.*; import java.util.concurrent.Exchanger; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class ExchangerThread extends Thread { private Exchanger<HashMap<String, ExchangerPojo>> exchanger; private static final Logger logger = LogManager.getLogger(ExchangerThread.class); public ExchangerThread(String name, Exchanger<HashMap<String, ExchangerPojo>> exchanger) { super(name); this.exchanger = exchanger; } @Override public void run() { HashMap<String, ExchangerPojo> map = new HashMap<>(); logger.debug(getName() + "提供者提供数据..."); Random random = new Random(); for (int i = 0; i < 3; i++) { int index = random.nextInt(10); List<String> list = new ArrayList<>(); for (int j = 0; j < index; j++) { list.add("list ---> " + j); } ExchangerPojo pojo = new ExchangerPojo(index, getName() + "提供的数据", list, new Date()); map.put("第" + i + "个数据", pojo); } try { int time = random.nextInt(10); logger.debug(getName() + "等待" + time + "秒...."); for (int i = time; i > 0; i--) { sleep(1000); logger.debug(getName() + "---->" + i); } //等待exchange是会进入阻塞状态,可以在一个线程中与另一线程多次交互,此处就不写多次了 HashMap<String, ExchangerPojo> getMap = exchanger.exchange(map); time = random.nextInt(10); logger.debug(getName() + "接受到数据等待" + time + "秒...."); for (int i = time; i > 0; i--) { sleep(1000); logger.debug(getName() + "---->" + i); } getMap.forEach((x, y) -> { logger.debug(x + " -----> " + y.toString()); }); } catch (InterruptedException e) { e.printStackTrace(); } } }
Execution result:
[B] - B提供者提供数据... [A] - A提供者提供数据... [A] - A等待2秒.... [B] - B等待0秒.... [A] - A---->2 [A] - A---->1 [B] - B接受到数据等待1秒.... [A] - A接受到数据等待4秒.... [B] - B---->1 [A] - A---->4 [B] - 第0个数据 -----> {"date":1470652252049,"intVal":5,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4"],"strVal":"A提供的数据"} [B] - 第1个数据 -----> {"date":1470652252049,"intVal":1,"strList":["list ---> 0"],"strVal":"A提供的数据"} [B] - 第2个数据 -----> {"date":1470652252049,"intVal":4,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3"],"strVal":"A提供的数据"} [A] - A---->3 [A] - A---->2 [A] - A---->1 [A] - 第0个数据 -----> {"date":1470652252057,"intVal":1,"strList":["list ---> 0"],"strVal":"B提供的数据"} [A] - 第1个数据 -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list ---> 5"], "strVal":"B提供的数据"} [A] - 第2个数据 -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list --->
5"],"strVal":"Data provided by B"}
Phaser personally feels that it combines the functions of CountDownLatch and CyclicBarrier, and provides phased capabilities .
Test code:
##
package concurrent; import concurrent.thread.PhaserThread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.Phaser; /** * 拿客 * 网站:www.coderknock.com * QQ群:213732117 * 三产 创建于 2016年08月08日 21:25:30。 */ public class PhaserTest { private static final Logger logger = LogManager.getLogger(PhaserTest.class); public static void main(String[] args) { Phaser phaser = new Phaser() { /**此方法有2个作用: * 1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。 * 2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。 例如:若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止。 * */ @Override protected boolean onAdvance(int phase, int registeredParties) { logger.debug("阶段--->" + phase); logger.debug("注册的线程数量--->" + registeredParties); return super.onAdvance(phase, registeredParties); } }; for (int i = 3; i > 0; i--) { new PhaserThread("第" + i + "个", phaser).start(); } } }Thread code:
package concurrent.thread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Random; import java.util.concurrent.Phaser; /** * 拿客 * 网站:www.coderknock.com * QQ群:213732117 * 三产 创建于 2016年08月08日 21:16:55。 */ public class PhaserThread extends Thread { private Phaser phaser; private static final Logger logger = LogManager.getLogger(PhaserThread.class); public PhaserThread(String name, Phaser phaser) { super(name); this.phaser = phaser; //把当前线程注册到Phaser this.phaser.register(); logger.debug("name为" + name + "的线程注册了" + this.phaser.getRegisteredParties() + "个线程"); } @Override public void run() { logger.debug("进入..."); phaser.arrive(); for (int i = 6; i > 0; i--) { int time = new Random().nextInt(5); try { logger.debug("睡眠" + time + "秒"); sleep(time * 1000); if (i == 1) { logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties()); logger.debug("最后一次触发,并注销自身"); phaser.arriveAndDeregister(); logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties()); } else { logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties()); logger.debug(i + "--->触发并阻塞..."); phaser.arriveAndAwaitAdvance();//相当于CyclicBarrier.await(); logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties()); } } catch (InterruptedException e) { e.printStackTrace(); } } logger.debug("注销完成之后注册的线程数量--->" + phaser.getRegisteredParties()); } }Execution result:
[main] - name为第3个的线程注册了1个线程 [main] - name为第2个的线程注册了2个线程 [main] - name为第1个的线程注册了3个线程 [第3个] - 进入... [第2个] - 进入... [第3个] - 睡眠2秒 [第2个] - 睡眠1秒 [第1个] - 进入... [第1个] - 阶段--->0 [第1个] - 注册的线程数量--->3 [第1个] - 睡眠4秒 [第2个] - 未完成的线程数量:3 [第2个] - 6--->触发并阻塞... [第3个] - 未完成的线程数量:2 [第3个] - 6--->触发并阻塞... [第1个] - 未完成的线程数量:1 [第1个] - 6--->触发并阻塞... [第1个] - 阶段--->1 [第1个] - 注册的线程数量--->3 [第1个] - 未完成的线程数量:3 [第3个] - 未完成的线程数量:3 [第2个] - 未完成的线程数量:3 [第1个] - 睡眠1秒 [第3个] - 睡眠0秒 [第2个] - 睡眠4秒 [第3个] - 未完成的线程数量:3 [第3个] - 5--->触发并阻塞... [第1个] - 未完成的线程数量:2 [第1个] - 5--->触发并阻塞... [第2个] - 未完成的线程数量:1 [第2个] - 5--->触发并阻塞... [第2个] - 阶段--->2 [第2个] - 注册的线程数量--->3 [第2个] - 未完成的线程数量:3 [第3个] - 未完成的线程数量:3 [第1个] - 未完成的线程数量:3 [第2个] - 睡眠0秒 [第3个] - 睡眠2秒 [第2个] - 未完成的线程数量:3 [第1个] - 睡眠2秒 [第2个] - 4--->触发并阻塞... [第3个] - 未完成的线程数量:2 [第1个] - 未完成的线程数量:2 [第3个] - 4--->触发并阻塞... [第1个] - 4--->触发并阻塞... [第1个] - 阶段--->3 [第1个] - 注册的线程数量--->3 [第1个] - 未完成的线程数量:3 [第3个] - 未完成的线程数量:3 [第2个] - 未完成的线程数量:3 [第1个] - 睡眠2秒 [第3个] - 睡眠1秒 [第2个] - 睡眠4秒 [第3个] - 未完成的线程数量:3 [第3个] - 3--->触发并阻塞... [第1个] - 未完成的线程数量:2 [第1个] - 3--->触发并阻塞... [第2个] - 未完成的线程数量:1 [第2个] - 3--->触发并阻塞... [第2个] - 阶段--->4 [第2个] - 注册的线程数量--->3 [第2个] - 未完成的线程数量:3 [第3个] - 未完成的线程数量:3 [第1个] - 未完成的线程数量:3 [第2个] - 睡眠2秒 [第1个] - 睡眠2秒 [第3个] - 睡眠4秒 [第2个] - 未完成的线程数量:3 [第1个] - 未完成的线程数量:3 [第2个] - 2--->触发并阻塞... [第1个] - 2--->触发并阻塞... [第3个] - 未完成的线程数量:1 [第3个] - 2--->触发并阻塞... [第3个] - 阶段--->5 [第3个] - 注册的线程数量--->3 [第3个] - 未完成的线程数量:3 [第1个] - 未完成的线程数量:3 [第2个] - 未完成的线程数量:3 [第3个] - 睡眠2秒 [第1个] - 睡眠3秒 [第2个] - 睡眠0秒 [第2个] - 未完成的线程数量:3 [第2个] - 最后一次触发,并注销自身 [第2个] - 未完成的线程数量:2 [第2个] - 注销完成之后注册的线程数量--->2 [第3个] - 未完成的线程数量:2 [第3个] - 最后一次触发,并注销自身 [第3个] - 未完成的线程数量:1 [第3个] - 注销完成之后注册的线程数量--->1 [第1个] - 未完成的线程数量:1 [第1个] - 最后一次触发,并注销自身 [第1个] - 阶段--->6 [第1个] - 注册的线程数量--->0 [第1个] - 未完成的线程数量:0 [第1个] - 注销完成之后注册的线程数量--->0In the above code, when all threads reach arrivalAndAwaitAdvance(), counting will be triggered and the threads will be blocked until the number of counts equals the registration The number of threads [that is, when all threads are executed to the agreed place, they will be released, so that all threads can continue to execute and trigger the onAction event]. We can perform different operations according to different stages in onAction.
To implement the phased CountDownLatch function
package concurrent; import concurrent.thread.PhaserThread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.Phaser; import static jodd.util.ThreadUtil.sleep; /** * 拿客 * 网站:www.coderknock.com * QQ群:213732117 * 三产 创建于 2016年08月08日 21:25:30。 */ public class PhaserTest { private static final Logger logger = LogManager.getLogger(PhaserTest.class); public static void main(String[] args) { //这里其实相当于已经注册了3个线程,但是并没有实际的线程 int coutNum=3; Phaser phaser = new Phaser(coutNum) { /**此方法有2个作用: * 1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。 * 2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。 例如:若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止。 * */ @Override protected boolean onAdvance(int phase, int registeredParties) { logger.debug("阶段--->" + phase); logger.debug("注册的线程数量--->" + registeredParties); return registeredParties==coutNum;//当后只剩下coutNum个线程时说明所有真实的注册的线程已经运行完成,测试可以终止Phaser } }; for (int i = 3; i > 0; i--) { new PhaserThread("第" + i + "个", phaser).start(); } //当phaser未终止时循环注册这块儿可以使用实际的业务处理 while (!phaser.isTerminated()) { sleep(1000); logger.debug("触发一次"); phaser.arrive(); //相当于countDownLatch.countDown(); } } }
##
-->