A very important function in multi-threaded programming is to execute tasks, and there are many ways to execute tasks. , why do we need to use a thread pool? Below we use the function of Socket programming to process requests and analyze each method of performing tasks.
1.1 Serial execution of tasks
When the Socket monitors that the client has a connection, each client connection is processed sequentially through the handleSocket method. When the processing is completed, continue to listen. The code is as follows:
ServerSocket serverSocket = new ServerSocket(); SocketAddress endpoint = new InetSocketAddress(host, port); serverSocket.bind(endpoint,1023); while (!isStop) { Socket socket = serverSocket.accept(); handleSocket(socket); }
The shortcomings of this method are very obvious: When I have multiple client requests, while the server is processing one request, other requests need to wait for the previous request to be processed. . This is almost unavailable in high concurrency situations.
1.2 Create a thread for each task
Optimize the above problem: create a thread for each client request to process the request, the main thread only needs to create the thread, and then you can Continue to support client requests. The flow chart is as follows:
The code is as follows:
ServerSocket serverSocket = new ServerSocket(); SocketAddress endpoint = new InetSocketAddress(host, port); serverSocket.bind(endpoint,1023); while (!isStop) { Socket socket = serverSocket.accept(); new SocketHandler(socket, THREAD_NAME_PREFIX + threadIndex++).start(); }
This method has the following advantages:
1. Separate the operation of processing client connections from the main thread so that the main loop can respond to the next request faster.
2. The operations of processing client connections are parallel, which improves the throughput of the program.
However, this method has the following disadvantages:
1. The thread processing the request must be thread-safe
2. The creation and destruction of threads require overhead , when a large number of threads are created, a large amount of computer resources will be consumed
3. When the number of available CPUs is less than the number of runnable threads, the extra threads will occupy memory resources and bring problems to garbage collection. Pressure, and there will be a lot of performance overhead when a large number of threads compete for CPU resources
4. There is an upper limit on the number of threads that can be created in the JVM. This upper limit varies with different platforms and is subject to many factors. Limitations of several factors, including JVM startup parameters, memory size occupied by each thread, etc. If these limits are exceeded, an OOM exception will be thrown.
1.3 Use the thread pool to handle client requests
For the problems that occurred in 1.2, the best solution is to use the thread pool to execute tasks, which can limit the total number of threads created. This avoids the problems in 1.2. The flow chart is as follows:
The processing method is as follows:
ServerSocket serverSocket = new ServerSocket(); SocketAddress endpoint = new InetSocketAddress(host, port); serverSocket.bind(endpoint,1023); while (!isStop) { Socket socket = serverSocket.accept(); executorService.execute(new SocketHandler(socket, THREAD_NAME_PREFIX + threadIndex++)); }
This method has the following advantages:
1. Separate task submission and task execution Open
2. The thread that executes the task can be reused, which reduces the overhead of thread creation and destruction. At the same time, when the task arrives, the created thread can be directly used to execute the task, which also improves the response speed of the program.
The implementation of the thread pool in java is based on the producer-consumer model. The function of the thread pool combines the submission of tasks and the completion of tasks Execution separation, the process of task submission is the producer, and the process of executing the task is the consumer process. For specific analysis, see source code analysis. The top-level interface of the java thread pool is Executor, and the source code is as follows:
public interface Executor { void execute(Runnable command); }
This interface is the top-level interface implemented by all thread pools. It stipulates that the acceptable task type is the Runnable implementation class, but the specific execution logic of the task It is defined by the thread pool implementation class itself. For example:
You can use the main thread to execute tasks serially.
You can also create a new thread for each task
or create a group of threads in advance. Each time a task is executed, it is taken from a group of threads, etc.
The execution strategy of the thread pool mainly has the following aspects:
1. In what thread to execute Task
2. In what order should tasks be executed (FIFO, LIFO, priority?)
3. How many tasks can be executed concurrently
4. At most, how many tasks can be waiting for execution in the queue
5. How to reject newly submitted tasks when the waiting queue reaches the maximum value?
6. What operations need to be done before or after executing a task?
Different execution strategies should be selected according to specific business. The Executors tool class is provided in the Java class library to use the default strategy thread pool. There are mainly the following interfaces:
public static ExecutorService newFixedThreadPool(int nThreads) 将会创建一个固定大小的线程池,每当有新任务提交的时候,当线程总数没有达到核心线程数的时候,为每个任务创建一个新线程,当线程的个数到达最大值后,重用之前创建的线程,当线程因为未知异常而停止时候,将会重现创建一个线程作为补充。 public static ExecutorService newCachedThreadPool() 根据需求创建线程的个数,当线程数大于任务数的时候,将会注销多余的线程 public static ExecutorService newSingleThreadExecutor() 创建一个单线程的线程池 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 创建一个可执行定时任务的线程池
In the above example, the execution status of all submitted tasks is invisible after being submitted to the thread pool, that is, the main thread cannot know whether the submitted tasks have ended or not. Results of the. To address this problem, Java provides task interfaces Future and Callable interfaces that can return data.
The Callable interface provides the function of returning data from the task and throwing exceptions, and is defined as follows:
public interface Callable<V> { V call() throws Exception; }
All submit methods in ExecutorService will return a Future object, and its interface is defined as follows:
public interface Future<V> { 取消任务执行,当mayInterruptIfRunning为true,interruptedthisthread boolean cancel(boolean mayInterruptIfRunning); 返回此任务是否在执行完毕之前被取消执行 boolean isCancelled(); 返回此任务是否已经完成,包括正常结束,异常结束以及被cancel boolean isDone(); 返回执行结果,当任务没有执行结束的时候,等待 V get() throws InterruptedException, ExecutionException; }
1.线程饥饿死锁
在单线程的Executor中,如果Executor中执行的一个任务中,再次提交任务到同一个Executor中,并且等待这个任务执行完毕,那么就会发生死锁问题。如下demo中所示:
public class ThreadDeadLock { private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor(); public static void main(String[] args) throws Exception { System.out.println("Main Thread start."); EXECUTOR_SERVICE.submit(new DeadLockThread()); System.out.println("Main Thread finished."); } private static class DeadLockThread extends Thread{ @Override public void run() { try { System.out.println("DeadLockThread start."); Future future = EXECUTOR_SERVICE.submit(new DeadLockThread2()); future.get(); System.out.println("DeadLockThread finished."); } catch (Exception e) { } } } private static class DeadLockThread2 extends Thread { @Override public void run() { try { System.out.println("DeadLockThread2 start."); Thread.sleep(1000 * 10); System.out.println("DeadLockThread2 finished."); } catch (Exception e) { } } } }
输出结果为:
Main Thread start. Main Thread finished. DeadLockThread start.
对于多个线程的线程池,如果所有正在执行的线程都因为等待处于工作队列中的任务执行而阻塞,那么就会发生线程饥饿死锁。
当往线程池中提交有依赖的任务时,应清楚的知道可能会出现的线程饥饿死锁风险。==应考虑是否将依赖的task提交到不同的线程池中==
或者使用无界的线程池。
==只有当任务相对独立时,设置线程池大小和工作队列的大小才是合理的,否则有可能会出现线程饥饿死锁==
2.任务运行时间过长
任务执行时间过长会影响线程池的响应时间,当运行时间长的任务远大于线程池线程的个数时,会出现所有线程都在执行运行时间长的任务,从而影响对其他任务的响应。
解决办法:
1.通过限定任务等待的时长,而不要无限期等待下去,当等待超时的时候,可以将任务标记为失败,或者重新放到线程池中。
2.当线程池中阻塞任务过多的时,应该考虑扩大线程池的大小
线程池的大小依赖于提交任务的类型以及服务器的可用资源,线程池的大小应该避免设置过大或者过小,当线程设置过打的时候可能会有资源耗尽的风险,线程池设置过小会有可用cpu空闲从而影响系统吞吐量。
影响线程池大小的资源有很多,比如CPU、内存、数据库链接池等,只需要计算资源可用总资源 / 每个任务需要的资源,取最小值,即可得出线程池的上限。
线程池的最小值应该大于可用的CPU数量。
ThreadPoolExecutor线程池是比较常用的一个线程池实现类,通过Executors工具类创建的线程池中,其具体实现类是ThreadPoolExecutor。首先我们可以看下ThreadPoolExecutor的构造函数如下:
public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
下面分别对构造函数中的各个参数对应的策略进行分析:
1.线程的创建与销毁
首先构造函数中corePoolSize、maximumPoolSize、keepAliveTime和unit参数影响线程的创建和销毁。其中corePoolSize为核心线程数,当第一次提交任务的时候如果正在执行的线程数小于corePoolSize,则新建一个线程执行task,如果已经超过corePoolSize,则将任务放到任务队列中等待执行。当任务队列的个数到达上限的时候,并且工作线程数量小于maximumPoolSize,则继续创建线程执行工作队列中的任务。当任务的个数小于maximumPoolSize的时候,将会把空闲的线程标记为可回收的垃圾线程。对于以下代码段测试此功能:
public class ThreadPoolTest { private static ThreadPoolExecutor executorService = new ThreadPoolExecutor(3, 6,100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3)); public static void main(String[] args) throws Exception { for (int i = 0; i< 9; i++) { executorService.submit(new Task()); System.out.println("Active thread:" + executorService.getActiveCount() + ".Task count:" + executorService.getTaskCount() + ".TaskQueue size:" + executorService.getQueue().size()); } } private static class Task extends Thread { @Override public void run() { try { Thread.sleep(1000 * 100); } catch (Exception e) { } } } }
输出结果为:
Active thread:1.Task count:1.TaskQueue size:0 Active thread:2.Task count:2.TaskQueue size:0 Active thread:3.Task count:3.TaskQueue size:0 Active thread:3.Task count:4.TaskQueue size:1 Active thread:3.Task count:5.TaskQueue size:2 Active thread:3.Task count:6.TaskQueue size:3 Active thread:4.Task count:7.TaskQueue size:3 Active thread:5.Task count:8.TaskQueue size:3 Active thread:6.Task count:9.TaskQueue size:3
2.任务队列
在ThreadPoolExecutor的构造函数中可以传入保存任务的队列,当新提交的任务没有空闲线程执行时候,会将task保存到此队列中。保存的顺序是根据插入的顺序或者Comparator来排序的。
3.饱和策略
ThreadPoolExecutor.AbortPolicy 抛出RejectedExecutionException ThreadPoolExecutor.CallerRunsPolicy 将任务的执行交给调用者,即将本该异步执行的任务变成同步执行。
4.线程工厂
当线程池需要创建线程的时候,默认是使用线程工厂方法来创建线程的,通常情况下我们通过指定线程工厂的方式来为线程命名,便于出现线程安全问题时候来定位问题。
1.项目中所有的线程应该都有线程池来提供,不允许自行创建线程
2.尽量不要用Executors来创建线程,而是使用ThreadPoolExecutor来创建
Executors有以下问题:
1)FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。 2)CachedThreadPool 和 ScheduledThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
The above is the detailed content of An article to help you understand the analysis and use of JDK thread pool. For more information, please follow other related articles on the PHP Chinese website!