Home >Java >javaTutorial >Analysis of the creation process of Java thread pool
Recently, I have been improving the concurrency function of the project, but the development has been bumpy. After reading a lot of information, I finally deepened my understanding. So I planned to check the source code together and summarize the principles of concurrent programming.
Be prepared to start with the most used thread pool and understand the implementation principles of the entire life cycle of the thread pool around creation, execution, and shutdown. Later, we will study topics such as atomic variables, concurrent containers, blocking queues, synchronization tools, locks, etc. The concurrency tools in java.util.concurrent are not difficult to use, but you can’t just use them, we have to read the fucking source code, haha. By the way, the JDK I use is 1.8.
Executor is a thread pool management framework. There is only one method execute in the interface, which executes Runnable tasks. The ExecutorService interface extends Executor, adds thread life cycle management, and provides methods such as task termination and task result return. AbstractExecutorService implements ExecutorService and provides default implementation logic such as submit method.
Then today’s topic, ThreadPoolExecutor, inherits AbstractExecutorService and provides the specific implementation of the thread pool.
The following is the most common constructor of ThreadPoolExecutor, with up to seven parameters. I won’t post the specific code, just some statements for parameter verification and setting.
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { }
corePoolSize is the target size of the thread pool, which is the size when the thread pool has just been created and there are no tasks to be executed. maximumPoolSize is the maximum upper limit of the thread pool. keepAliveTime is the survival time of the thread. When the number of threads in the thread pool is greater than corePoolSize, idle threads that exceed the survival time will be recycled. Needless to say unit, the remaining three parameters will be analyzed later.
ThreadPoolExecutor presets some customized thread pools, created by the factory method in Executors. Let's analyze the creation parameters of newSingleThreadExecutor, newFixedThreadPool, and newCachedThreadPool.
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
The corePoolSize and maximumPoolSize of newFixedThreadPool are set to the incoming fixed number, and keepAliveTim is set to 0. After the thread pool is created, the number of threads will be fixed, which is suitable for situations where thread stability is required.
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
newSingleThreadExecutor is a newFixedThreadPool version with a fixed number of threads of 1, ensuring the serialization of tasks in the pool. Notice that FinalizableDelegatedExecutorService is returned. Let’s take a look at the source code:
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } }
FinalizableDelegatedExecutorService inherits DelegatedExecutorService and only adds the operation of closing the thread pool during gc. Let’s take a look at the source code of DelegatedExecutorService:
static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } //... }
The code is very simple. DelegatedExecutorService wraps ExecutorService so that it only exposes the methods of ExecutorService, so the parameters of the thread pool can no longer be configured. Originally, the parameters created by the thread pool can be adjusted, and ThreadPoolExecutor provides the set method. The purpose of using newSingleThreadExecutor is to generate a single-thread serial thread pool. It would be boring if the thread pool size could also be configured.
Executors also provides the unconfigurableExecutorService method, which wraps the ordinary thread pool into a non-configurable thread pool. If you don't want the thread pool to be modified by unknown future generations, you can call this method.
newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
newCachedThreadPool generates a cached thread pool. The number of threads can be from 0 to Integer.MAX_VALUE, and the timeout is 1 minute. . The effect of using the thread pool is: if there is an idle thread, the thread will be reused; if there is no idle thread, a new thread will be created; if the thread is idle for more than 1 minute, it will be recycled.
newScheduledThreadPool
newScheduledThreadPool will create a thread pool that can execute tasks regularly. This is not planned to be discussed in this article, and will be discussed in detail in a separate article later.
newCachedThreadPool的线程上限几乎等同于无限,但系统资源是有限的,任务的处理速度总有可能比不上任务的提交速度。因此,可以为ThreadPoolExecutor提供一个阻塞队列来保存因线程不足而等待的Runnable任务,这就是BlockingQueue。
JDK为BlockingQueue提供了几种实现方式,常用的有:
ArrayBlockingQueue:数组结构的阻塞队列
LinkedBlockingQueue:链表结构的阻塞队列
PriorityBlockingQueue:有优先级的阻塞队列
SynchronousQueue:不会存储元素的阻塞队列
newFixedThreadPool和newSingleThreadExecutor在默认情况下使用一个无界的LinkedBlockingQueue。要注意的是,如果任务一直提交,但线程池又不能及时处理,等待队列将会无限制地加长,系统资源总会有消耗殆尽的一刻。所以,推荐使用有界的等待队列,避免资源耗尽。但解决一个问题,又会带来新问题:队列填满之后,再来新任务,这个时候怎么办?后文会介绍如何处理队列饱和。
newCachedThreadPool使用的SynchronousQueue十分有趣,看名称是个队列,但它却不能存储元素。要将一个任务放进队列,必须有另一个线程去接收这个任务,一个进就有一个出,队列不会存储任何东西。因此,SynchronousQueue是一种移交机制,不能算是队列。newCachedThreadPool生成的是一个没有上限的线程池,理论上提交多少任务都可以,使用SynchronousQueue作为等待队列正合适。
当有界的等待队列满了之后,就需要用到饱和策略去处理,ThreadPoolExecutor的饱和策略通过传入RejectedExecutionHandler来实现。如果没有为构造函数传入,将会使用默认的defaultHandler。
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
AbortPolicy是默认的实现,直接抛出一个RejectedExecutionException异常,让调用者自己处理。除此之外,还有几种饱和策略,来看一下:
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
DiscardPolicy的rejectedExecution直接是空方法,什么也不干。如果队列满了,后续的任务都抛弃掉。
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
DiscardOldestPolicy会将等待队列里最旧的任务踢走,让新任务得以执行。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
最后一种饱和策略是CallerRunsPolicy,它既不抛弃新任务,也不抛弃旧任务,而是直接在当前线程运行这个任务。当前线程一般就是主线程啊,让主线程运行任务,说不定就阻塞了。如果不是想清楚了整套方案,还是少用这种策略为妙。
每当线程池需要创建一个新线程,都是通过线程工厂获取。如果不为ThreadPoolExecutor设定一个线程工厂,就会使用默认的defaultThreadFactory:
public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
平时打印线程池里线程的name时,会输出形如pool-1-thread-1之类的名称,就是在这里设置的。这个默认的线程工厂,创建的线程是普通的非守护线程,如果需要定制,实现ThreadFactory后传给ThreadPoolExecutor即可。
不看代码不总结不会知道,光是线程池的创建就可以引出很多学问。别看平时创建线程池是一句代码的事,其实ThreadPoolExecutor提供了很灵活的定制方法。
欢迎留言和转发,下一篇打算分析线程池如何执行任务。
以上就是Java 线程池的创建过程分析 的内容,更多相关内容请关注PHP中文网(www.php.cn)!