首頁 >Java >java教程 >如何優化Java執行緒池?

如何優化Java執行緒池?

PHPz
PHPz轉載
2023-05-09 19:31:05854瀏覽

升級版執行緒池的最佳化

1:新增了4種拒絕策略。分別為:MyAbortPolicy、MyDiscardPolicy、MyDiscardOldestPolicy、MyCallerRunsPolicy

2:對執行緒池MyThreadPoolExecutor的建構方法進行最佳化,增加了參數校驗,防止亂傳參數現象。

3:這是最重要的最佳化。

  • 移除執行緒池的執行緒預熱功能。因為線程預熱會極大的耗費內存,當我們不用線程池時也會一直在運行狀態。

  • 換來的是在呼叫execute方法新增任務時透過檢查workers執行緒集合目前的大小與corePoolSize的值去比較,再透過new MyWorker()去建立新增執行緒到執行緒池,這樣好處就是當我們創建線程池如果不使用的話則對當前內存沒有一點影響,當使用了才會創建線程並放入線程池中進行復用。

執行緒池建構器

public MyThreadPoolExecutor(){
        this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {
        this(corePoolSize,waitingQueue,threadFactory,defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {
        this.workers=new HashSet<>(corePoolSize);
        if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){
            this.corePoolSize=corePoolSize;
            this.waitingQueue=waitingQueue;
            this.threadFactory=threadFactory;
            this.handle=handle;
        }else {
            throw new NullPointerException("线程池参数不合法");
        }
    }

執行緒池拒絕策略

策略介面:MyRejectedExecutionHandle

package com.springframework.concurrent;

/**
 * 自定义拒绝策略
 * @author 游政杰
 */
public interface MyRejectedExecutionHandle {

    void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor);

}

策略內部實作類別

/**
     * 实现自定义拒绝策略
     */
    //抛异常策略(默认)
    public static class MyAbortPolicy implements MyRejectedExecutionHandle{
        public MyAbortPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {
            throw new MyRejectedExecutionException("任务-> "+r.toString()+"被线程池-> "+t.toString()+" 拒绝");
        }
    }
    //默默丢弃策略
    public static class MyDiscardPolicy implements MyRejectedExecutionHandle{

        public MyDiscardPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {

        }
    }
    //丢弃掉最老的任务策略
    public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{
        public MyDiscardOldestPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){ //如果线程池没被关闭
                threadPoolExecutor.getWaitingQueue().poll();//丢掉最老的任务,此时就有位置当新任务了
                threadPoolExecutor.execute(runnable); //把新任务加入到队列中
            }
        }
    }
    //由调用者调用策略
    public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{
        public MyCallerRunsPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){//判断线程池是否被关闭
                runnable.run();
            }
        }
    }

封裝拒絕方法

protected final void reject(Runnable runnable){
        this.handle.rejectedExecution(runnable, this);
    }

    protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){
        this.handle.rejectedExecution(runnable, threadPoolExecutor);
    }

execute方法

@Override
    public boolean execute(Runnable runnable)
    {
        if (!this.waitingQueue.offer(runnable)) {
            this.reject(runnable);
            return false;
        }
        else {
            if(this.workers!=null&&this.workers.size()<corePoolSize){//这种情况才能添加线程
                MyWorker worker = new MyWorker(); //通过构造方法添加线程
            }
            return true;
        }
    }

可以看出只有當到執行緒池放任務時才會建立執行緒物件。

手寫執行緒池原始碼

MyExecutorService

package com.springframework.concurrent;

import java.util.concurrent.BlockingQueue;

/**
 * 自定义线程池业务接口
 * @author 游政杰
 */
public interface MyExecutorService {

    boolean execute(Runnable runnable);

    void shutdown();

    void shutdownNow();

    boolean isShutdown();

    BlockingQueue<Runnable> getWaitingQueue();

}

MyRejectedExecutionException

package com.springframework.concurrent;

/**
 * 自定义拒绝异常
 */
public class MyRejectedExecutionException extends RuntimeException {

    public MyRejectedExecutionException() {
    }
    public MyRejectedExecutionException(String message) {
        super(message);
    }

    public MyRejectedExecutionException(String message, Throwable cause) {
        super(message, cause);
    }

    public MyRejectedExecutionException(Throwable cause) {
        super(cause);
    }

}

MyRejectedExecutionHandle

package com.springframework.concurrent;

/**
 * 自定义拒绝策略
 * @author 游政杰
 */
public interface MyRejectedExecutionHandle {

    void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor);

}

核心類別MyThreadPoolExecutoror執行緒池測試類別

package com.springframework.concurrent;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 纯手撸线程池框架
 * @author 游政杰
 */
public class MyThreadPoolExecutor implements MyExecutorService{

    private static final AtomicInteger taskcount=new AtomicInteger(0);//执行任务次数
    private static final AtomicInteger threadNumber=new AtomicInteger(0); //线程编号
    private static volatile int corePoolSize; //核心线程数
    private final HashSet workers; //工作线程
    private final BlockingQueue waitingQueue; //等待队列
    private static final String THREADPOOL_NAME="MyThread-Pool-";//线程名称
    private volatile boolean isRunning=true; //是否运行
    private volatile boolean STOPNOW=false; //是否立刻停止
    private volatile ThreadFactory threadFactory; //线程工厂
    private static final MyRejectedExecutionHandle defaultHandle=new MyThreadPoolExecutor.MyAbortPolicy();//默认拒绝策略
    private volatile MyRejectedExecutionHandle handle; //拒绝紫略

    public MyThreadPoolExecutor(){
        this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {
        this(corePoolSize,waitingQueue,threadFactory,defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {
        this.workers=new HashSet<>(corePoolSize);
        if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){
            this.corePoolSize=corePoolSize;
            this.waitingQueue=waitingQueue;
            this.threadFactory=threadFactory;
            this.handle=handle;
        }else {
            throw new NullPointerException("线程池参数不合法");
        }
    }
    /**
     * 实现自定义拒绝策略
     */
    //抛异常策略(默认)
    public static class MyAbortPolicy implements MyRejectedExecutionHandle{
        public MyAbortPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {
            throw new MyRejectedExecutionException("任务-> "+r.toString()+"被线程池-> "+t.toString()+" 拒绝");
        }
    }
    //默默丢弃策略
    public static class MyDiscardPolicy implements MyRejectedExecutionHandle{

        public MyDiscardPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {

        }
    }
    //丢弃掉最老的任务策略
    public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{
        public MyDiscardOldestPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){ //如果线程池没被关闭
                threadPoolExecutor.getWaitingQueue().poll();//丢掉最老的任务,此时就有位置当新任务了
                threadPoolExecutor.execute(runnable); //把新任务加入到队列中
            }
        }
    }
    //由调用者调用策略
    public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{
        public MyCallerRunsPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){//判断线程池是否被关闭
                runnable.run();
            }
        }
    }
    //call拒绝方法
    protected final void reject(Runnable runnable){
        this.handle.rejectedExecution(runnable, this);
    }

    protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){
        this.handle.rejectedExecution(runnable, threadPoolExecutor);
    }

    /**
     * MyWorker就是我们每一个线程对象
     */
    private final class MyWorker implements Runnable{

        final Thread thread; //为每个MyWorker

        MyWorker(){
            Thread td = threadFactory.newThread(this);
            td.setName(THREADPOOL_NAME+threadNumber.getAndIncrement());
            this.thread=td;
            this.thread.start();
            workers.add(this);
        }

        //执行任务
        @Override
        public void run() {
            //循环接收任务
                while (true)
                {
                    //循环退出条件:
                    //1:当isRunning为false并且waitingQueue的队列大小为0(也就是无任务了),会优雅的退出。
                    //2:当STOPNOW为true,则说明调用了shutdownNow方法进行暴力退出。
                    if((!isRunning&&waitingQueue.size()==0)||STOPNOW)
                    {
                        break;
                    }else {
                        //不断取任务,当任务!=null时则调用run方法处理任务
                        Runnable runnable = waitingQueue.poll();
                        if(runnable!=null){
                            runnable.run();
                            System.out.println("task==>"+taskcount.incrementAndGet());
                        }
                    }
                }
        }
    }

    //往线程池中放任务
    @Override
    public boolean execute(Runnable runnable)
    {
        if (!this.waitingQueue.offer(runnable)) {
            this.reject(runnable);
            return false;
        }
        else {
            if(this.workers!=null&&this.workers.size()<corePoolSize){//这种情况才能添加线程
                MyWorker worker = new MyWorker(); //通过构造方法添加线程
            }
            return true;
        }
    }
    //优雅的关闭
    @Override
    public void shutdown()
    {
        this.isRunning=false;
    }
    //暴力关闭
    @Override
    public void shutdownNow()
    {
        this.STOPNOW=true;
    }

    //判断线程池是否关闭
    @Override
    public boolean isShutdown() {
        return !this.isRunning||STOPNOW;
    }

    //获取等待队列
    @Override
    public BlockingQueue getWaitingQueue() {
        return this.waitingQueue;
    }
}

好了升級版執行緒池就優化到這了,後面可能還會出完善版,不斷進行最佳化。

以上是如何優化Java執行緒池?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除