1: 4개의 새로운 거부 전략이 추가되었습니다. MyAbortPolicy, MyDiscardPolicy, MyDiscardOldestPolicy, MyCallerRunsPolicy
2: 스레드 풀 MyThreadPoolExecutor의 구성 방법을 최적화하고 매개변수 확인을 추가하며 무작위 매개변수 전송을 방지합니다.
3: 이것이 가장 중요한 최적화입니다.
실 풀의 실 예열 기능을 제거합니다. 스레드 예열은 많은 메모리를 소비하기 때문에 스레드 풀을 사용하지 않을 때 항상 실행됩니다.
대신 작업을 추가하기 위해 실행 메소드를 호출할 때 현재 워커 스레드 컬렉션의 크기를 확인하고 이를 corePoolSize 값과 비교한 후 new MyWorker를 통해 스레드를 생성하고 스레드 풀에 추가합니다( ) 스레드 풀을 생성할 때 스레드 풀을 사용하지 않으면 현재 메모리에 아무런 영향을 주지 않는다는 장점이 있습니다. 스레드가 생성되어 재사용을 위해 스레드 풀에 저장됩니다.
public MyThreadPoolExecutor(){ this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle); } public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) { this(corePoolSize,waitingQueue,threadFactory,defaultHandle); } public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) { this.workers=new HashSet<>(corePoolSize); if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){ this.corePoolSize=corePoolSize; this.waitingQueue=waitingQueue; this.threadFactory=threadFactory; this.handle=handle; }else { throw new NullPointerException("线程池参数不合法"); } }
전략 인터페이스: MyRejectedExecutionHandle
package com.springframework.concurrent; /** * 自定义拒绝策略 * @author 游政杰 */ public interface MyRejectedExecutionHandle { void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor); }
전략 내부 구현 클래스
/** * 实现自定义拒绝策略 */ //抛异常策略(默认) public static class MyAbortPolicy implements MyRejectedExecutionHandle{ public MyAbortPolicy(){ } @Override public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) { throw new MyRejectedExecutionException("任务-> "+r.toString()+"被线程池-> "+t.toString()+" 拒绝"); } } //默默丢弃策略 public static class MyDiscardPolicy implements MyRejectedExecutionHandle{ public MyDiscardPolicy() { } @Override public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) { } } //丢弃掉最老的任务策略 public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{ public MyDiscardOldestPolicy() { } @Override public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) { if(!threadPoolExecutor.isShutdown()){ //如果线程池没被关闭 threadPoolExecutor.getWaitingQueue().poll();//丢掉最老的任务,此时就有位置当新任务了 threadPoolExecutor.execute(runnable); //把新任务加入到队列中 } } } //由调用者调用策略 public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{ public MyCallerRunsPolicy(){ } @Override public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) { if(!threadPoolExecutor.isShutdown()){//判断线程池是否被关闭 runnable.run(); } } }
캡슐화 거부 메서드
protected final void reject(Runnable runnable){ this.handle.rejectedExecution(runnable, this); } protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){ this.handle.rejectedExecution(runnable, threadPoolExecutor); }
@Override public boolean execute(Runnable runnable) { if (!this.waitingQueue.offer(runnable)) { this.reject(runnable); return false; } else { if(this.workers!=null&&this.workers.size()<corePoolSize){//这种情况才能添加线程 MyWorker worker = new MyWorker(); //通过构造方法添加线程 } return true; } }
갈 때만 스레드 풀 스레드 개체는 작업이 해제될 때만 생성됩니다.
package com.springframework.concurrent; import java.util.concurrent.BlockingQueue; /** * 自定义线程池业务接口 * @author 游政杰 */ public interface MyExecutorService { boolean execute(Runnable runnable); void shutdown(); void shutdownNow(); boolean isShutdown(); BlockingQueue<Runnable> getWaitingQueue(); }
package com.springframework.concurrent; /** * 自定义拒绝异常 */ public class MyRejectedExecutionException extends RuntimeException { public MyRejectedExecutionException() { } public MyRejectedExecutionException(String message) { super(message); } public MyRejectedExecutionException(String message, Throwable cause) { super(message, cause); } public MyRejectedExecutionException(Throwable cause) { super(cause); } }
package com.springframework.concurrent; /** * 自定义拒绝策略 * @author 游政杰 */ public interface MyRejectedExecutionHandle { void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor); }
package com.springframework.concurrent; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; /** * 纯手撸线程池框架 * @author 游政杰 */ public class MyThreadPoolExecutor implements MyExecutorService{ private static final AtomicInteger taskcount=new AtomicInteger(0);//执行任务次数 private static final AtomicInteger threadNumber=new AtomicInteger(0); //线程编号 private static volatile int corePoolSize; //核心线程数 private final HashSetworkers; //工作线程 private final BlockingQueue waitingQueue; //等待队列 private static final String THREADPOOL_NAME="MyThread-Pool-";//线程名称 private volatile boolean isRunning=true; //是否运行 private volatile boolean STOPNOW=false; //是否立刻停止 private volatile ThreadFactory threadFactory; //线程工厂 private static final MyRejectedExecutionHandle defaultHandle=new MyThreadPoolExecutor.MyAbortPolicy();//默认拒绝策略 private volatile MyRejectedExecutionHandle handle; //拒绝紫略 public MyThreadPoolExecutor(){ this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle); } public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) { this(corePoolSize,waitingQueue,threadFactory,defaultHandle); } public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) { this.workers=new HashSet<>(corePoolSize); if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){ this.corePoolSize=corePoolSize; this.waitingQueue=waitingQueue; this.threadFactory=threadFactory; this.handle=handle; }else { throw new NullPointerException("线程池参数不合法"); } } /** * 实现自定义拒绝策略 */ //抛异常策略(默认) public static class MyAbortPolicy implements MyRejectedExecutionHandle{ public MyAbortPolicy(){ } @Override public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) { throw new MyRejectedExecutionException("任务-> "+r.toString()+"被线程池-> "+t.toString()+" 拒绝"); } } //默默丢弃策略 public static class MyDiscardPolicy implements MyRejectedExecutionHandle{ public MyDiscardPolicy() { } @Override public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) { } } //丢弃掉最老的任务策略 public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{ public MyDiscardOldestPolicy() { } @Override public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) { if(!threadPoolExecutor.isShutdown()){ //如果线程池没被关闭 threadPoolExecutor.getWaitingQueue().poll();//丢掉最老的任务,此时就有位置当新任务了 threadPoolExecutor.execute(runnable); //把新任务加入到队列中 } } } //由调用者调用策略 public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{ public MyCallerRunsPolicy(){ } @Override public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) { if(!threadPoolExecutor.isShutdown()){//判断线程池是否被关闭 runnable.run(); } } } //call拒绝方法 protected final void reject(Runnable runnable){ this.handle.rejectedExecution(runnable, this); } protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){ this.handle.rejectedExecution(runnable, threadPoolExecutor); } /** * MyWorker就是我们每一个线程对象 */ private final class MyWorker implements Runnable{ final Thread thread; //为每个MyWorker MyWorker(){ Thread td = threadFactory.newThread(this); td.setName(THREADPOOL_NAME+threadNumber.getAndIncrement()); this.thread=td; this.thread.start(); workers.add(this); } //执行任务 @Override public void run() { //循环接收任务 while (true) { //循环退出条件: //1:当isRunning为false并且waitingQueue的队列大小为0(也就是无任务了),会优雅的退出。 //2:当STOPNOW为true,则说明调用了shutdownNow方法进行暴力退出。 if((!isRunning&&waitingQueue.size()==0)||STOPNOW) { break; }else { //不断取任务,当任务!=null时则调用run方法处理任务 Runnable runnable = waitingQueue.poll(); if(runnable!=null){ runnable.run(); System.out.println("task==>"+taskcount.incrementAndGet()); } } } } } //往线程池中放任务 @Override public boolean execute(Runnable runnable) { if (!this.waitingQueue.offer(runnable)) { this.reject(runnable); return false; } else { if(this.workers!=null&&this.workers.size()<corePoolSize){//这种情况才能添加线程 MyWorker worker = new MyWorker(); //通过构造方法添加线程 } return true; } } //优雅的关闭 @Override public void shutdown() { this.isRunning=false; } //暴力关闭 @Override public void shutdownNow() { this.STOPNOW=true; } //判断线程池是否关闭 @Override public boolean isShutdown() { return !this.isRunning||STOPNOW; } //获取等待队列 @Override public BlockingQueue getWaitingQueue() { return this.waitingQueue; } }
package com.springframework.test; import com.springframework.concurrent.MyThreadPoolExecutor; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; public class ThreadPoolTest { public static void main(String[] args) { // MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor // (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyAbortPolicy()); // MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor // (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardPolicy()); // MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor // (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardOldestPolicy()); MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyCallerRunsPolicy()); for(int i=0;i<11;i++){ int finalI = i; myThreadPoolExecutor.execute(()->{ System.out.println(Thread.currentThread().getName()+">>>>"+ finalI); }); } myThreadPoolExecutor.shutdown(); // myThreadPoolExecutor.shutdownNow(); } }
잘 업그레이드된 스레드 이 시점에 최적화되어 있으며 앞으로도 계속해서 최적화되는 완벽한 버전이 될 것입니다.
위 내용은 Java 스레드 풀을 최적화하는 방법은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!