An article to help you understand the analysis and use of JDK thread pool
Executing tasks is one of the most important functions in multi-threaded programming, and there are many ways to do it. So why do we need a thread pool? Below, we use a Socket server that processes client requests as an example and analyze each way of executing tasks.
1.1 Serial execution of tasks
When the ServerSocket accepts a client connection, the connection is processed sequentially by the handleSocket method. Once processing completes, the server goes back to listening. The code is as follows:
ServerSocket serverSocket = new ServerSocket();
SocketAddress endpoint = new InetSocketAddress(host, port);
serverSocket.bind(endpoint, 1023);
while (!isStop) {
    Socket socket = serverSocket.accept();
    handleSocket(socket);
}
The shortcoming of this approach is obvious: when there are multiple client requests, the server can process only one at a time, and every other request must wait for the previous one to finish. This makes the approach almost unusable under high concurrency.
1.2 Create a thread for each task
To address this problem, create a thread for each client request to process it. The main thread only needs to create the thread and can then continue to accept client requests. The flow chart is as follows:
The code is as follows:
ServerSocket serverSocket = new ServerSocket();
SocketAddress endpoint = new InetSocketAddress(host, port);
serverSocket.bind(endpoint, 1023);
while (!isStop) {
    Socket socket = serverSocket.accept();
    new SocketHandler(socket, THREAD_NAME_PREFIX + threadIndex++).start();
}
This method has the following advantages:
1. Processing of client connections is moved off the main thread, so the main loop can respond to the next request sooner.
2. Client connections are processed in parallel, which improves the throughput of the program.
However, this method has the following disadvantages:
1. The code that processes requests must be thread-safe.
2. Creating and destroying threads has a cost; when a large number of threads are created, a large amount of computer resources is consumed.
3. When the number of runnable threads exceeds the number of available CPUs, the extra threads occupy memory and put pressure on garbage collection, and a large number of threads competing for CPU resources adds considerable performance overhead.
4. There is an upper limit on the number of threads that can be created in the JVM. The limit varies by platform and depends on several factors, including JVM startup parameters and the memory occupied by each thread. If the limit is exceeded, an OutOfMemoryError is thrown.
1.3 Use the thread pool to handle client requests
For the problems described in 1.2, the best solution is to execute tasks with a thread pool, which limits the total number of threads created and thereby avoids those problems. The flow chart is as follows:
The processing method is as follows:
ServerSocket serverSocket = new ServerSocket();
SocketAddress endpoint = new InetSocketAddress(host, port);
serverSocket.bind(endpoint, 1023);
while (!isStop) {
    Socket socket = serverSocket.accept();
    executorService.execute(new SocketHandler(socket, THREAD_NAME_PREFIX + threadIndex++));
}
This method has the following advantages:
1. Task submission is separated from task execution.
2. Threads that execute tasks can be reused, which reduces the overhead of creating and destroying threads. In addition, when a task arrives, an already created thread can run it immediately, which improves the response speed of the program.
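The thread reuse described above can be observed with a small experiment. The following is a minimal sketch, not part of the original article: the class name PoolReuseDemo and the method workerNames are hypothetical, and the pool size of 4 and the task count of 100 are arbitrary values chosen for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolReuseDemo {
    /** Runs taskCount tasks on poolSize threads; returns the names of the threads that ran them. */
    static Set<String> workerNames(int poolSize, int taskCount) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < taskCount; i++) {
                // execute() only submits the task; a pooled worker thread runs it.
                pool.execute(() -> names.add(Thread.currentThread().getName()));
            }
            pool.shutdown(); // accept no new tasks, but finish the queued ones
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // no-op if the pool has already terminated
        }
        return names;
    }

    public static void main(String[] args) {
        Set<String> names = workerNames(4, 100);
        System.out.println("100 tasks ran on " + names.size() + " thread(s)");
    }
}
```

However many tasks are submitted, the number of distinct worker threads never exceeds the pool size, which is the property that removes the unbounded thread creation of 1.2.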
The thread pool implementation in Java is based on the producer-consumer model. The thread pool separates task submission from task execution: submitting a task is the producer side, and executing it is the consumer side. For a detailed analysis, see the source code analysis. The top-level interface of the Java thread pool is Executor, and its source code is as follows:
public interface Executor {
    void execute(Runnable command);
}
All thread pools implement this top-level interface. It stipulates that the accepted task type is Runnable, while the actual execution logic is defined by the thread pool implementation itself. For example:
You can execute tasks serially on the main thread.
You can create a new thread for each task.
You can create a group of threads in advance and take one from the group each time a task is executed, and so on.
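The first two strategies can each be written as a tiny Executor implementation. This is an illustrative sketch rather than code from the JDK or the original article: DirectExecutor, ThreadPerTaskExecutor, and the helper runningThreadName are hypothetical names introduced here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

class ExecutorStrategies {
    /** Runs every task serially on the thread that calls execute(). */
    static class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            command.run();
        }
    }

    /** Starts a new thread for every task. */
    static class ThreadPerTaskExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }

    /** Returns the name of the thread on which the executor ran a probe task. */
    static String runningThreadName(Executor executor) {
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await(); // wait until the probe task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name.get();
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("direct: " + runningThreadName(new DirectExecutor()));
        System.out.println("thread-per-task: " + runningThreadName(new ThreadPerTaskExecutor()));
    }
}
```

The calling code is identical in both cases; only the Executor implementation decides where the task runs, which is the point of separating submission from execution.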
The execution strategy of a thread pool mainly covers the following aspects:
1. On which thread should a task be executed?
2. In what order should tasks be executed (FIFO, LIFO, priority)?
3. How many tasks can be executed concurrently?
4. How many tasks at most can wait in the queue for execution?
5. How should newly submitted tasks be rejected when the waiting queue is full?
6. What operations need to be done before or after executing a task?
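For the last point, ThreadPoolExecutor exposes the protected hook methods beforeExecute and afterExecute, which a subclass can override. The sketch below only counts tasks; the class name HookedPool, its fields, and the helper runAndCount are hypothetical and chosen for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();

    HookedPool(int threads, int queueCapacity) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(queueCapacity));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        started.incrementAndGet();   // runs on the worker thread just before the task
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        finished.incrementAndGet();  // runs on the worker thread just after the task
    }

    /** Runs taskCount empty tasks and returns {started, finished}. */
    static int[] runAndCount(int taskCount) {
        HookedPool pool = new HookedPool(2, taskCount);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> { });
            }
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { pool.started.get(), pool.finished.get() };
    }

    public static void main(String[] args) {
        int[] counts = runAndCount(5);
        System.out.println("started=" + counts[0] + ", finished=" + counts[1]);
    }
}
```

The same hooks are a natural place for logging, timing, or statistics, since they run on the worker thread around every task.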
Choose an execution strategy according to the specific business needs. The Java class library provides the Executors utility class for creating thread pools with default strategies. Its main factory methods are:
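public static ExecutorService newFixedThreadPool(int nThreads)
Creates a fixed-size thread pool. Each time a task is submitted while the number of threads is below the core pool size, a new thread is created for the task. Once the number of threads reaches the maximum, the existing threads are reused. If a thread terminates because of an unexpected exception, a new thread is created to replace it.
public static ExecutorService newCachedThreadPool()
Creates threads on demand. When there are more threads than tasks, the surplus threads are reclaimed.
public static ExecutorService newSingleThreadExecutor()
Creates a thread pool with a single thread.
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
Creates a thread pool that can run scheduled tasks.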
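As a rough illustration of how the choice of factory method changes behavior, the sketch below runs the same tasks on a single-thread executor and on a fixed pool of three threads. The class FactoryMethodsDemo and the method executionOrder are hypothetical names, and the task count of 5 is arbitrary.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FactoryMethodsDemo {
    /** Runs taskCount numbered tasks on the pool and returns the order in which they ran. */
    static List<Integer> executionOrder(ExecutorService pool, int taskCount) {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        try {
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                pool.execute(() -> order.add(id));
            }
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        // One worker thread: tasks run one after another, in submission order.
        System.out.println("single:   " + executionOrder(Executors.newSingleThreadExecutor(), 5));
        // Three worker threads: all tasks run, but the order may differ between runs.
        System.out.println("fixed(3): " + executionOrder(Executors.newFixedThreadPool(3), 5));
    }
}
```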
In the examples above, the execution status of a task is invisible once it has been submitted to the thread pool: the main thread cannot know whether the task has finished or what its result is. To address this, Java provides the Callable and Future interfaces, which let a task return data.
The Callable interface lets a task return data and throw exceptions. It is defined as follows:
public interface Callable<V> {
    V call() throws Exception;
}
All submit methods in ExecutorService return a Future object. Its main methods are defined as follows:
public interface Future<V> {
    // Cancels the task. If mayInterruptIfRunning is true, the thread running the task is interrupted.
    boolean cancel(boolean mayInterruptIfRunning);

    // Returns whether the task was cancelled before it completed.
    boolean isCancelled();

    // Returns whether the task has completed, including normal completion, failure with an exception, and cancellation.
    boolean isDone();

    // Returns the result, waiting if the task has not finished yet.
    V get() throws InterruptedException, ExecutionException;
}
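A short usage sketch may make the two interfaces more concrete. It is not from the original article: the class FutureDemo and the methods sumAsync and failureMessage are hypothetical, and the 5-second timeout is an arbitrary choice. The first method returns a value from a task; the second shows that an exception thrown inside the task reaches the caller wrapped in an ExecutionException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureDemo {
    /** Sums the integers from..to on a worker thread and returns the result to the caller. */
    static int sumAsync(int from, int to) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> {
                int sum = 0;
                for (int i = from; i <= to; i++) {
                    sum += i;
                }
                return sum;
            };
            Future<Integer> future = pool.submit(task);
            return future.get(5, TimeUnit.SECONDS); // blocks until the result is ready
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    /** Returns the message of the exception thrown inside a failing task. */
    static String failureMessage() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> failing = () -> {
                throw new IllegalArgumentException("bad input");
            };
            pool.submit(failing).get();
            return null; // not reached: get() throws because the task failed
        } catch (ExecutionException e) {
            return e.getCause().getMessage(); // the task's own exception is the cause
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sum = " + sumAsync(1, 100));
        System.out.println("failure = " + failureMessage());
    }
}
```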
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadDeadLock {
    // A single worker thread: only one task can run at a time.
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();

    public static void main(String[] args) throws Exception {
        System.out.println("Main Thread start.");
        EXECUTOR_SERVICE.submit(new DeadLockThread());
        System.out.println("Main Thread finished.");
    }

    private static class DeadLockThread implements Runnable {
        @Override
        public void run() {
            try {
                System.out.println("DeadLockThread start.");
                // The second task is queued behind this one on the only worker thread,
                Future<?> future = EXECUTOR_SERVICE.submit(new DeadLockThread2());
                // so waiting for it here never returns.
                future.get();
                System.out.println("DeadLockThread finished.");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private static class DeadLockThread2 implements Runnable {
        @Override
        public void run() {
            try {
                System.out.println("DeadLockThread2 start.");
                Thread.sleep(1000 * 10);
                System.out.println("DeadLockThread2 finished.");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
Main Thread start.
Main Thread finished.
DeadLockThread start.
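The output stops after "DeadLockThread start." because the only worker thread is waiting for a task that can never be scheduled. The sketch below reproduces this with a timeout so that it terminates, and shows that the same nested submission completes once the pool has a second thread. The class StarvationDemo, the method nestedSubmitCompletes, and the 1-second timeout are assumptions made for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class StarvationDemo {
    /** Returns true if an outer task that waits for an inner task in the same pool completes. */
    static boolean nestedSubmitCompletes(int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            Future<String> outer = pool.submit(() -> {
                Future<String> inner = pool.submit(() -> "inner done");
                return inner.get(); // occupies a worker while waiting for another task
            });
            outer.get(1, TimeUnit.SECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // the outer task is still waiting: no free worker for the inner task
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // interrupt a stuck worker so the JVM can exit
        }
    }

    public static void main(String[] args) {
        System.out.println("1 thread:  completes = " + nestedSubmitCompletes(1));
        System.out.println("2 threads: completes = " + nestedSubmitCompletes(2));
    }
}
```

In general, tasks that wait for other tasks submitted to the same pool need a pool large enough to run all of them at once, or they should be given to separate pools.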
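Many resources constrain the size of a thread pool, such as CPU, memory, and database connection pools. For each resource, divide the total amount available by the amount each task needs; the smallest of these quotients is the upper limit of the thread pool size.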
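The calculation can be sketched in a few lines. The class PoolSizeBound and the method upperBound are hypothetical names, and the resource figures in main (8 CPU cores, 4096 MB of memory, 20 database connections) are made-up numbers used only to show the arithmetic.

```java
class PoolSizeBound {
    /**
     * total[i] is the available amount of resource i and perTask[i] is the amount
     * one task needs; the bound is the smallest quotient total[i] / perTask[i].
     */
    static long upperBound(long[] total, long[] perTask) {
        if (total.length != perTask.length) {
            throw new IllegalArgumentException("arrays must have the same length");
        }
        long bound = Long.MAX_VALUE;
        for (int i = 0; i < total.length; i++) {
            bound = Math.min(bound, total[i] / perTask[i]);
        }
        return bound;
    }

    public static void main(String[] args) {
        // Assumed figures: CPU cores, memory in MB, database connections.
        long[] total = { 8, 4096, 20 };
        long[] perTask = { 1, 256, 2 };
        // Quotients are 8, 16 and 10, so the CPU is the limiting resource.
        System.out.println("upper bound = " + upperBound(total, perTask));
    }
}
```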
public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler)
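import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {
    // corePoolSize 3, maximumPoolSize 6, keepAliveTime 100 s, queue capacity 3
    private static final ThreadPoolExecutor executorService =
            new ThreadPoolExecutor(3, 6, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3));

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 9; i++) {
            executorService.submit(new Task());
            System.out.println("Active thread:" + executorService.getActiveCount()
                    + ".Task count:" + executorService.getTaskCount()
                    + ".TaskQueue size:" + executorService.getQueue().size());
        }
    }

    private static class Task implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(1000 * 100); // keep the worker busy for 100 s
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}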
Active thread:1.Task count:1.TaskQueue size:0
Active thread:2.Task count:2.TaskQueue size:0
Active thread:3.Task count:3.TaskQueue size:0
Active thread:3.Task count:4.TaskQueue size:1
Active thread:3.Task count:5.TaskQueue size:2
Active thread:3.Task count:6.TaskQueue size:3
Active thread:4.Task count:7.TaskQueue size:3
Active thread:5.Task count:8.TaskQueue size:3
Active thread:6.Task count:9.TaskQueue size:3
ThreadPoolExecutor.AbortPolicy: throws a RejectedExecutionException.
ThreadPoolExecutor.CallerRunsPolicy: hands the task to the caller to run, so a task that should have run asynchronously is executed synchronously.
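Both policies can be observed on a deliberately saturated pool. In the sketch below, the class RejectionDemo and its methods are hypothetical; the pool has one thread and a queue of capacity one, so that a third task is always rejected while the first two are held back by a latch.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionDemo {
    /** Builds a pool with one busy worker and a full one-slot queue. */
    static ThreadPoolExecutor saturatedPool(RejectedExecutionHandler handler, CountDownLatch release) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker); // occupies the only worker thread
        pool.execute(blocker); // fills the queue
        return pool;
    }

    /** Returns true if AbortPolicy throws RejectedExecutionException for the extra task. */
    static boolean abortPolicyRejects() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = saturatedPool(new ThreadPoolExecutor.AbortPolicy(), release);
        try {
            pool.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    /** Returns the name of the thread that ran the extra task under CallerRunsPolicy. */
    static String callerRunsThreadName() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = saturatedPool(new ThreadPoolExecutor.CallerRunsPolicy(), release);
        String[] ranOn = new String[1];
        try {
            // Runs synchronously on the submitting thread, so ranOn is set on return.
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
            return ranOn[0];
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("AbortPolicy rejected: " + abortPolicyRejects());
        System.out.println("CallerRunsPolicy ran on: " + callerRunsThreadName());
    }
}
```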
1) FixedThreadPool and SingleThreadPool: the allowed request queue length is Integer.MAX_VALUE, so a large number of requests may pile up and cause an OOM.
2) CachedThreadPool and ScheduledThreadPool: the allowed number of threads is Integer.MAX_VALUE, so a large number of threads may be created and cause an OOM.
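One common way to avoid both risks is to construct a ThreadPoolExecutor directly with a bounded queue and a bounded maximum pool size. The following is a minimal sketch under that assumption, not a recommendation taken from the original article: the class BoundedPoolFactory, the method newBoundedPool, the 60-second keep-alive time, and the choice of CallerRunsPolicy are all illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolFactory {
    /** Creates a pool whose thread count and queue length both have explicit upper limits. */
    static ThreadPoolExecutor newBoundedPool(int corePoolSize, int maximumPoolSize, int queueCapacity) {
        return new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                60L, TimeUnit.SECONDS,                      // idle non-core threads exit after 60 s
                new ArrayBlockingQueue<>(queueCapacity),    // at most queueCapacity waiting tasks
                new ThreadPoolExecutor.CallerRunsPolicy()); // overflow runs on the submitting thread
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedPool(2, 4, 10);
        System.out.println("max threads = " + pool.getMaximumPoolSize()
                + ", queue capacity = " + pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```

With these limits, at most maximumPoolSize tasks run and queueCapacity tasks wait at any time; what happens to further submissions is decided by the rejection policy instead of by available memory.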