ホームページ >Java >&#&チュートリアル >Java スレッド プールを最適化するにはどうすればよいですか?

Java スレッド プールを最適化するにはどうすればよいですか?

PHPz
PHPz転載
2023-05-09 19:31:05857ブラウズ

スレッド プールのアップグレード バージョンの最適化

1: 4 つの新しい拒否戦略が追加されました。 MyAbortPolicy、MyDiscardPolicy、MyDiscardOldestPolicy、MyCallerRunsPolicy

2: スレッド プール MyThreadPoolExecutor の構築方法を最適化し、ランダムなパラメータ送信を防ぐためにパラメータ検証を追加します。

3: これは最も重要な最適化です。

  • スレッドプールのスレッド予熱機能を削除します。スレッドの予熱は大量のメモリを消費するため、スレッド プールを使用していないときは常に実行されます。

  • 代わりに、execute メソッドが呼び出されてタスクを追加すると、ワーカー スレッド コレクションの現在のサイズが corePoolSize の値と比較され、new MyWorker() が使用されます。スレッドを作成し、スレッドに追加します。プール、これの利点は、スレッド プールを作成するときに、それが使用されない場合、現在のメモリに影響を与えないことです。使用されると、スレッドが作成されます。そして再利用のためにスレッドプールに入れられます。

スレッド プール コンストラクター

public MyThreadPoolExecutor(){
        this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {
        this(corePoolSize,waitingQueue,threadFactory,defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {
        this.workers=new HashSet<>(corePoolSize);
        if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){
            this.corePoolSize=corePoolSize;
            this.waitingQueue=waitingQueue;
            this.threadFactory=threadFactory;
            this.handle=handle;
        }else {
            throw new NullPointerException("线程池参数不合法");
        }
    }

スレッド プール拒否戦略

戦略インターフェイス: MyRejectedExecutionHandle

package com.springframework.concurrent;

/**
 * 自定义拒绝策略
 * @author 游政杰
 */
public interface MyRejectedExecutionHandle {

    void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor);

}

戦略内部実装クラス

/**
     * 实现自定义拒绝策略
     */
    //抛异常策略(默认)
    public static class MyAbortPolicy implements MyRejectedExecutionHandle{
        public MyAbortPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {
            throw new MyRejectedExecutionException("任务-> "+r.toString()+"被线程池-> "+t.toString()+" 拒绝");
        }
    }
    //默默丢弃策略
    public static class MyDiscardPolicy implements MyRejectedExecutionHandle{

        public MyDiscardPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {

        }
    }
    //丢弃掉最老的任务策略
    public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{
        public MyDiscardOldestPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){ //如果线程池没被关闭
                threadPoolExecutor.getWaitingQueue().poll();//丢掉最老的任务,此时就有位置当新任务了
                threadPoolExecutor.execute(runnable); //把新任务加入到队列中
            }
        }
    }
    //由调用者调用策略
    public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{
        public MyCallerRunsPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){//判断线程池是否被关闭
                runnable.run();
            }
        }
    }

カプセル化拒否メソッド

protected final void reject(Runnable runnable){
        this.handle.rejectedExecution(runnable, this);
    }

    protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){
        this.handle.rejectedExecution(runnable, threadPoolExecutor);
    }

実行メソッド

@Override
    public boolean execute(Runnable runnable)
    {
        if (!this.waitingQueue.offer(runnable)) {
            this.reject(runnable);
            return false;
        }
        else {
            if(this.workers!=null&&this.workers.size()<corePoolSize){//这种情况才能添加线程
                MyWorker worker = new MyWorker(); //通过构造方法添加线程
            }
            return true;
        }
    }

スレッド オブジェクトは、タスクがスレッド プールに配置された場合にのみ作成されることがわかります。

手書きのスレッド プール ソース コード

MyExecutorService

package com.springframework.concurrent;

import java.util.concurrent.BlockingQueue;

/**
 * 自定义线程池业务接口
 * @author 游政杰
 */
public interface MyExecutorService {

    boolean execute(Runnable runnable);

    void shutdown();

    void shutdownNow();

    boolean isShutdown();

    BlockingQueue<Runnable> getWaitingQueue();

}

MyRejectedExecutionException

package com.springframework.concurrent;

/**
 * 自定义拒绝异常
 */
public class MyRejectedExecutionException extends RuntimeException {

    public MyRejectedExecutionException() {
    }
    public MyRejectedExecutionException(String message) {
        super(message);
    }

    public MyRejectedExecutionException(String message, Throwable cause) {
        super(message, cause);
    }

    public MyRejectedExecutionException(Throwable cause) {
        super(cause);
    }

}

MyRejectedExecutionHandle

package com.springframework.concurrent;

/**
 * 自定义拒绝策略
 * @author 游政杰
 */
public interface MyRejectedExecutionHandle {

    void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor);

}

コア クラス MyThreadPoolExecutor

package com.springframework.concurrent;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 纯手撸线程池框架
 * @author 游政杰
 */
public class MyThreadPoolExecutor implements MyExecutorService{

    private static final AtomicInteger taskcount=new AtomicInteger(0);//执行任务次数
    private static final AtomicInteger threadNumber=new AtomicInteger(0); //线程编号
    private static volatile int corePoolSize; //核心线程数
    private final HashSet workers; //工作线程
    private final BlockingQueue waitingQueue; //等待队列
    private static final String THREADPOOL_NAME="MyThread-Pool-";//线程名称
    private volatile boolean isRunning=true; //是否运行
    private volatile boolean STOPNOW=false; //是否立刻停止
    private volatile ThreadFactory threadFactory; //线程工厂
    private static final MyRejectedExecutionHandle defaultHandle=new MyThreadPoolExecutor.MyAbortPolicy();//默认拒绝策略
    private volatile MyRejectedExecutionHandle handle; //拒绝紫略

    public MyThreadPoolExecutor(){
        this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {
        this(corePoolSize,waitingQueue,threadFactory,defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {
        this.workers=new HashSet<>(corePoolSize);
        if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){
            this.corePoolSize=corePoolSize;
            this.waitingQueue=waitingQueue;
            this.threadFactory=threadFactory;
            this.handle=handle;
        }else {
            throw new NullPointerException("线程池参数不合法");
        }
    }
    /**
     * 实现自定义拒绝策略
     */
    //抛异常策略(默认)
    public static class MyAbortPolicy implements MyRejectedExecutionHandle{
        public MyAbortPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {
            throw new MyRejectedExecutionException("任务-> "+r.toString()+"被线程池-> "+t.toString()+" 拒绝");
        }
    }
    //默默丢弃策略
    public static class MyDiscardPolicy implements MyRejectedExecutionHandle{

        public MyDiscardPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {

        }
    }
    //丢弃掉最老的任务策略
    public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{
        public MyDiscardOldestPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){ //如果线程池没被关闭
                threadPoolExecutor.getWaitingQueue().poll();//丢掉最老的任务,此时就有位置当新任务了
                threadPoolExecutor.execute(runnable); //把新任务加入到队列中
            }
        }
    }
    //由调用者调用策略
    public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{
        public MyCallerRunsPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){//判断线程池是否被关闭
                runnable.run();
            }
        }
    }
    //call拒绝方法
    protected final void reject(Runnable runnable){
        this.handle.rejectedExecution(runnable, this);
    }

    protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){
        this.handle.rejectedExecution(runnable, threadPoolExecutor);
    }

    /**
     * MyWorker就是我们每一个线程对象
     */
    private final class MyWorker implements Runnable{

        final Thread thread; //为每个MyWorker

        MyWorker(){
            Thread td = threadFactory.newThread(this);
            td.setName(THREADPOOL_NAME+threadNumber.getAndIncrement());
            this.thread=td;
            this.thread.start();
            workers.add(this);
        }

        //执行任务
        @Override
        public void run() {
            //循环接收任务
                while (true)
                {
                    //循环退出条件:
                    //1:当isRunning为false并且waitingQueue的队列大小为0(也就是无任务了),会优雅的退出。
                    //2:当STOPNOW为true,则说明调用了shutdownNow方法进行暴力退出。
                    if((!isRunning&&waitingQueue.size()==0)||STOPNOW)
                    {
                        break;
                    }else {
                        //不断取任务,当任务!=null时则调用run方法处理任务
                        Runnable runnable = waitingQueue.poll();
                        if(runnable!=null){
                            runnable.run();
                            System.out.println("task==>"+taskcount.incrementAndGet());
                        }
                    }
                }
        }
    }

    //往线程池中放任务
    @Override
    public boolean execute(Runnable runnable)
    {
        if (!this.waitingQueue.offer(runnable)) {
            this.reject(runnable);
            return false;
        }
        else {
            if(this.workers!=null&&this.workers.size()<corePoolSize){//这种情况才能添加线程
                MyWorker worker = new MyWorker(); //通过构造方法添加线程
            }
            return true;
        }
    }
    //优雅的关闭
    @Override
    public void shutdown()
    {
        this.isRunning=false;
    }
    //暴力关闭
    @Override
    public void shutdownNow()
    {
        this.STOPNOW=true;
    }

    //判断线程池是否关闭
    @Override
    public boolean isShutdown() {
        return !this.isRunning||STOPNOW;
    }

    //获取等待队列
    @Override
    public BlockingQueue getWaitingQueue() {
        return this.waitingQueue;
    }
}

スレッド プール テスト クラス

package com.springframework.test;

import com.springframework.concurrent.MyThreadPoolExecutor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;

public class ThreadPoolTest {

  public static void main(String[] args) {


//      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
//              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyAbortPolicy());

//      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
//              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardPolicy());

//      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
//              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardOldestPolicy());

      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyCallerRunsPolicy());


      for(int i=0;i<11;i++){

          int finalI = i;
          myThreadPoolExecutor.execute(()->{
              System.out.println(Thread.currentThread().getName()+">>>>"+ finalI);
          });

      }

      myThreadPoolExecutor.shutdown();

//      myThreadPoolExecutor.shutdownNow();




  }
}

まあ、スレッド プールのアップグレード バージョンはここまで最適化されています。継続的な最適化のために、後で完全なバージョンがリリースされる可能性があります。

以上がJava スレッド プールを最適化するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。