Java Thread Pool
I have recently been improving the concurrency function of the project, but the development has been bumpy. After reading a lot of information, I finally deepened my understanding. So I planned to check the source code together and summarize the principles of concurrent programming.
Be prepared to start with the most used thread pool and understand the implementation principles of the entire life cycle of the thread pool around creation, execution, and shutdown. Later, we will study topics such as atomic variables, concurrent containers, blocking queues, synchronization tools, locks, etc. The concurrency tools in java.util.concurrent are not difficult to use, but you can’t just use them, we have to read the fucking source code, haha. By the way, the JDK I use is 1.8.
Executor framework
Executor is a thread pool management framework. There is only one method execute in the interface, which executes Runnable tasks. The ExecutorService interface extends Executor, adds thread life cycle management, and provides methods such as task termination and task result return. AbstractExecutorService implements ExecutorService and provides default implementation logic such as submit method.
Then today’s topic, ThreadPoolExecutor, inherits AbstractExecutorService and provides the specific implementation of the thread pool.
Construction method
The following is the most common constructor of ThreadPoolExecutor, with up to seven parameters. I won’t post the specific code, just some statements for parameter verification and setting.
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { }
corePoolSize is the target size of the thread pool, which is the size when the thread pool has just been created and there are no tasks to be executed. maximumPoolSize is the maximum upper limit of the thread pool. keepAliveTime is the survival time of the thread. When the number of threads in the thread pool is greater than corePoolSize, idle threads that exceed the survival time will be recycled. Needless to say unit, the remaining three parameters will be analyzed later.
Default customized thread pool
ThreadPoolExecutor presets some customized thread pools, created by the factory method in Executors. Let's analyze the creation parameters of newSingleThreadExecutor, newFixedThreadPool, and newCachedThreadPool.
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
The corePoolSize and maximumPoolSize of newFixedThreadPool are both set to the fixed number passed in, and keepAliveTim is set to 0. After the thread pool is created, the number of threads will be fixed, which is suitable for situations where thread stability is required.
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
newSingleThreadExecutor is a version of newFixedThreadPool with a fixed number of threads of 1, ensuring the serialization of tasks in the pool. Notice that FinalizableDelegatedExecutorService is returned. Let’s take a look at the source code:
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } }
FinalizableDelegatedExecutorService inherits DelegatedExecutorService and only adds the operation of closing the thread pool during gc. Let’s take a look at the source code of DelegatedExecutorService:
static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } //... }
Code Very simply, DelegatedExecutorService wraps ExecutorService so that it only exposes the methods of ExecutorService, so the parameters of the thread pool can no longer be configured. Originally, the parameters created by the thread pool can be adjusted, and ThreadPoolExecutor provides the set method. The purpose of using newSingleThreadExecutor is to generate a single-thread serial thread pool. It would be boring if the thread pool size could also be configured.
Executors also provides the unconfigurableExecutorService method, which wraps the ordinary thread pool into a non-configurable thread pool. If you don't want the thread pool to be modified by unknown future generations, you can call this method.
newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
newCachedThreadPool generates a cached thread pool. The number of threads can range from 0 to Integer.MAX_VALUE, and the timeout is 1 minute. The effect of using the thread pool is: if there is an idle thread, the thread will be reused; if there is no idle thread, a new thread will be created; if the thread is idle for more than 1 minute, it will be recycled.
newScheduledThreadPool
newScheduledThreadPool will create a thread pool that can execute tasks regularly. This is not planned to be discussed in this article, and will be discussed in detail in a separate article later.
Waiting Queue
The thread limit of newCachedThreadPool is almost equal to unlimited, but system resources are limited, and the processing speed of the task may not be as fast as the submission speed of the task. Therefore, a blocking queue can be provided for ThreadPoolExecutor to save Runnable tasks waiting due to insufficient threads. This is BlockingQueue.
JDK provides several implementation methods for BlockingQueue. Commonly used ones are:
ArrayBlockingQueue: blocking queue of array structure
LinkedBlockingQueue: blocking queue of linked list structure
PriorityBlockingQueue: Prioritized blocking queue
SynchronousQueue: Blocking queue that does not store elements
newFixedThreadPool and newSingleThreadExecutor use an unbounded LinkedBlockingQueue by default. It should be noted that if tasks are submitted all the time, but the thread pool cannot process them in time, the waiting queue will lengthen indefinitely, and there will always be a moment when system resources are exhausted. Therefore, it is recommended to use a bounded waiting queue to avoid resource exhaustion. But solving one problem will bring about new problems: after the queue is filled, and new tasks come, what should we do at this time? How to deal with queue saturation will be introduced later.
newCachedThreadPool使用的SynchronousQueue十分有趣,看名称是个队列,但它却不能存储元素。要将一个任务放进队列,必须有另一个线程去接收这个任务,一个进就有一个出,队列不会存储任何东西。因此,SynchronousQueue是一种移交机制,不能算是队列。newCachedThreadPool生成的是一个没有上限的线程池,理论上提交多少任务都可以,使用SynchronousQueue作为等待队列正合适。
饱和策略
当有界的等待队列满了之后,就需要用到饱和策略去处理,ThreadPoolExecutor的饱和策略通过传入RejectedExecutionHandler来实现。如果没有为构造函数传入,将会使用默认的defaultHandler。
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
AbortPolicy是默认的实现,直接抛出一个RejectedExecutionException异常,让调用者自己处理。除此之外,还有几种饱和策略,来看一下:
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
DiscardPolicy的rejectedExecution直接是空方法,什么也不干。如果队列满了,后续的任务都抛弃掉。
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
DiscardOldestPolicy会将等待队列里最旧的任务踢走,让新任务得以执行。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
最后一种饱和策略是CallerRunsPolicy,它既不抛弃新任务,也不抛弃旧任务,而是直接在当前线程运行这个任务。当前线程一般就是主线程啊,让主线程运行任务,说不定就阻塞了。如果不是想清楚了整套方案,还是少用这种策略为妙。
ThreadFactory
每当线程池需要创建一个新线程,都是通过线程工厂获取。如果不为ThreadPoolExecutor设定一个线程工厂,就会使用默认的defaultThreadFactory:
public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); }
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
平时打印线程池里线程的name时,会输出形如pool-1-thread-1之类的名称,就是在这里设置的。这个默认的线程工厂,创建的线程是普通的非守护线程,如果需要定制,实现ThreadFactory后传给ThreadPoolExecutor即可。
不看代码不总结不会知道,光是线程池的创建就可以引出很多学问。别看平时创建线程池是一句代码的事,其实ThreadPoolExecutor提供了很灵活的定制方法。
感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!
更多Java 线程池详解及创建简单实例相关文章请关注PHP中文网!