ホームページ >Java >&#&チュートリアル >Java スレッド プール ThreadPoolExecutor クラスの使用方法

Java スレッド プール ThreadPoolExecutor クラスの使用方法

王林
王林転載
2023-04-26 13:31:151514ブラウズ

「Alibaba Java 開発マニュアル」では、スレッド リソースはスレッド プールを通じて提供する必要があり、スレッドの作成をアプリケーションに表示することは許可されていないと指摘されています。 ; 一方で、スレッドの詳細な管理はスレッド プールに引き継がれ、リソースのオーバーヘッドが最適化されます。スレッド プールは、Executor を使用して作成することはできませんが、ThreadPoolExecutor を通じて作成することができます。これは、jdk の Executor フレームワークには、スレッド プールを作成するための newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool() などのメソッドが用意されているためです。その制限には十分な柔軟性がありません。また、以前のメソッドも ThreadPoolExecutor を通じて内部的に実装されているため、ThreadPoolExecutor を使用すると、スレッド プールの運用ルールを明確にし、独自のビジネス シナリオのニーズを満たすスレッド プールを作成するのに役立ちます。リソース枯渇のリスクを回避します。

以下では、ThreadPoolExecutor の使用方法の詳細な概要を説明します。

まず ThreadPoolExecutor のコンストラクターを見てください

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

コンストラクターのパラメーターの意味は次のとおりです:

corePoolSize: スレッド プール内のスレッドの数を指定します。タスクが新しいスレッドを開いて実行するか、workQueue タスク キューに入れるか;

maximumPoolSize: スレッド プール内のスレッドの最大数を指定します。このパラメータは、スレッド プール内のスレッドの最大数を指定します。使用する workQueue タスク キューのタイプに基づきます。スレッド プールが開くスレッドの最大数を決定します。

keepAliveTime: スレッド プール内のアイドル スレッドの数が corePoolSize を超えた場合、どれくらいの時間がかかりますか余分なスレッドが破棄されるまでに時間がかかります;

unit:keepAliveTime Unit

workQueue: タスクキュー、スレッドプールに追加されているがまだ実行されていないタスク。通常は、直接送信キュー、制限付きタスク キュー、制限なしタスク キュー、および優先タスク キューに分けられます。

threadFactory: スレッドの作成に使用されるスレッド ファクトリ。通常はデフォルトで十分です。

handler:拒否戦略; 処理すべきタスクが多すぎる場合にタスクを拒否する方法;

次に、より重要なパラメータについてさらに理解します:

1. workQueue タスク キュー

上で紹介しましたが、一般的には直接送信キューと制限付きタスクキュー、制限なしタスクキュー、優先タスクキューに分かれています;

1. 直接送信キュー: SynchronousQueueキューに設定します。 BlockingQueue。容量がありません。挿入操作を実行せずにブロックされるため、別の操作を実行する必要があります。削除操作が起動され、逆に各削除操作も対応する挿入操作を待つ必要があります。

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        for(int i=0;i<3;i++) {
            pool.execute(new ThreadTask());
        }   
    }
}

public class ThreadTask implements Runnable{
    
    public ThreadTask() {
        
    public void run() {
        System.out.println(Thread.currentThread().getName());

出力結果は、

pool-1-thread-1
pool-1-thread-2
Exception in thread "main" java.util です。 concurrent .RejectedExecutionException: タスク com.hhxx.test.ThreadTask@55f96302 が java.util.concurrent.ThreadPoolExecutor@3d4eac69 から拒否されました [実行中、プール サイズ = 2、アクティブなスレッド = 0、キューに入れられたタスク = 0、完了したタスク = 2]
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ソース不明)
java.util.concurrent.ThreadPoolExecutor.reject(ソース不明)
java.util.concurrent.ThreadPoolExecutor.execute(ソース不明) )
at com.hhxx.test.ThreadPool.main(ThreadPool.java:17)

タスクキューがSynchronousQueueで、作成されたスレッドの数がminimumPoolSize の場合、直接実行されます。拒否ポリシーは例外をスローします。

SynchronousQueue キューを使用すると、送信されたタスクは保存されず、常にすぐに実行のために送信されます。タスクの実行に使用されるスレッドの数がmaximumPoolSize未満の場合は、新しいプロセスの作成を試行します。maximumPoolSizeで設定された最大値に達すると、設定したハンドラに従って拒否ポリシーが実行されます。したがって、この方法で送信したタスクはキャッシュされませんが、すぐに実行されます。この場合、適切な MaximumPoolSize 量を設定するには、プログラムの同時実行性を正確に評価する必要があります。拒否ポリシーを実行するのは簡単です;

2. バインド タスク キュー: 以下に示すように、バインド タスク キューは ArrayBlockingQueue を使用して実装できます。

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

ArrayBlockingQueue バインド タスク キューを使用します。新しいタスクを実行する必要がある場合、スレッド プールは新しいスレッドを作成し、作成されたスレッドの数が corePoolSize に達すると、新しいタスクは待機キューに追加されます。待機キューがいっぱいの場合、つまり ArrayBlockingQueue の初期容量を超えた場合は、スレッド数が maximumPoolSize で設定された最大スレッド数に達するまでスレッドの作成を続けます。maximumPoolSize より大きい場合、拒否ポリシーが実行されます。 。この場合、スレッド数の上限はバインド・タスク・キューの状態に直接関係し、バインド・キューの初期容量が大きいか、過負荷状態に達していない場合には、スレッド数は常に維持されます。それ以外の場合、タスクキューがいっぱいの場合、maximumPoolSize が最大スレッド数の上限として使用されます。

3. 境界のないタスク キュー: 境界のあるタスク キューは、以下に示すように、LinkedBlockingQueue を使用して実装できます。

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。

4、优先任务队列:优先任务队列通过PriorityBlockingQueue实现,下面我们通过一个例子演示下

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //优先任务队列
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
          
        for(int i=0;i<20;i++) {
            pool.execute(new ThreadTask(i));
        }    
    }
}

public class ThreadTask implements Runnable,Comparable<ThreadTask>{
    
    private int priority;
    public int getPriority() {
        return priority;
    public void setPriority(int priority) {
        this.priority = priority;
    public ThreadTask() {
        
    public ThreadTask(int priority) {
    //当前对象和其他对象做比较,当前优先级大就返回-1,优先级小就返回1,值越小优先级越高
    public int compareTo(ThreadTask o) {
         return  this.priority>o.priority?-1:1;
    public void run() {
        try {
            //让线程阻塞,使后续任务进入缓存队列
            Thread.sleep(1000);
            System.out.println("priority:"+this.priority+",ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

我们来看下执行的结果情况

priority:0,ThreadName:pool-1-thread-1
priority:9,ThreadName:pool-1-thread-1
priority:8,ThreadName:pool-1-thread-1
priority:7,ThreadName:pool-1-thread-1
priority:6,ThreadName:pool-1-thread-1
priority:5,ThreadName:pool-1-thread-1
priority:4,ThreadName:pool-1-thread-1
priority:3,ThreadName:pool-1-thread-1
priority:2,ThreadName:pool-1-thread-1
priority:1,ThreadName:pool-1-thread-1

大家可以看到除了第一个任务直接创建线程执行外,其他的任务都被放入了优先任务队列,按优先级进行了重新排列执行,且线程池的线程数一直为corePoolSize,也就是只有一个。

通过运行的代码我们可以看出PriorityBlockingQueue它其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。

二、拒绝策略

一般我们创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列,但种模式下如果出现任务队列已满且线程池创建的线程数达到你设置的最大线程数时,这时就需要你指定ThreadPoolExecutor的RejectedExecutionHandler参数即合理的拒绝策略,来处理线程池"超载"的情况。ThreadPoolExecutor自带的拒绝策略如下:

1、AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作;

2、CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行;

3、DiscardOledestPolicy策略:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交;

4、DiscardPolicy策略:该策略会默默丢弃无法处理的任务,不予任何处理。当然使用此策略,业务场景中需允许任务的丢失;

以上内置的策略均实现了RejectedExecutionHandler接口,当然你也可以自己扩展RejectedExecutionHandler接口,定义自己的拒绝策略,我们看下示例代码:

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //自定义拒绝策略
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(r.toString()+"执行了拒绝策略");
                
            }
        });
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask());
        }    
    }
}

public class ThreadTask implements Runnable{    
    public void run() {
        try {
            //让线程阻塞,使后续任务进入缓存队列
            Thread.sleep(1000);
            System.out.println("ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

输出结果:

com.hhxx.test.ThreadTask@33909752执行了拒绝策略
com.hhxx.test.ThreadTask@55f96302执行了拒绝策略
com.hhxx.test.ThreadTask@3d4eac69执行了拒绝策略
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1

可以看到由于任务加了休眠阻塞,执行需要花费一定时间,导致会有一定的任务被丢弃,从而执行自定义的拒绝策略;

三、ThreadFactory自定义线程创建

线程池中线程就是通过ThreadPoolExecutor中的ThreadFactory,线程工厂创建的。那么通过自定义ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,如命名、优先级等,下面代码我们通过ThreadFactory对线程池中创建的线程进行记录与命名

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //自定义线程工厂
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
            public Thread newThread(Runnable r) {
                System.out.println("线程"+r.hashCode()+"创建");
                //线程命名
                Thread th = new Thread(r,"threadPool"+r.hashCode());
                return th;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy());
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask());
        }    
    }
}

public class ThreadTask implements Runnable{    
    public void run() {
        //输出执行线程的名称
        System.out.println("ThreadName:"+Thread.currentThread().getName());

我们看下输出结果

线程118352462创建
线程1550089733创建
线程865113938创建
ThreadName:threadPool1550089733
ThreadName:threadPool118352462
线程1442407170创建
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool865113938
ThreadName:threadPool865113938
ThreadName:threadPool118352462
ThreadName:threadPool1550089733
ThreadName:threadPool1442407170

可以看到线程池中,每个线程的创建我们都进行了记录输出与命名。

四、ThreadPoolExecutor扩展

ThreadPoolExecutor扩展主要是围绕beforeExecute()、afterExecute()和terminated()三个接口实现的,

1、beforeExecute:线程池中任务运行前执行

2、afterExecute:线程池中任务运行完毕后执行

3、terminated:线程池退出后执行

通过这三个接口我们可以监控每个任务的开始和结束时间,或者其他一些功能。下面我们可以通过代码实现一下

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args ) throws InterruptedException
    {
        //实现自定义接口
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
            public Thread newThread(Runnable r) {
                System.out.println("线程"+r.hashCode()+"创建");
                //线程命名
                Thread th = new Thread(r,"threadPool"+r.hashCode());
                return th;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy()) {
    
            protected void beforeExecute(Thread t,Runnable r) {
                System.out.println("准备执行:"+ ((ThreadTask)r).getTaskName());
            }
            
            protected void afterExecute(Runnable r,Throwable t) {
                System.out.println("执行完毕:"+((ThreadTask)r).getTaskName());
            }
            
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask("Task"+i));
        }    
        pool.shutdown();
    }
}

public class ThreadTask implements Runnable{    
    private String taskName;
    public String getTaskName() {
        return taskName;
    }
    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }
    public ThreadTask(String name) {
        this.setTaskName(name);
    }
    public void run() {
        //输出执行线程的名称
        System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName());
    }
}

我看下输出结果

线程118352462创建
线程1550089733创建
准备执行:Task0
准备执行:Task1
TaskNameTask0---ThreadName:threadPool118352462
线程865113938创建
执行完毕:Task0
TaskNameTask1---ThreadName:threadPool1550089733
执行完毕:Task1
准备执行:Task3
TaskNameTask3---ThreadName:threadPool1550089733
执行完毕:Task3
准备执行:Task2
准备执行:Task4
TaskNameTask4---ThreadName:threadPool1550089733
执行完毕:Task4
准备执行:Task5
TaskNameTask5---ThreadName:threadPool1550089733
执行完毕:Task5
准备执行:Task6
TaskNameTask6---ThreadName:threadPool1550089733
执行完毕:Task6
准备执行:Task8
TaskNameTask8---ThreadName:threadPool1550089733
执行完毕:Task8
准备执行:Task9
TaskNameTask9---ThreadName:threadPool1550089733
准备执行:Task7
执行完毕:Task9
TaskNameTask2---ThreadName:threadPool118352462
TaskNameTask7---ThreadName:threadPool865113938
执行完毕:Task7
执行完毕:Task2
线程池退出

可以看到通过对beforeExecute()、afterExecute()和terminated()的实现,我们对线程池中线程的运行状态进行了监控,在其执行前后输出了相关打印信息。另外使用shutdown方法可以比较安全的关闭线程池,当线程池调用该方法后,线程池中不再接受后续添加的任务。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。

五、线程池线程数量

线程吃线程数量的设置没有一个明确的指标,根据实际情况,只要不是设置的偏大和偏小都问题不大,结合下面这个公式即可

/**
             * Nthreads=CPU数量
             * Ucpu=目标CPU的使用率,0<=Ucpu<=1
             * W/C=任务等待时间与任务计算时间的比率
             */
            Nthreads = Ncpu*Ucpu*(1+W/C)

以上がJava スレッド プール ThreadPoolExecutor クラスの使用方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。