ホームページ  >  記事  >  Java  >  Java 同時スレッド プール: ThreadPoolExecutor の詳細な説明

Java 同時スレッド プール: ThreadPoolExecutor の詳細な説明

php是最好的语言
php是最好的语言オリジナル
2018-08-08 11:23:141855ブラウズ

まとめ: スレッドプールの特徴は、スレッド数 = corePoolSize を超えた後、タスクキューがいっぱいになった場合にのみタスクキューからタスクを取り出し、その後新しいスレッドを構築し、このサイクルは、スレッド数が最大拒否戦略に達するまで継続されます。

スレッド プール-intsmaze

スレッド プールのアイデアは、いくつかのスタンバイ スレッドを保存するためにシステム内に領域を開くことです。この領域はスレッド プールと呼ばれます。実行が必要なタスクがある場合、スレッドプールから待機中のスレッドを借りて指定されたタスクを実行し、タスクの完了時に借りたスレッドを返却することができます。これにより、大量のスレッド オブジェクトを繰り返し作成したり、CPU リソースやメモリ リソースを浪費したりすることが回避されます。

カスタム スレッド プール-intsmaze

jdk が提供するさまざまなスレッド プールのソース コード実装を観察すると、jdk8 によって追加された新しいスレッド プール newWorkStealingPool を除いて、それらはすべて ThreadPoolExecutor のカプセル化実装に基づいていることがわかります。 , そのため、最初に ThreadPoolExecutor 固有の関数について説明します。

ThreadPoolExecutorの詳細説明 - intsmaze

ThreadPoolExecutor( corePoolSize,  maximumPoolSize,  keepAliveTime, TimeUnit unit, 
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

corePoolSize: スレッドプール内のスレッド数を指定します

maximumPoolSize: スレッドの最大数

keepAliveTime: スレッド数がcorePoolSizeを超えた場合の生存時間さらに多くのアイドル スレッド (これ以上) しばらくすると、アイドル スレッドは破棄されます)。

unit: keepAliveTime の時間単位

workQueue: タスクキュー、送信されたが実行されなかったタスク

threadFactory: スレッドを作成するためのスレッドファクトリー、デフォルトで十分です

handler: 処理すべきタスクが多すぎる場合の拒否ポリシー、デフォルトで新しい AbortPolicy() ポリシーを使用してタスクを拒否する方法。

        ExecutorService es = new ThreadPoolExecutor(3, 8, 60L,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
                Executors.defaultThreadFactory(),                new RejectedExecutionHandler() {                    public void rejectedExecution(Runnable r,
                            ThreadPoolExecutor executor) {
                        System.out.println("discard");
                    }
                });

タスクキュー--実行可能なオブジェクトの保存-intsmaze

概要: スレッドプールの特徴は、スレッド数 = corePoolSize を超えた後、タスクキューがいっぱいになった場合にのみスレッドが取り出されるということです。タスク キュー Task から新しいスレッドを構築し、スレッド数が maxPoolSize に達するまで繰り返し、拒否ポリシーを実行します。

キューが BlockingQueue インターフェースを実装している限り、ConcurrentLinkedQueue によって実装されたトップレベルのキュー インターフェースはここでは使用できないことに注意してください。

一般的に使用されるものは次のとおりです:

SynchronousQueue: キューを直接送信します。逆に、各挿入操作は対応する削除操作を待つ必要があります。挿入操作。したがって、タスクは保存されず、アイドル状態のスレッドがない場合は、常にタスクをスレッドに送信し、スレッドの数が最大に達すると、拒否ポリシーが実行されます。

ArrayBlockingQueue: 境界付きタスクキュー。スレッドプール内のスレッド数が corePoolSize より小さい場合、新しいスレッドが作成されます。corePoolSize より大きい場合、新しいタスクが待機キューに追加されます。待機キューがいっぱいの場合、スレッドの合計が maximumPoolSize を超えない場合は、タスクを実行するために新しいスレッドが作成され、maximumPoolSize より大きい場合は、拒否ポリシーが実行されます。

LinkedBlockingQueue: 無制限のキュー。システムリソースが使い果たされない限り、タスクのエンキューに失敗することはありません。スレッド プール内のスレッドの数が corePoolSize より小さい場合は、新しいスレッドが作成され、corePoolSize より大きい場合は、新しいタスクが待機キューに追加されます。

PriortyBlockingQueue: タスクの実行順序を制御できる優先タスクキューは、無制限のキューです。 ArrayBlockingQueue と LinkedBlockingQueue は両方とも、先入れ先出しアルゴリズムに従ってタスクを処理します。PriorityBlockingQueue は、タスク自体の優先度に従って順番に実行できます。

拒否戦略-intsmaze

スレッド プール内のスレッドが使い果たされ、待機キュー内のタスクがいっぱいになった場合、これ以上新しいタスクを埋めることができないため、拒否戦略が必要になります。処理タスクの数が超過した場合。システムの実際の容量、処理方法。

jdk には 4 つの組み込み拒否ポリシーがあります。

AbortPolicy: スレッド プールがアイドル状態であっても、後続のスレッドを実行できない場合は、キャプチャする必要があります。例外情報です。


CallerRunsPolicy: このポリシーは、現在破棄されているタスクを呼び出し側スレッドで直接実行します。明らかに、これを実行しても実際にタスクが破棄されるわけではありませんが、タスク送信スレッドのパフォーマンスは大幅に低下する可能性が高くなります。


DiscardOldestPolicy: 実行されようとしているタスクである最も古いリクエストを破棄し、現在のタスクを再度送信しようとします。


DiscardPolicy: 何も処理せずに処理できないタスクをサイレントに破棄します。タスクが失われることが許容される場合、これがおそらく最良の解決策です。スレッド プールがアイドル状態ではない場合、送信されたタスクは破棄され、アイドル状態のスレッドがあるときに送信されたタスクが実行されます。


以下はJDKの拒否ポリシーのソースコードです-intsmaze

   public static class CallerRunsPolicy implements RejectedExecutionHandler {        public CallerRunsPolicy() { }        /**
         * 直接在调用者线程中运行当前被丢弃的任务         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {            if (!e.isShutdown()) {
                r.run();
            }
        }
    }    public static class AbortPolicy implements RejectedExecutionHandler {        public AbortPolicy() { }        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }    public static class DiscardPolicy implements RejectedExecutionHandler {        public DiscardPolicy() { }        /**
         * Does nothing, which has the effect of discarding task r.         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }    public static class DiscardOldestPolicy implements RejectedExecutionHandler {        public DiscardOldestPolicy() { }        /**
         * 将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

总结:AbortPolicy策略下,我们要catch异常,这样我们可以捕获到哪些任务被丢弃了。如果采用其他的策略,丢弃的任务无法定位的,只能通过下列程序中es.submit(new MyTask(i));任务之前打印该任务,运行任务的run()逻辑是,在打印任务信息,两处日志比对来定位哪些任务被丢弃了。

public class MyTask implements Runnable
{    private int number;    
    public MyTask(int number) {        super();        this.number = number;
    }    public void run() {
        System.out.println(System.currentTimeMillis()+"thread id:"+Thread.currentThread().getId()+"==="+number);        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}    public static void main(String[] args)  {//        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS, 
//                new ArrayBlockingQueue<Runnable>(1), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());        
//        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,//                new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());        
//        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,//                new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());        
        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,                new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardOldestPolicy());        for(int i=0;i<10000;i++)
        {            try {
                System.out.println(i);
                es.submit(new MyTask(i));
                Thread.sleep(100);
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("------------------------"+i);
            }
        }
    }

线程池执行逻辑源码解析-intsmaze

      public Future<?> submit(Runnable task) {        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);        return ftask;
    }    
       /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null     */
    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn&#39;t, by returning false.
         *如果少于corePoolSize线程正在运行,首先尝试用给定的命令启动一个新的线程任务。 
         自动调用addWorker检查runState和workerCount,
         
         
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *如果任务可以成功排队,那么我们仍然需要
           仔细检查我们是否应该添加一个线程
          (因为现有的自从上次检查后死亡)或者那个
          自进入该方法以来,该池关闭。 所以我们
          重新检查状态,如果有必要的话回滚队列
          停止,或者如果没有的话就开始一个新的线程。
         
         
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.         */
        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;
            c = ctl.get();
        }        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))
                reject(command);//队列满了,执行拒绝策略
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }        else if (!addWorker(command, false))
            reject(command);
    }    
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);//这里就是调用我们传入的拒绝策略对象的方法    }    
     /**
     * Dispatch an uncaught exception to the handler. This method is
     * intended to be called only by the JVM.     */
    private void dispatchUncaughtException(Throwable e) {
        getUncaughtExceptionHandler().uncaughtException(this, e);
    }

jdk的线程池实现类-intsmaze

newFixedThreadPoo-intsmaze

任务队列为LinkedBlockingQueue中(长度无限),线程数量和最大线程数量相同。功能参考前面的任务队列总结。

ExecutorService es=Executors.newFixedThreadPool(5);//参数同时指定线程池中线程数量为5,最大线程数量为5public static ExecutorService newFixedThreadPool(int nThreads) {    return new ThreadPoolExecutor(nThreads, nThreads,                                  0L, TimeUnit.MILLISECONDS,                                  new LinkedBlockingQueue<Runnable>());
}

newSingleThreadExecutor-intsmaze

任务队列LinkedBlockingQueue中(长度无限),线程数量和最大线程数量均为1。

ExecutorService es=Executors.newSingleThreadExecutor();//线程池中线程数量和最大线程数量均为1.public static ExecutorService newSingleThreadExecutor() {    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,                                0L, TimeUnit.MILLISECONDS,                                new LinkedBlockingQueue<Runnable>()));
}

newCachedThreadPool-intsmaze

任务队列为SynchronousQueue,线程数量为0,最大线程数量为Integer.MAX_VALUE,所以只要有任务没有空闲线程就会创建就新线程。

ExecutorService es=Executors.newCachedThreadPool();//指定线程池中线程数量为0,最大线程数量为Integer.MAX_VALUE,任务队列为SynchronousQueuepublic static ExecutorService newCachedThreadPool() {    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                  60L, TimeUnit.SECONDS,                                  new SynchronousQueue<Runnable>());
}

newScheduledThreadPool- -定时线程-intsmaze

任务队列为new DelayedWorkQueue(),返回的对象在ExecutorService接口上扩展了在指定时间执行某认为的功能,在某个固定的延时之后执行或周期性执行某个任务。 

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {        return new ScheduledThreadPoolExecutor(corePoolSize);
}public ScheduledThreadPoolExecutor(int corePoolSize) {        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              new DelayedWorkQueue());
}public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

newSingleThreadScheduledExecutor- -定时线程-intsmaze

相当于newScheduledThreadPool(int corePoolSize)corePoolSize设置为1。

ScheduledExecutorService es=Executors.newSingleThreadScheduledExecutor();

延迟线程池

class MyScheduledTask implements Runnable
{ private String tname; public MyScheduledTask(String tname)
 {  this.tname=tname;
 } public void run()
 {
  System.out.println(tname+"任务时延2秒执行!!!");
 }
}public class intsmaze
{ public static void main(String[] args)
 {  ScheduledExecutorService scheduledThreadPool                       =Executors.newScheduledThreadPool(2);
  MyScheduledTask mt1=new MyScheduledTask("MT1");
  scheduledThreadPool.schedule(mt1,2,TimeUnit.SECONDS);
 }
}

 newWorkStealingPool java8新增连接池-intsmaze

    public static ExecutorService newWorkStealingPool(int parallelism) {        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,             null, true);
    }//创建指定数量的线程池来执行给定的并行级别,还会使用多个队列减少竞争
    public static ExecutorService newWorkStealingPool() {        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,             null, true);
    }//前一个方法的简化,如果当前机器有4个CPU,则目标的并行级别被设置为4。

关闭线程池(很少使用,除了切换数据源时需要控制)-intsmaze

希望程序执行完所有任务后退出,调用ExecutorService接口中的shutdown(),shutdownNow()方法。

用完一个线程池后,应该调用该线程池的shutdown方法,将启动线程池的关闭序列。调用shutdown方法后,线程池不在接收新的任务,但是会将以前所有已经提交的任务执行完。当线程池中的所有任务都执行完后,线程池中的所有线程都会死亡;shutdownNow方法会试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

线程池优化-intsmaze

一般来说确定线程池的大小需要考虑CPU数量,内存大小,JDBC连接等因素。在《java并发编程实践》一书中给出了一个估算线程池大小的经验公式:

Ncpu=CPU的数量

Ucpu=目标CPU的使用率,0<=Ucpu<=1

W/C=等待时间与计算时间的比率

为保持处理器达到期望的使用率,最优的线程池的大小等于:

Nthreads=Ncpu*Ucpu*(1+W/C)

在java中,可以通过

Runtime.getRuntime().availableProcessors()

取得可以CPU数量。

相关推荐:

Java中线程池的图文代码详解

ThreadPoolExecutor线程池之submit方法

JAVA中ThreadPoolExecutor线程池的submit方法详解

以上がJava 同時スレッド プール: ThreadPoolExecutor の詳細な説明の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。