Home >Java >javaTutorial >How to use the Java thread pool ThreadPoolExecutor class
It is pointed out in the "Alibaba Java Development Manual" that thread resources must be provided through the thread pool, and thread creation is not allowed to be displayed in the application. On the one hand, the creation of threads is more standardized and the number of opened threads can be reasonably controlled. ; On the other hand, the detailed management of threads is handed over to the thread pool, which optimizes resource overhead. The thread pool is not allowed to be created using Executors, but through ThreadPoolExecutor. This is because although the Executor framework in jdk provides methods such as newFixedThreadPool(), newSingleThreadExecutor(), newCachedThreadPool(), etc. to create thread pools, they all have Its limitations are not flexible enough; in addition, since the previous methods are also implemented internally through ThreadPoolExecutor, using ThreadPoolExecutor can help you clarify the operating rules of the thread pool, create a thread pool that meets the needs of your own business scenarios, and avoid the risk of resource exhaustion. .
Below we will give a detailed overview of how to use ThreadPoolExecutor.
First look at the constructor of ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
The meaning of the parameters of the constructor is as follows:
corePoolSize: specifies the number of threads in the thread pool, and its number determines the number of threads added Whether the task is to open a new thread to execute or put it in the workQueue task queue;
maximumPoolSize: specifies the maximum number of threads in the thread pool. This parameter will be based on the type of workQueue task queue you use. Determines the maximum number of threads that the thread pool will open;
keepAliveTime: When the number of idle threads in the thread pool exceeds corePoolSize, how long will it take for the excess threads to be destroyed;
unit:keepAliveTime Unit
workQueue: task queue, a task that has been added to the thread pool but has not yet been executed; it is generally divided into direct submission queue, bounded task queue, unbounded task queue, and priority task queue;
threadFactory: Thread factory, used to create threads, generally the default is enough;
handler: rejection strategy; how to reject tasks when there are too many tasks to process;
Next we will have a further understanding of the more important parameters:
We have introduced it above, it is generally divided into direct submission queue and bounded task queue , Unbounded task queue, priority task queue;
1. Direct submission queue: Set to SynchronousQueue queue. SynchronousQueue is a special BlockingQueue. It has no capacity. It will block without executing an insertion operation and needs to execute another one. The deletion operation will be awakened, and conversely each deletion operation will also have to wait for the corresponding insertion operation.
public class ThreadPool { private static ExecutorService pool; public static void main( String[] args ) { //maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常 pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); for(int i=0;i<3;i++) { pool.execute(new ThreadTask()); } } } public class ThreadTask implements Runnable{ public ThreadTask() { public void run() { System.out.println(Thread.currentThread().getName());
The output result is
pool-1-thread-1
pool-1-thread-2
Exception in thread "main" java.util.concurrent .RejectedExecutionException: Task com.hhxx.test.ThreadTask@55f96302 rejected from java.util.concurrent.ThreadPoolExecutor@3d4eac69[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
at com.hhxx.test.ThreadPool.main(ThreadPool.java:17)
It can be seen that when the task queue is SynchronousQueue and the number of created threads is greater than maximumPoolSize, it is executed directly Deny policy throws exception.
Using the SynchronousQueue queue, submitted tasks will not be saved and will always be submitted for execution immediately. If the number of threads used to perform tasks is less than maximumPoolSize, try to create a new process. If the maximum value set by maximumPoolSize is reached, the rejection policy is executed according to the handler you set. Therefore, the tasks you submit in this way will not be cached, but will be executed immediately. In this case, you need to have an accurate assessment of the concurrency of your program in order to set the appropriate maximumPoolSize amount, otherwise it will be very difficult. It is easy to execute the rejection policy;
2. Bounded task queue: The bounded task queue can be implemented using ArrayBlockingQueue, as shown below
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
Use ArrayBlockingQueue bounded task queue, if there is When a new task needs to be executed, the thread pool will create a new thread. When the number of created threads reaches corePoolSize, the new task will be added to the waiting queue. If the waiting queue is full, that is, it exceeds the initial capacity of ArrayBlockingQueue, continue to create threads until the number of threads reaches the maximum number of threads set by maximumPoolSize. If it is greater than maximumPoolSize, the rejection policy will be executed. In this case, the upper limit of the number of threads is directly related to the status of the bounded task queue. If the initial capacity of the bounded queue is large or has not reached an overloaded state, the number of threads will always be maintained below corePoolSize. Otherwise, when the task queue When it is full, maximumPoolSize will be used as the upper limit of the maximum number of threads.
3. Unbounded task queue: Bounded task queue can be implemented using LinkedBlockingQueue, as shown below
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。
4、优先任务队列:优先任务队列通过PriorityBlockingQueue实现,下面我们通过一个例子演示下
public class ThreadPool { private static ExecutorService pool; public static void main( String[] args ) { //优先任务队列 pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); for(int i=0;i<20;i++) { pool.execute(new ThreadTask(i)); } } } public class ThreadTask implements Runnable,Comparable<ThreadTask>{ private int priority; public int getPriority() { return priority; public void setPriority(int priority) { this.priority = priority; public ThreadTask() { public ThreadTask(int priority) { //当前对象和其他对象做比较,当前优先级大就返回-1,优先级小就返回1,值越小优先级越高 public int compareTo(ThreadTask o) { return this.priority>o.priority?-1:1; public void run() { try { //让线程阻塞,使后续任务进入缓存队列 Thread.sleep(1000); System.out.println("priority:"+this.priority+",ThreadName:"+Thread.currentThread().getName()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }
我们来看下执行的结果情况
priority:0,ThreadName:pool-1-thread-1
priority:9,ThreadName:pool-1-thread-1
priority:8,ThreadName:pool-1-thread-1
priority:7,ThreadName:pool-1-thread-1
priority:6,ThreadName:pool-1-thread-1
priority:5,ThreadName:pool-1-thread-1
priority:4,ThreadName:pool-1-thread-1
priority:3,ThreadName:pool-1-thread-1
priority:2,ThreadName:pool-1-thread-1
priority:1,ThreadName:pool-1-thread-1
大家可以看到除了第一个任务直接创建线程执行外,其他的任务都被放入了优先任务队列,按优先级进行了重新排列执行,且线程池的线程数一直为corePoolSize,也就是只有一个。
通过运行的代码我们可以看出PriorityBlockingQueue它其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。
一般我们创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列,但种模式下如果出现任务队列已满且线程池创建的线程数达到你设置的最大线程数时,这时就需要你指定ThreadPoolExecutor的RejectedExecutionHandler参数即合理的拒绝策略,来处理线程池"超载"的情况。ThreadPoolExecutor自带的拒绝策略如下:
1、AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作;
2、CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行;
3、DiscardOledestPolicy策略:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交;
4、DiscardPolicy策略:该策略会默默丢弃无法处理的任务,不予任何处理。当然使用此策略,业务场景中需允许任务的丢失;
以上内置的策略均实现了RejectedExecutionHandler接口,当然你也可以自己扩展RejectedExecutionHandler接口,定义自己的拒绝策略,我们看下示例代码:
public class ThreadPool { private static ExecutorService pool; public static void main( String[] args ) { //自定义拒绝策略 pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(), new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString()+"执行了拒绝策略"); } }); for(int i=0;i<10;i++) { pool.execute(new ThreadTask()); } } } public class ThreadTask implements Runnable{ public void run() { try { //让线程阻塞,使后续任务进入缓存队列 Thread.sleep(1000); System.out.println("ThreadName:"+Thread.currentThread().getName()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }
输出结果:
com.hhxx.test.ThreadTask@33909752执行了拒绝策略
com.hhxx.test.ThreadTask@55f96302执行了拒绝策略
com.hhxx.test.ThreadTask@3d4eac69执行了拒绝策略
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
可以看到由于任务加了休眠阻塞,执行需要花费一定时间,导致会有一定的任务被丢弃,从而执行自定义的拒绝策略;
线程池中线程就是通过ThreadPoolExecutor中的ThreadFactory,线程工厂创建的。那么通过自定义ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,如命名、优先级等,下面代码我们通过ThreadFactory对线程池中创建的线程进行记录与命名
public class ThreadPool { private static ExecutorService pool; public static void main( String[] args ) { //自定义线程工厂 pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), new ThreadFactory() { public Thread newThread(Runnable r) { System.out.println("线程"+r.hashCode()+"创建"); //线程命名 Thread th = new Thread(r,"threadPool"+r.hashCode()); return th; } }, new ThreadPoolExecutor.CallerRunsPolicy()); for(int i=0;i<10;i++) { pool.execute(new ThreadTask()); } } } public class ThreadTask implements Runnable{ public void run() { //输出执行线程的名称 System.out.println("ThreadName:"+Thread.currentThread().getName());
我们看下输出结果
线程118352462创建
线程1550089733创建
线程865113938创建
ThreadName:threadPool1550089733
ThreadName:threadPool118352462
线程1442407170创建
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool865113938
ThreadName:threadPool865113938
ThreadName:threadPool118352462
ThreadName:threadPool1550089733
ThreadName:threadPool1442407170
可以看到线程池中,每个线程的创建我们都进行了记录输出与命名。
ThreadPoolExecutor扩展主要是围绕beforeExecute()、afterExecute()和terminated()三个接口实现的,
1、beforeExecute:线程池中任务运行前执行
2、afterExecute:线程池中任务运行完毕后执行
3、terminated:线程池退出后执行
通过这三个接口我们可以监控每个任务的开始和结束时间,或者其他一些功能。下面我们可以通过代码实现一下
public class ThreadPool { private static ExecutorService pool; public static void main( String[] args ) throws InterruptedException { //实现自定义接口 pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), new ThreadFactory() { public Thread newThread(Runnable r) { System.out.println("线程"+r.hashCode()+"创建"); //线程命名 Thread th = new Thread(r,"threadPool"+r.hashCode()); return th; } }, new ThreadPoolExecutor.CallerRunsPolicy()) { protected void beforeExecute(Thread t,Runnable r) { System.out.println("准备执行:"+ ((ThreadTask)r).getTaskName()); } protected void afterExecute(Runnable r,Throwable t) { System.out.println("执行完毕:"+((ThreadTask)r).getTaskName()); } protected void terminated() { System.out.println("线程池退出"); } }; for(int i=0;i<10;i++) { pool.execute(new ThreadTask("Task"+i)); } pool.shutdown(); } } public class ThreadTask implements Runnable{ private String taskName; public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public ThreadTask(String name) { this.setTaskName(name); } public void run() { //输出执行线程的名称 System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName()); } }
我看下输出结果
线程118352462创建
线程1550089733创建
准备执行:Task0
准备执行:Task1
TaskNameTask0---ThreadName:threadPool118352462
线程865113938创建
执行完毕:Task0
TaskNameTask1---ThreadName:threadPool1550089733
执行完毕:Task1
准备执行:Task3
TaskNameTask3---ThreadName:threadPool1550089733
执行完毕:Task3
准备执行:Task2
准备执行:Task4
TaskNameTask4---ThreadName:threadPool1550089733
执行完毕:Task4
准备执行:Task5
TaskNameTask5---ThreadName:threadPool1550089733
执行完毕:Task5
准备执行:Task6
TaskNameTask6---ThreadName:threadPool1550089733
执行完毕:Task6
准备执行:Task8
TaskNameTask8---ThreadName:threadPool1550089733
执行完毕:Task8
准备执行:Task9
TaskNameTask9---ThreadName:threadPool1550089733
准备执行:Task7
执行完毕:Task9
TaskNameTask2---ThreadName:threadPool118352462
TaskNameTask7---ThreadName:threadPool865113938
执行完毕:Task7
执行完毕:Task2
线程池退出
可以看到通过对beforeExecute()、afterExecute()和terminated()的实现,我们对线程池中线程的运行状态进行了监控,在其执行前后输出了相关打印信息。另外使用shutdown方法可以比较安全的关闭线程池,当线程池调用该方法后,线程池中不再接受后续添加的任务。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。
线程吃线程数量的设置没有一个明确的指标,根据实际情况,只要不是设置的偏大和偏小都问题不大,结合下面这个公式即可
/** * Nthreads=CPU数量 * Ucpu=目标CPU的使用率,0<=Ucpu<=1 * W/C=任务等待时间与任务计算时间的比率 */ Nthreads = Ncpu*Ucpu*(1+W/C)
The above is the detailed content of How to use the Java thread pool ThreadPoolExecutor class. For more information, please follow other related articles on the PHP Chinese website!