Home  >  Article  >  Java  >  Java concurrent thread pool: detailed explanation of ThreadPoolExecutor

Java concurrent thread pool: detailed explanation of ThreadPoolExecutor

php是最好的语言
php是最好的语言Original
2018-08-08 11:23:141871browse

Summary: The characteristic of the thread pool is that after the number of threads = corePoolSize, only when the task queue is full, a task will be taken out from the task queue, and then a new thread will be constructed, and the cycle will continue until the thread The number reaches maximumPoolSize and the rejection policy is executed.

Thread Pool-intsmaze

The idea of ​​the thread pool is to open up an area in the system to store some standby threads. This area is called a thread pool. If there is a task that needs to be executed, a thread on standby is borrowed from the thread pool to execute the specified task, and the borrowed thread can be returned when the task is completed. This avoids repeatedly creating a large number of thread objects and wasting CPU and memory resources.

Custom thread pool-intsmaze

If you observe the source code implementation of various thread pools provided by jdk, you can find that, except for the new thread pool newWorkStealingPool added by jdk8, they are all based on The encapsulation implementation of ThreadPoolExecutor, so first explain the specific functions of ThreadPoolExecutor.

ThreadPoolExecutor detailed explanation-intsmaze

ThreadPoolExecutor( corePoolSize,  maximumPoolSize,  keepAliveTime, TimeUnit unit, 
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

corePoolSize: Specify the number of threads in the thread pool

maximumPoolSize: Maximum thread Quantity

keepAliveTime: When the number of threads exceeds corePoolSize, the survival time of the idle thread exceeds (after this time, the idle thread will be destroyed).

unit: time unit of keepAliveTime

workQueue: task queue, tasks submitted but not executed

threadFactory: Thread factory for creating threads, default is enough

handler: Rejection policy, how to reject tasks when there are too many tasks to handle, the default is new AbortPolicy() strategy.

        ExecutorService es = new ThreadPoolExecutor(3, 8, 60L,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
                Executors.defaultThreadFactory(),                new RejectedExecutionHandler() {                    public void rejectedExecution(Runnable r,
                            ThreadPoolExecutor executor) {
                        System.out.println("discard");
                    }
                });

Task queue--storing runnable objects-intsmaze

Summary: The characteristic of the thread pool is that the number of threads = corePoolSize, only when the task queue is full, a task will be taken out from the task queue, and then a new thread will be constructed, and the cycle will continue until the number of threads reaches maximumPoolSize to execute the rejection policy.

As long as the queue implements the BlockingQueue interface, note that the top-level queue interface implemented by ConcurrentLinkedQueue cannot be used here.

Commonly used ones are as follows:

SynchronousQueue:Submit the queue directly. The queue has no capacity. Each insertion operation must wait for a corresponding delete operation. On the contrary, each deletion operation must wait for the corresponding insertion operation. Therefore, it does not save tasks and always submits tasks to threads for execution. If there are no idle threads, a new thread is created. When the number of threads reaches the maximum, a rejection policy is executed.

ArrayBlockingQueue: Bounded task queue. If the number of threads in the thread pool is less than corePoolSize, new threads will be created. If it is greater than corePoolSize, new tasks will be added to the waiting queue. If the waiting queue is full, a new thread will be created to execute the task if the total thread is not greater than maximumPoolSize. If it is greater than maximumPoolSize, the rejection policy will be executed.

LinkedBlockingQueue: Unbounded queue, unless the system resources are exhausted, there will be no failure to enqueue the task. If the number of threads in the thread pool is less than corePoolSize, a new thread will be created; if it is greater than corePoolSize, a new task will be added to the waiting queue.

PriortyBlockingQueue: The priority task queue can control the order of task execution and is an unbounded queue. ArrayBlockingQueue and LinkedBlockingQueue all process tasks according to the first-in-first-out algorithm. PriorityBlockingQueue can be executed sequentially according to the priority of the task itself.

Rejection Strategy-intsmaze

The threads in the thread pool are used up, and the tasks in the waiting queue are full. No more new tasks can be filled, so a rejection strategy is needed: Process the task What to do when the quantity exceeds the actual capacity of the system.

jdk has four built-in rejection policies:

AbortPolicy: throw an exception directly (default policy), even if the thread pool is free, subsequent threads cannot run Yes, if you want subsequent threads to run, you need to capture exception information.

CallerRunsPolicy: This policy runs the currently discarded task directly in the caller thread. Obviously doing this will not actually discard the task, but the performance of the task submission thread will most likely drop sharply.

DiscardOldestPolicy: The oldest request, which is a task that is about to be executed, will be discarded and try to submit the current task again.

DiscardPolicy: Silently discard unprocessable tasks without any processing. This is probably the best solution if tasks are allowed to be lost. When the thread pool is not idle, the submitted tasks will be discarded, and the submitted tasks will be executed when there are idle threads.

The following is jdk's rejection policy source code-intsmaze

   public static class CallerRunsPolicy implements RejectedExecutionHandler {        public CallerRunsPolicy() { }        /**
         * 直接在调用者线程中运行当前被丢弃的任务         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {            if (!e.isShutdown()) {
                r.run();
            }
        }
    }    public static class AbortPolicy implements RejectedExecutionHandler {        public AbortPolicy() { }        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }    public static class DiscardPolicy implements RejectedExecutionHandler {        public DiscardPolicy() { }        /**
         * Does nothing, which has the effect of discarding task r.         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }    public static class DiscardOldestPolicy implements RejectedExecutionHandler {        public DiscardOldestPolicy() { }        /**
         * 将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

总结:AbortPolicy策略下,我们要catch异常,这样我们可以捕获到哪些任务被丢弃了。如果采用其他的策略,丢弃的任务无法定位的,只能通过下列程序中es.submit(new MyTask(i));任务之前打印该任务,运行任务的run()逻辑是,在打印任务信息,两处日志比对来定位哪些任务被丢弃了。

public class MyTask implements Runnable
{    private int number;    
    public MyTask(int number) {        super();        this.number = number;
    }    public void run() {
        System.out.println(System.currentTimeMillis()+"thread id:"+Thread.currentThread().getId()+"==="+number);        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}    public static void main(String[] args)  {//        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS, 
//                new ArrayBlockingQueue<Runnable>(1), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());        
//        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,//                new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());        
//        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,//                new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());        
        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,                new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardOldestPolicy());        for(int i=0;i<10000;i++)
        {            try {
                System.out.println(i);
                es.submit(new MyTask(i));
                Thread.sleep(100);
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("------------------------"+i);
            }
        }
    }

线程池执行逻辑源码解析-intsmaze

      public Future<?> submit(Runnable task) {        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);        return ftask;
    }    
       /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null     */
    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn&#39;t, by returning false.
         *如果少于corePoolSize线程正在运行,首先尝试用给定的命令启动一个新的线程任务。 
         自动调用addWorker检查runState和workerCount,
         
         
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *如果任务可以成功排队,那么我们仍然需要
           仔细检查我们是否应该添加一个线程
          (因为现有的自从上次检查后死亡)或者那个
          自进入该方法以来,该池关闭。 所以我们
          重新检查状态,如果有必要的话回滚队列
          停止,或者如果没有的话就开始一个新的线程。
         
         
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.         */
        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;
            c = ctl.get();
        }        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))
                reject(command);//队列满了,执行拒绝策略
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }        else if (!addWorker(command, false))
            reject(command);
    }    
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);//这里就是调用我们传入的拒绝策略对象的方法    }    
     /**
     * Dispatch an uncaught exception to the handler. This method is
     * intended to be called only by the JVM.     */
    private void dispatchUncaughtException(Throwable e) {
        getUncaughtExceptionHandler().uncaughtException(this, e);
    }

jdk的线程池实现类-intsmaze

newFixedThreadPoo-intsmaze

任务队列为LinkedBlockingQueue中(长度无限),线程数量和最大线程数量相同。功能参考前面的任务队列总结。

ExecutorService es=Executors.newFixedThreadPool(5);//参数同时指定线程池中线程数量为5,最大线程数量为5public static ExecutorService newFixedThreadPool(int nThreads) {    return new ThreadPoolExecutor(nThreads, nThreads,                                  0L, TimeUnit.MILLISECONDS,                                  new LinkedBlockingQueue<Runnable>());
}

newSingleThreadExecutor-intsmaze

任务队列LinkedBlockingQueue中(长度无限),线程数量和最大线程数量均为1。

ExecutorService es=Executors.newSingleThreadExecutor();//线程池中线程数量和最大线程数量均为1.public static ExecutorService newSingleThreadExecutor() {    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,                                0L, TimeUnit.MILLISECONDS,                                new LinkedBlockingQueue<Runnable>()));
}

newCachedThreadPool-intsmaze

任务队列为SynchronousQueue,线程数量为0,最大线程数量为Integer.MAX_VALUE,所以只要有任务没有空闲线程就会创建就新线程。

ExecutorService es=Executors.newCachedThreadPool();//指定线程池中线程数量为0,最大线程数量为Integer.MAX_VALUE,任务队列为SynchronousQueuepublic static ExecutorService newCachedThreadPool() {    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                  60L, TimeUnit.SECONDS,                                  new SynchronousQueue<Runnable>());
}

newScheduledThreadPool- -定时线程-intsmaze

任务队列为new DelayedWorkQueue(),返回的对象在ExecutorService接口上扩展了在指定时间执行某认为的功能,在某个固定的延时之后执行或周期性执行某个任务。 

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {        return new ScheduledThreadPoolExecutor(corePoolSize);
}public ScheduledThreadPoolExecutor(int corePoolSize) {        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              new DelayedWorkQueue());
}public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

newSingleThreadScheduledExecutor- -定时线程-intsmaze

相当于newScheduledThreadPool(int corePoolSize)corePoolSize设置为1。

ScheduledExecutorService es=Executors.newSingleThreadScheduledExecutor();

延迟线程池

class MyScheduledTask implements Runnable
{ private String tname; public MyScheduledTask(String tname)
 {  this.tname=tname;
 } public void run()
 {
  System.out.println(tname+"任务时延2秒执行!!!");
 }
}public class intsmaze
{ public static void main(String[] args)
 {  ScheduledExecutorService scheduledThreadPool                       =Executors.newScheduledThreadPool(2);
  MyScheduledTask mt1=new MyScheduledTask("MT1");
  scheduledThreadPool.schedule(mt1,2,TimeUnit.SECONDS);
 }
}

 newWorkStealingPool java8新增连接池-intsmaze

    public static ExecutorService newWorkStealingPool(int parallelism) {        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,             null, true);
    }//创建指定数量的线程池来执行给定的并行级别,还会使用多个队列减少竞争
    public static ExecutorService newWorkStealingPool() {        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,             null, true);
    }//前一个方法的简化,如果当前机器有4个CPU,则目标的并行级别被设置为4。

关闭线程池(很少使用,除了切换数据源时需要控制)-intsmaze

希望程序执行完所有任务后退出,调用ExecutorService接口中的shutdown(),shutdownNow()方法。

用完一个线程池后,应该调用该线程池的shutdown方法,将启动线程池的关闭序列。调用shutdown方法后,线程池不在接收新的任务,但是会将以前所有已经提交的任务执行完。当线程池中的所有任务都执行完后,线程池中的所有线程都会死亡;shutdownNow方法会试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

线程池优化-intsmaze

一般来说确定线程池的大小需要考虑CPU数量,内存大小,JDBC连接等因素。在《java并发编程实践》一书中给出了一个估算线程池大小的经验公式:

Ncpu=CPU的数量

Ucpu=目标CPU的使用率,0<=Ucpu<=1

W/C=等待时间与计算时间的比率

为保持处理器达到期望的使用率,最优的线程池的大小等于:

Nthreads=Ncpu*Ucpu*(1+W/C)

在java中,可以通过

Runtime.getRuntime().availableProcessors()

取得可以CPU数量。

相关推荐:

Java中线程池的图文代码详解

ThreadPoolExecutor线程池之submit方法

JAVA中ThreadPoolExecutor线程池的submit方法详解

The above is the detailed content of Java concurrent thread pool: detailed explanation of ThreadPoolExecutor. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn