Home >Java >javaTutorial >ThreadPoolExecutor thread pool principle and its execute method
## For the thread pool, most people may You know how to use it and why you use it. It's just that tasks need to be executed asynchronously, and threads need to be managed uniformly. Regarding obtaining threads from the thread pool, most people may only know that if I need a thread to perform a task, then I will throw the task into the thread pool. If there are idle threads in the thread pool, they will be executed. If there are no idle threads, they will be executed. Just wait. In fact, the execution principle of the thread pool is far more than that simple. The thread pool class-ThreadPoolExecutor is provided in the Javaconcurrency package. In fact, more of us may use ExecutorsThe factory class is The thread pools we provide : newFixedThreadPool, newSingleThreadPool, newCachedThreadPool, these three thread pools are not ThreadPoolExecutor Subclasses of , regarding the relationship between these, let’s first look at ThreadPoolExecutor, check the source code and find that it has a total of 4 construction methods.
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)First, let’s start with these parameters to understand the execution principle of the thread pool
corePoolSize: the number of threads in the core thread pool maximumPoolSize: the maximum number of thread pool threads keepAliveTime: thread activity retention time, after the worker thread of the thread pool is idle, keep Survival time. Unit: The unit of thread activity retention time. WorkQueue: The blocking queue used by the specified task queue corePoolSizeand maximumPoolSize are both in the specified thread pool The number of threads. It seems that when using thread pools, you only need to pass a thread pool size parameter at most to create a thread pool. Java provides us with some commonly used thread pool classes. It is the newFixedThreadPool, newSingleThreadExecutor, newCachedThreadPool mentioned above. Of course, if we want to create a custom thread pool ourselves, we must "Configure" some parameters about the thread pool yourself.
When a task is handed over to the thread pool for processing, the execution principle of the thread pool is as shown in the figure below. Refer to "JavaThe Art of Concurrent Programming"
First, it will determine whether there are threads in the core thread pool that can be executed. If there are idle threads, a thread will be created to perform the task. .
②When there are no threads available for execution in the core thread pool, the task is thrown into the task queue. ③If the task queue (bounded) is also full, but the number of running threads is less than the maximum number of thread pools, a new thread will be created to execute the task, but if the running When the number of threads has reached the maximum number of thread pools, threads cannot be created to perform tasks. So in fact, the thread pool does not just simply throw the task into the thread pool. If there are threads in the thread pool, the task will be executed, and if there are no threads, it will wait. In order to consolidate the principles of thread pools, let’s now learn about the three commonly used thread pools mentioned above:Executors.newFixedThreadPool
: Create a thread pool with a fixed number of threads.
// Executors#newFixedThreadPoolpublic static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
可以看到newFixedThreadPool中调用的是ThreadPoolExecutor类,传递的参数corePoolSize= maximumPoolSize=nThread。回顾线程池的执行原理,当一个任务提交到线程池中,首先判断核心线程池里有没有空闲线程,有则创建线程,没有则将任务放到任务队列(这里是有界阻塞队列LinkedBlockingQueue)中,如果任务队列已经满了的话,对于newFixedThreadPool来说,它的最大线程池数量=核心线程池数量,此时任务队列也满了,将不能扩展创建新的线程来执行任务。
//Executors# newSingleThreadExecutorpublic static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegateExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
//Executors#newCachedThreadPoolpublic static ExecutorService newCachedThreadPool() { return new ThreadPooExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
可以看到newCachedThread返回的是ThreadPoolExecutor,其参数核心线程池corePoolSize = 0, maximumPoolSize = Integer.MAX_VALUE,这也就是说当任务被提交到newCachedThread线程池时,将会直接把任务放到SynchronousQueue任务队列中,maximumPool从任务队列中获取任务。注意SynchronousQueue是一个没有容量的队列,也就是说每个入队操作必须等待另一个线程的对应出队操作,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,newCachedThreadPool会不断创建线程,线程多并不是一件好事,严重会耗尽CPU和内存资源。
题外话:newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool,这三者都直接或间接调用了ThreadPoolExecutor,为什么它们三者没有直接是其子类,而是通过Executors来实例化呢?这是所采用的静态工厂方法,在java.util.Connections接口中同样也是采用的静态工厂方法来创建相关的类。这样有很多好处,静态工厂方法是用来产生对象的,产生什么对象没关系,只要返回原返回类型或原返回类型的子类型都可以,降低API数目和使用难度,在《Effective Java》中的第1条就是静态工厂方法。
//Executor#executepublic interface Executor {void execute(Runnable command); }
//ThreadPoolExecutor#executepublic void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); … }
private final AtomicInteger ctl = new AtlmicInteger(ctlOf(RUNNING, 0));
//ThreadPoolExecutorprivate static final int COUNT_BITS = Integer.SIZE – 3; //32-3=29,线程数量所占位数private static final int CAPACITY = (1 << COUNT_BITS) – 1; //低29位表示最大线程数,229-1//五种线程池状态private static final int RUNNING = -1 << COUNT_BITS; /int型变量高3位(含符号位)101表RUNINGprivate static final int SHUTDOWN = 0 << COUNT_BITS; //高3位000private static final int STOP = 1 << COUNT_BITS; //高3位001private static final int TIDYING = 2 << COUNT_BITS; //高3位010private static final int TERMINATED = 3 << COUNT_BITS; //高3位011
1 //ThreadPoolExecutor#execute 2 public void execute(Runnable command) { 3 if (command == null) 4 throw new NullPointerException(); 5 int c = ctl.get(); //由它可以获取到当前有效的线程数和线程池的状态 6 /*1.获取当前正在运行线程数是否小于核心线程池,是则新创建一个线程执行任务,否则将任务放到任务队列中*/ 7 if (workerCountOf(c) < corePoolSize){ 8 if (addWorker(command, tre)) //在addWorker中创建工作线程执行任务 9 return ;10 c = ctl.get();11 }12 /*2.当前核心线程池中全部线程都在运行workerCountOf(c) >= corePoolSize,所以此时将线程放到任务队列中*/13 if (isRunning(c) && workQueue.offer(command)) { //线程池是否处于运行状态,且是否任务插入任务队列成功14 int recheck = ctl.get();15 if (!isRunning(recheck) && remove(command)) //线程池是否处于运行状态,如果不是则使刚刚的任务出队16 reject(command); //抛出RejectedExceptionException异常17 else if (workerCountOf(recheck) == 0)18 addWorker(null, false);19 }20 /*3.插入队列不成功,且当前线程数数量小于最大线程池数量,此时则创建新线程执行任务,创建失败抛出异常*/21 else if (!addWorker(command, false)){22 reject(command); //抛出RejectedExceptionException异常23 }24 }
1 //ThreadPoolExecutor#addWorker 2 private boolean addWorker(Runnable firstTask, boolean core) { 3 /*首先会再次检查线程池是否处于运行状态,核心线程池中是否还有空闲线程,都满足条件过后则会调用compareAndIncrementWorkerCount先将正在运行的线程数+1,数量自增成功则跳出循环,自增失败则继续从头继续循环*/ 4 ... 5 if (compareAndIncrementWorkerCount(c)) 6 break retry; 7 ... 8 /*正在运行的线程数自增成功后则将线程封装成工作线程Worker*/ 9 boolean workerStarted = false;10 boolean workerAdded = false;11 Worker w = null;12 try {13 final ReentrantLock mainLock = this.mainLock; //全局锁14 w = new Woker(firstTask); //将线程封装为Worker工作线程15 final Thread t = w.thread;16 if (t != null) {17 mainLock.lock(); //获取全局锁18 /*当持有了全局锁的时候,还需要再次检查线程池的运行状态等*/19 try {20 int c = clt.get();21 int rs = runStateOf(c); //线程池运行状态22 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)){ //线程池处于运行状态,或者线程池关闭且任务线程为空23 if (t.isAlive()) //线程处于活跃状态,即线程已经开始执行或者还未死亡,正确的应线程在这里应该是还未开始执行的24 throw new IllegalThreadStateException();25 workers.add(w); //private final HashSet<Worker> wokers = new HashSet<Worker>();包含线程池中所有的工作线程,只有在获取了全局的时候才能访问它。将新构造的工作线程加入到工作线程集合中26 int s = worker.size(); //工作线程数量27 if (s > largestPoolSize)28 largestPoolSize = s;29 workerAdded = true; //新构造的工作线程加入成功30 }31 } finally {32 mainLock.unlock();33 }34 if (workerAdded) {35 t.start(); //在被构造为Worker工作线程,且被加入到工作线程集合中后,执行线程任务,注意这里的start实际上执行Worker中run方法,所以接下来分析Worker的run方法36 workerStarted = true;37 }38 }39 } finally {40 if (!workerStarted) //未能成功创建执行工作线程41 addWorkerFailed(w); //在启动工作线程失败后,将工作线程从集合中移除42 }43 return workerStarted;44 }
//ThreadPoolExecutor$Worker,它继承了AQS,同时实现了Runnable,所以它具备了这两者的所有特性private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; public Worker(Runnable firstTask) { setState(-1); //设置AQS的同步状态为-1,禁止中断,直到调用runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); //通过线程工厂来创建一个线程,将自身作为Runnable传递传递 } public void run() { runWorker(this); //运行工作线程 } }
The above is the detailed content of ThreadPoolExecutor thread pool principle and its execute method. For more information, please follow other related articles on the PHP Chinese website!