1: 4 strategi penolakan baharu telah ditambah. Ia adalah: MyAbortPolicy, MyDiscardPolicy, MyDiscardOldestPolicy, MyCallerRunsPolicy
2: Optimumkan kaedah pembinaan kumpulan benang MyThreadPoolExecutor dan tambah pengesahan parameter untuk mengelakkan penghantaran parameter rawak.
3: Ini adalah pengoptimuman yang paling penting.
Alih keluar fungsi prapanas benang pada kolam benang. Oleh kerana pemanasan awal benang menggunakan banyak memori, ia akan sentiasa berjalan apabila kita tidak menggunakan kumpulan benang.
Sebagai pertukaran, apabila memanggil kaedah laksana untuk menambah tugas, ia menyemak saiz semasa koleksi benang pekerja dan membandingkannya dengan nilai corePoolSize, dan kemudian mencipta dan menambah benang ke thread melalui MyWorker() Pool baharu, kelebihannya ialah apabila kita mencipta kumpulan benang, jika ia tidak digunakan, ia tidak akan memberi kesan kepada memori semasa Apabila ia digunakan, benang akan dicipta dan dimasukkan ke dalam kolam benang untuk digunakan semula.
public MyThreadPoolExecutor(){ this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle); } public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) { this(corePoolSize,waitingQueue,threadFactory,defaultHandle); } public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) { this.workers=new HashSet<>(corePoolSize); if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){ this.corePoolSize=corePoolSize; this.waitingQueue=waitingQueue; this.threadFactory=threadFactory; this.handle=handle; }else { throw new NullPointerException("线程池参数不合法"); } }
Antara muka dasar: MyRejectedExecutionHandle
package com.springframework.concurrent; /** * 自定义拒绝策略 * @author 游政杰 */ public interface MyRejectedExecutionHandle { void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor); }
kelas pelaksanaan dalaman strategi >Kaedah penolakan enkapsulasi
protected final void reject(Runnable runnable){ this.handle.rejectedExecution(runnable, this); } protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){ this.handle.rejectedExecution(runnable, threadPoolExecutor); }melaksanakan kaedah
@Override public boolean execute(Runnable runnable) { if (!this.waitingQueue.offer(runnable)) { this.reject(runnable); return false; } else { if(this.workers!=null&&this.workers.size()<corePoolSize){//这种情况才能添加线程 MyWorker worker = new MyWorker(); //通过构造方法添加线程 } return true; } }
package com.springframework.concurrent; import java.util.concurrent.BlockingQueue; /** * 自定义线程池业务接口 * @author 游政杰 */ public interface MyExecutorService { boolean execute(Runnable runnable); void shutdown(); void shutdownNow(); boolean isShutdown(); BlockingQueue<Runnable> getWaitingQueue(); }
package com.springframework.concurrent; /** * 自定义拒绝异常 */ public class MyRejectedExecutionException extends RuntimeException { public MyRejectedExecutionException() { } public MyRejectedExecutionException(String message) { super(message); } public MyRejectedExecutionException(String message, Throwable cause) { super(message, cause); } public MyRejectedExecutionException(Throwable cause) { super(cause); } }
package com.springframework.concurrent; /** * 自定义拒绝策略 * @author 游政杰 */ public interface MyRejectedExecutionHandle { void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor); }
package com.springframework.concurrent; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; /** * 纯手撸线程池框架 * @author 游政杰 */ public class MyThreadPoolExecutor implements MyExecutorService{ private static final AtomicInteger taskcount=new AtomicInteger(0);//执行任务次数 private static final AtomicInteger threadNumber=new AtomicInteger(0); //线程编号 private static volatile int corePoolSize; //核心线程数 private final HashSetworkers; //工作线程 private final BlockingQueue waitingQueue; //等待队列 private static final String THREADPOOL_NAME="MyThread-Pool-";//线程名称 private volatile boolean isRunning=true; //是否运行 private volatile boolean STOPNOW=false; //是否立刻停止 private volatile ThreadFactory threadFactory; //线程工厂 private static final MyRejectedExecutionHandle defaultHandle=new MyThreadPoolExecutor.MyAbortPolicy();//默认拒绝策略 private volatile MyRejectedExecutionHandle handle; //拒绝紫略 public MyThreadPoolExecutor(){ this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle); } public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) { this(corePoolSize,waitingQueue,threadFactory,defaultHandle); } public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) { this.workers=new HashSet<>(corePoolSize); if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){ this.corePoolSize=corePoolSize; this.waitingQueue=waitingQueue; this.threadFactory=threadFactory; this.handle=handle; }else { throw new NullPointerException("线程池参数不合法"); } } /** * 实现自定义拒绝策略 */ //抛异常策略(默认) public static class MyAbortPolicy implements MyRejectedExecutionHandle{ public MyAbortPolicy(){ } @Override public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) { throw new MyRejectedExecutionException("任务-> "+r.toString()+"被线程池-> "+t.toString()+" 拒绝"); } } //默默丢弃策略 public static class MyDiscardPolicy implements MyRejectedExecutionHandle{ public MyDiscardPolicy() { } @Override public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) { } } //丢弃掉最老的任务策略 public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{ public MyDiscardOldestPolicy() { } @Override public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) { if(!threadPoolExecutor.isShutdown()){ //如果线程池没被关闭 threadPoolExecutor.getWaitingQueue().poll();//丢掉最老的任务,此时就有位置当新任务了 threadPoolExecutor.execute(runnable); //把新任务加入到队列中 } } } //由调用者调用策略 public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{ public MyCallerRunsPolicy(){ } @Override public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) { if(!threadPoolExecutor.isShutdown()){//判断线程池是否被关闭 runnable.run(); } } } //call拒绝方法 protected final void reject(Runnable runnable){ this.handle.rejectedExecution(runnable, this); } protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){ this.handle.rejectedExecution(runnable, threadPoolExecutor); } /** * MyWorker就是我们每一个线程对象 */ private final class MyWorker implements Runnable{ final Thread thread; //为每个MyWorker MyWorker(){ Thread td = threadFactory.newThread(this); td.setName(THREADPOOL_NAME+threadNumber.getAndIncrement()); this.thread=td; this.thread.start(); workers.add(this); } //执行任务 @Override public void run() { //循环接收任务 while (true) { //循环退出条件: //1:当isRunning为false并且waitingQueue的队列大小为0(也就是无任务了),会优雅的退出。 //2:当STOPNOW为true,则说明调用了shutdownNow方法进行暴力退出。 if((!isRunning&&waitingQueue.size()==0)||STOPNOW) { break; }else { //不断取任务,当任务!=null时则调用run方法处理任务 Runnable runnable = waitingQueue.poll(); if(runnable!=null){ runnable.run(); System.out.println("task==>"+taskcount.incrementAndGet()); } } } } } //往线程池中放任务 @Override public boolean execute(Runnable runnable) { if (!this.waitingQueue.offer(runnable)) { this.reject(runnable); return false; } else { if(this.workers!=null&&this.workers.size()<corePoolSize){//这种情况才能添加线程 MyWorker worker = new MyWorker(); //通过构造方法添加线程 } return true; } } //优雅的关闭 @Override public void shutdown() { this.isRunning=false; } //暴力关闭 @Override public void shutdownNow() { this.STOPNOW=true; } //判断线程池是否关闭 @Override public boolean isShutdown() { return !this.isRunning||STOPNOW; } //获取等待队列 @Override public BlockingQueue getWaitingQueue() { return this.waitingQueue; } }
Atas ialah kandungan terperinci Bagaimana untuk mengoptimumkan kumpulan benang Java?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!