How to optimize Java thread pool?

PHPz
2023-05-09 19:31:05

Optimizations in the upgraded thread pool

1: Four rejection policies have been added: MyAbortPolicy, MyDiscardPolicy, MyDiscardOldestPolicy, and MyCallerRunsPolicy.

2: The constructors of MyThreadPoolExecutor now validate their parameters, so invalid arguments are rejected when the pool is created.

3: The most important optimization: threads are now created lazily.

  • Thread preheating has been removed from the pool. Preheated threads occupy memory and keep running even while the pool is not being used.

  • Instead, each call to execute compares the current size of the workers set with corePoolSize and, if the set is smaller, creates a thread with new MyWorker() and adds it to the pool. A pool that is created but never used therefore starts no threads; threads are created only when tasks arrive and then stay in the pool for reuse.
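For comparison, the JDK's own java.util.concurrent.ThreadPoolExecutor also creates its core threads lazily by default and offers prestartAllCoreThreads() for explicit preheating. The sketch below only illustrates that trade-off with the JDK class; the class name LazyStartDemo and its helper methods are made up for this example and are not part of the handwritten pool.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LazyStartDemo {

    // Builds a JDK pool with 5 core threads; no thread is started yet.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10));
    }

    // Returns {threads before the first task, threads after the first task}.
    static int[] lazyCounts() {
        ThreadPoolExecutor pool = newPool();
        int before = pool.getPoolSize();
        pool.execute(() -> { });
        int after = pool.getPoolSize();
        pool.shutdown();
        return new int[] {before, after};
    }

    // Returns how many threads explicit preheating starts.
    static int prestartedCount() {
        ThreadPoolExecutor pool = newPool();
        int started = pool.prestartAllCoreThreads();
        pool.shutdown();
        return started;
    }

    public static void main(String[] args) {
        int[] counts = lazyCounts();
        System.out.println("before=" + counts[0] + ", after=" + counts[1]);
        System.out.println("prestarted=" + prestartedCount());
    }
}
```

With lazy creation an unused pool holds no threads, while preheating pays for all five up front.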

Thread pool constructor

public MyThreadPoolExecutor(){
        this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {
        this(corePoolSize,waitingQueue,threadFactory,defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {
        // Validate first: new HashSet<>(n) itself throws for a negative n,
        // and a pool with zero workers would never run a queued task
        if(corePoolSize>0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){
            this.workers=new HashSet<>(corePoolSize);
            this.corePoolSize=corePoolSize;
            this.waitingQueue=waitingQueue;
            this.threadFactory=threadFactory;
            this.handle=handle;
        }else {
            throw new IllegalArgumentException("Invalid thread pool parameters");
        }
    }
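The validation idea can be tried in isolation. The following is a minimal sketch, assuming a hypothetical helper class PoolArgsDemo (not part of the article's code) that reports a bad size with IllegalArgumentException and a missing queue with NullPointerException via Objects.requireNonNull.

```java
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PoolArgsDemo {

    // Hypothetical helper: validates the two most error-prone arguments.
    static void validate(int corePoolSize, BlockingQueue<Runnable> waitingQueue) {
        if (corePoolSize <= 0) {
            throw new IllegalArgumentException("corePoolSize must be > 0, got " + corePoolSize);
        }
        Objects.requireNonNull(waitingQueue, "waitingQueue must not be null");
    }

    // Returns "ok", or the simple name of the exception that validate threw.
    static String outcome(int corePoolSize, BlockingQueue<Runnable> waitingQueue) {
        try {
            validate(corePoolSize, waitingQueue);
            return "ok";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(5, new ArrayBlockingQueue<>(10)));  // ok
        System.out.println(outcome(-1, new ArrayBlockingQueue<>(10))); // IllegalArgumentException
        System.out.println(outcome(5, null));                          // NullPointerException
    }
}
```

Separating the two exception types is a design choice of this sketch; it tells the caller whether a value was out of range or missing altogether.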

Thread pool rejection strategy

Strategy interface: MyRejectedExecutionHandle

package com.springframework.concurrent;

/**
 * Custom rejection policy
 * @author 游政杰
 */
public interface MyRejectedExecutionHandle {

    void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor);

}

Policy implementations (static nested classes of MyThreadPoolExecutor)

/**
     * Custom rejection policy implementations
     */
    // Abort policy: throw an exception (default)
    public static class MyAbortPolicy implements MyRejectedExecutionHandle{
        public MyAbortPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {
            throw new MyRejectedExecutionException("Task -> "+r.toString()+" rejected by thread pool -> "+t.toString());
        }
    }
    // Discard policy: silently drop the rejected task
    public static class MyDiscardPolicy implements MyRejectedExecutionHandle{

        public MyDiscardPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {

        }
    }
    // Discard-oldest policy: drop the oldest queued task
    public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{
        public MyDiscardOldestPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){ // only while the pool has not been shut down
                threadPoolExecutor.getWaitingQueue().poll(); // drop the oldest task to free a slot for the new one
                threadPoolExecutor.execute(runnable); // submit the new task again
            }
        }
    }
    // Caller-runs policy: the submitting thread runs the task itself
    public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{
        public MyCallerRunsPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){ // only while the pool has not been shut down
                runnable.run();
            }
        }
    }
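The four policies closely resemble the handlers that ship with the JDK's ThreadPoolExecutor (AbortPolicy, DiscardPolicy, DiscardOldestPolicy, CallerRunsPolicy). To see what each one does to a rejected task, the sketch below uses the JDK classes rather than the handwritten pool, so that it runs on its own; RejectionDemo and its helpers are names invented for this example. A pool with one thread and one queue slot is filled with tasks A and B, then task C is submitted.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {

    // Saturates the pool with A and B, submits C, and returns a log of events in order.
    static List<String> run(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        List<String> log = new CopyOnWriteArrayList<>();
        Thread caller = Thread.currentThread();
        try {
            // A occupies the only worker until release is counted down
            pool.execute(() -> { awaitQuietly(release); log.add("A"); });
            // B takes the only queue slot
            pool.execute(() -> log.add("B"));
            // C finds the pool full, so the handler decides what happens to it
            pool.execute(() -> log.add(Thread.currentThread() == caller ? "C(caller)" : "C(worker)"));
        } catch (RejectedExecutionException e) {
            log.add("C rejected");
        } finally {
            release.countDown();
            pool.shutdown();
            awaitTermination(pool);
        }
        return log;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void awaitTermination(ThreadPoolExecutor pool) {
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(new ThreadPoolExecutor.AbortPolicy()));         // [C rejected, A, B]
        System.out.println(run(new ThreadPoolExecutor.DiscardPolicy()));       // [A, B]
        System.out.println(run(new ThreadPoolExecutor.DiscardOldestPolicy())); // [A, C(worker)]
        System.out.println(run(new ThreadPoolExecutor.CallerRunsPolicy()));    // [C(caller), A, B]
    }
}
```

Note how the discard-oldest handler sacrifices B to make room for C, while the caller-runs handler executes C on the submitting thread before A has even finished.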

Wrapper methods that invoke the rejection policy

protected final void reject(Runnable runnable){
        this.handle.rejectedExecution(runnable, this);
    }

    protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){
        this.handle.rejectedExecution(runnable, threadPoolExecutor);
    }

execute method

@Override
    public boolean execute(Runnable runnable)
    {
        if (!this.waitingQueue.offer(runnable)) {
            this.reject(runnable);
            return false;
        }
        else {
            if(this.workers!=null&&this.workers.size()<corePoolSize){ // only add a thread while below corePoolSize
                MyWorker worker = new MyWorker(); // the constructor starts the thread and registers it
            }
            return true;
        }
    }

As the code shows, a thread is created only when a task is submitted to the pool.
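One caveat: in execute, the size check and the creation of the worker are two separate steps on a plain HashSet, so two threads calling execute at the same moment could both pass the check. A possible safeguard, sketched here as a standalone class and not as the article's implementation, is to make check-and-add atomic with synchronized. GuardedSpawnDemo, addWorkerIfBelowCore and raceToAddWorkers are hypothetical names used only for this illustration.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class GuardedSpawnDemo {

    private final int corePoolSize;
    private final Set<Thread> workers = new HashSet<>();

    GuardedSpawnDemo(int corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    // synchronized makes "check the size" and "add a worker" one atomic step.
    synchronized boolean addWorkerIfBelowCore() {
        if (workers.size() >= corePoolSize) {
            return false;
        }
        Thread worker = new Thread(() -> { }); // placeholder for a real worker loop
        workers.add(worker);
        worker.start();
        return true;
    }

    // Lets `callers` threads race to add workers; returns how many workers were created.
    static int raceToAddWorkers(int corePoolSize, int callers) {
        GuardedSpawnDemo pool = new GuardedSpawnDemo(corePoolSize);
        AtomicInteger created = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    start.await(); // line all callers up, then release them together
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (pool.addWorkerIfBelowCore()) {
                    created.incrementAndGet();
                }
            });
            threads[i].start();
        }
        start.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println(raceToAddWorkers(5, 20)); // never more than corePoolSize
    }
}
```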

Complete source code of the handwritten thread pool

MyExecutorService

package com.springframework.concurrent;

import java.util.concurrent.BlockingQueue;

/**
 * Custom thread pool service interface
 * @author 游政杰
 */
public interface MyExecutorService {

    boolean execute(Runnable runnable);

    void shutdown();

    void shutdownNow();

    boolean isShutdown();

    BlockingQueue<Runnable> getWaitingQueue();

}

MyRejectedExecutionException

package com.springframework.concurrent;

/**
 * Custom rejection exception
 */
public class MyRejectedExecutionException extends RuntimeException {

    public MyRejectedExecutionException() {
    }
    public MyRejectedExecutionException(String message) {
        super(message);
    }

    public MyRejectedExecutionException(String message, Throwable cause) {
        super(message, cause);
    }

    public MyRejectedExecutionException(Throwable cause) {
        super(cause);
    }

}

MyRejectedExecutionHandle

package com.springframework.concurrent;

/**
 * Custom rejection policy
 * @author 游政杰
 */
public interface MyRejectedExecutionHandle {

    void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor);

}

Core class MyThreadPoolExecutor

package com.springframework.concurrent;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A thread pool written by hand from scratch
 * @author 游政杰
 */
public class MyThreadPoolExecutor implements MyExecutorService{

    private static final AtomicInteger taskcount=new AtomicInteger(0); // number of tasks executed
    private static final AtomicInteger threadNumber=new AtomicInteger(0); // thread sequence number
    private final int corePoolSize; // number of core threads (per pool, so not static)
    private final Set<MyWorker> workers; // worker threads
    private final BlockingQueue<Runnable> waitingQueue; // waiting queue (typed, so poll() returns Runnable)
    private static final String THREADPOOL_NAME="MyThread-Pool-"; // thread name prefix
    private volatile boolean isRunning=true; // whether the pool is running
    private volatile boolean STOPNOW=false; // whether to stop immediately
    private volatile ThreadFactory threadFactory; // thread factory
    private static final MyRejectedExecutionHandle defaultHandle=new MyThreadPoolExecutor.MyAbortPolicy(); // default rejection policy
    private volatile MyRejectedExecutionHandle handle; // rejection policy

    public MyThreadPoolExecutor(){
        this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {
        this(corePoolSize,waitingQueue,threadFactory,defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {
        // Validate first: new HashSet<>(n) itself throws for a negative n,
        // and a pool with zero workers would never run a queued task
        if(corePoolSize>0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){
            this.workers=new HashSet<>(corePoolSize);
            this.corePoolSize=corePoolSize;
            this.waitingQueue=waitingQueue;
            this.threadFactory=threadFactory;
            this.handle=handle;
        }else {
            throw new IllegalArgumentException("Invalid thread pool parameters");
        }
    }
    /**
     * Custom rejection policy implementations
     */
    // Abort policy: throw an exception (default)
    public static class MyAbortPolicy implements MyRejectedExecutionHandle{
        public MyAbortPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {
            throw new MyRejectedExecutionException("Task -> "+r.toString()+" rejected by thread pool -> "+t.toString());
        }
    }
    // Discard policy: silently drop the rejected task
    public static class MyDiscardPolicy implements MyRejectedExecutionHandle{

        public MyDiscardPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {

        }
    }
    // Discard-oldest policy: drop the oldest queued task
    public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{
        public MyDiscardOldestPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){ // only while the pool has not been shut down
                threadPoolExecutor.getWaitingQueue().poll(); // drop the oldest task to free a slot for the new one
                threadPoolExecutor.execute(runnable); // submit the new task again
            }
        }
    }
    // Caller-runs policy: the submitting thread runs the task itself
    public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{
        public MyCallerRunsPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){ // only while the pool has not been shut down
                runnable.run();
            }
        }
    }
    // Invoke the rejection policy
    protected final void reject(Runnable runnable){
        this.handle.rejectedExecution(runnable, this);
    }

    protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){
        this.handle.rejectedExecution(runnable, threadPoolExecutor);
    }

    /**
     * Each MyWorker represents one thread of the pool
     */
    private final class MyWorker implements Runnable{

        final Thread thread; // the thread owned by this MyWorker

        MyWorker(){
            Thread td = threadFactory.newThread(this);
            td.setName(THREADPOOL_NAME+threadNumber.getAndIncrement());
            this.thread=td;
            workers.add(this); // register the worker before its thread starts running
            this.thread.start();
        }

        // Run tasks
        @Override
        public void run() {
            // Keep taking tasks in a loop
                while (true)
                {
                    // Loop exit conditions:
                    // 1: isRunning is false and waitingQueue is empty (no tasks left): exit gracefully.
                    // 2: STOPNOW is true: shutdownNow was called, so exit immediately.
                    if((!isRunning&&waitingQueue.size()==0)||STOPNOW)
                    {
                        break;
                    }else {
                        // poll() does not block; it returns null when the queue is empty
                        Runnable runnable = waitingQueue.poll();
                        if(runnable!=null){
                            runnable.run();
                            System.out.println("task==>"+taskcount.incrementAndGet());
                        }
                    }
                }
        }
    }

    // Submit a task to the thread pool
    @Override
    public boolean execute(Runnable runnable)
    {
        if (!this.waitingQueue.offer(runnable)) {
            this.reject(runnable);
            return false;
        }
        else {
            if(this.workers!=null&&this.workers.size()<corePoolSize){ // only add a thread while below corePoolSize
                MyWorker worker = new MyWorker(); // the constructor starts the thread and registers it
            }
            return true;
        }
    }
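    // Graceful shutdown: workers finish the queued tasks, then exit
    @Override
    public void shutdown()
    {
        this.isRunning=false;
    }
    // Forced shutdown: workers exit after their current task
    @Override
    public void shutdownNow()
    {
        this.STOPNOW=true;
    }

    // Whether the pool has been shut down
    @Override
    public boolean isShutdown() {
        return !this.isRunning||STOPNOW;
    }

    // Get the waiting queue
    @Override
    public BlockingQueue<Runnable> getWaitingQueue() {
        return this.waitingQueue;
    }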
}
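The worker loop in MyWorker calls the non-blocking poll() inside while (true), so an idle worker keeps spinning on an empty queue. One possible alternative is a timed poll, which lets the worker sleep until a task arrives or the timeout expires, and still re-checks the shutdown flag regularly. The sketch below is a standalone illustration under that assumption, not a drop-in change to the class above; TimedPollWorkerDemo, the 50 ms timeout and runTasks are all choices made for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TimedPollWorkerDemo {

    private final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10);
    private final AtomicInteger completed = new AtomicInteger();
    private volatile boolean running = true;

    // Worker loop: waits up to 50 ms for a task instead of spinning on poll().
    private void workerLoop() {
        while (running || !queue.isEmpty()) {
            try {
                Runnable task = queue.poll(50, TimeUnit.MILLISECONDS);
                if (task != null) {
                    task.run();
                    completed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop
                return;
            }
        }
    }

    // Runs n no-op tasks on a single worker and returns how many completed.
    static int runTasks(int n) {
        TimedPollWorkerDemo pool = new TimedPollWorkerDemo();
        Thread worker = new Thread(pool::workerLoop, "demo-worker");
        worker.start();
        try {
            for (int i = 0; i < n; i++) {
                pool.queue.put(() -> { }); // blocks while the queue is full
            }
            pool.running = false; // graceful shutdown: drain the queue, then exit
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed=" + runTasks(25));
    }
}
```

The timeout bounds how long a worker takes to notice a shutdown request; a shorter value reacts faster at the cost of more wake-ups.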

Thread pool test class

package com.springframework.test;

import com.springframework.concurrent.MyThreadPoolExecutor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;

public class ThreadPoolTest {

  public static void main(String[] args) {


//      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
//              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyAbortPolicy());

//      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
//              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardPolicy());

//      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
//              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardOldestPolicy());

      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyCallerRunsPolicy());


      for(int i=0;i<11;i++){

          int finalI = i;
          myThreadPoolExecutor.execute(()->{
              System.out.println(Thread.currentThread().getName()+">>>>"+ finalI);
          });

      }

      myThreadPoolExecutor.shutdown();

//      myThreadPoolExecutor.shutdownNow();




  }
}

That concludes the optimizations in this upgraded version of the thread pool. A more complete version may follow later with further improvements.

Statement:
This article is reproduced from yisu.com. If there is any infringement, please contact admin@php.cn to request removal.