首頁  >  文章  >  Java  >  一篇文章讓你看懂JDK線程池分析和使用

一篇文章讓你看懂JDK線程池分析和使用

无忌哥哥
无忌哥哥原創
2018-07-20 10:24:121332瀏覽

1.為什麼使用執行緒池

在多執行緒程式設計中一項很重要的功能就是執行任務,而執行任務的方式有很多種,為什麼一定需要使用執行緒池呢?以下我們使用Socket程式處理請求的功能,分別對每種執行任務的方式進行分析。

1.1序列執行任務

當Socket監聽到客戶端有連接,透過handleSocket方法順序的處理每一個客戶端連接,當處理完成後,繼續監聽。程式碼如下:

ServerSocket serverSocket = new ServerSocket();
    SocketAddress endpoint = new InetSocketAddress(host, port);
    serverSocket.bind(endpoint,1023);
    while (!isStop) {
        Socket socket = serverSocket.accept();
        handleSocket(socket);
}

這種方式的缺點非常明顯:當我有多個客戶端請求時,在server處理一個請求的過程中,其他請求都需要等待前一個請求處理完畢。這種在高並發情況下幾乎不可用。

1.2為每個任務建立一個執行緒

針對上面的問題進行最佳化:為每個客戶端請求建立一個執行緒來處理請求,主執行緒只需要建立線程,之後即可繼續堅挺客戶端請求.流程圖如下:

程式碼如下:

ServerSocket serverSocket = new ServerSocket();
    SocketAddress endpoint = new InetSocketAddress(host, port);
    serverSocket.bind(endpoint,1023);
    while (!isStop) {
        Socket socket = serverSocket.accept();
        new SocketHandler(socket, THREAD_NAME_PREFIX + threadIndex++).start();
}

這種方式有以下優點:

#1.將處理客戶端連線的操作從主執行緒中分離出去,使得主循環可以更快的回應下一次請求。
2.處理客戶端連線的操作是並行的,提高了程式的吞吐量。

但是這種方式有有以下幾個缺點:

1.處理請求的執行緒必須是執行緒安全的

2.執行緒的建立和銷毀都需要開銷,當大量創建線程的時候,將會消耗大量計​​算機資源

3.當可用的CPU數量小於可運行的線程的時候,那麼多出來的線程會佔用內存資源,給垃圾回收帶來壓力,並且在大量執行緒競爭CPU資源的時候會有很大的效能開銷

4.JVM中可建立的執行緒數存在一個上限,這個上限隨著平台的不同而不同,並且受多個因素的限制,包括JVM的啟動參數,每個執行緒所佔用的記憶體大小等,如果超出這些限制,將會拋出OOM異常。

1.3 使用執行緒池處理客戶端請求

對於1.2中出現的問題,最好的解決方案就是使用執行緒池來執行task,這樣可以對建立的執行緒總數做限制,從而避免1.2中的問題。流程圖如下:

處理方式如下:

ServerSocket serverSocket = new ServerSocket();
    SocketAddress endpoint = new InetSocketAddress(host, port);
    serverSocket.bind(endpoint,1023);
    while (!isStop) {
        Socket socket = serverSocket.accept();
        executorService.execute(new SocketHandler(socket, THREAD_NAME_PREFIX + threadIndex++));
}

此中方式有以下幾個優點:

1.任務提交與任務執行分離開

2.執行任務的執行緒可以重複使用,減少了執行緒建立和銷毀的開銷,同時當任務到達時可以直接使用建立好的執行緒執行任務,也提高了程式的回應速度。

2.java中線程池介紹

在java中線程池的實作是基於生產者-消費者模式的,線程池的功能將任務的提交和任務的執行分離,任務提交的過程為生產者,執行任務的過程為消費過程。具體的分析請見源碼分析。 java線程池的頂層接口為Executor,源碼如下:

public interface Executor {
    void execute(Runnable command);
}

此接口為所有線程池實現的頂層接口,其規定了可以接受的task類型為Runnable實現類,但是具體的執行task的邏輯由線程池實現類別自己定義,例如:

可以使用主線程串行執行任務,
也可以為每個任務創建一個新的線程
或提前創建好一組線程,每次執行任務的時候從一組執行緒中取,等等

對於執行緒池的執行策略主要有以下幾個面向:

##1.在什麼執行緒中執行任務2.依照什麼順序執行任務(FIFO、LIFO、優先權?)
3.有多少個任務可以並發執行
4.最多可以有多少個任務在佇列中等待執行
5.當等待佇列中達到最大值的時候,怎麼樣拒絕新提交的task
6.在執行一個任務之前或之後需要做哪些操作?

應該根據特定的業務選擇不同的執行策略。在java類別庫中提供了Executors工具類別來常見預設策略的執行緒池。主要有以下幾個介面:

public static ExecutorService newFixedThreadPool(int nThreads)
将会创建一个固定大小的线程池,每当有新任务提交的时候,当线程总数没有达到核心线程数的时候,为每个任务创建一个新线程,当线程的个数到达最大值后,重用之前创建的线程,当线程因为未知异常而停止时候,将会重现创建一个线程作为补充。

public static ExecutorService newCachedThreadPool()
根据需求创建线程的个数,当线程数大于任务数的时候,将会注销多余的线程

public static ExecutorService newSingleThreadExecutor()
创建一个单线程的线程池

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建一个可执行定时任务的线程池

在以上的例子中,所有提交的task在提交到執行緒池後其執行狀態是不可見的,即主執行緒無法知道提交的task是否執行結束或執行結果。針對這個問題,java提供了可以傳回資料的task介面Future和Callable介面。

其中Callable介面提供了任務返回資料以及拋出異常的功能,定義如下:

public interface Callable<V> {
  
    V call() throws Exception;
}

在ExecutorService中所有的submit方法都會傳回一個Future對象,其介面定義如下:

public interface Future<V> {

    取消任务执行,当mayInterruptIfRunning为true,interruptedthisthread
    boolean cancel(boolean mayInterruptIfRunning);
    返回此任务是否在执行完毕之前被取消执行
    boolean isCancelled();
    返回此任务是否已经完成,包括正常结束,异常结束以及被cancel
    boolean isDone();
    返回执行结果,当任务没有执行结束的时候,等待
    V get() throws InterruptedException, ExecutionException;
}

3.使用线程池可能出现的问题

1.线程饥饿死锁
在单线程的Executor中,如果Executor中执行的一个任务中,再次提交任务到同一个Executor中,并且等待这个任务执行完毕,那么就会发生死锁问题。如下demo中所示:

public class ThreadDeadLock {

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();


    public static void main(String[] args) throws Exception {
        System.out.println("Main Thread start.");
        EXECUTOR_SERVICE.submit(new DeadLockThread());
        System.out.println("Main Thread finished.");

    }

    private static class DeadLockThread extends Thread{

        @Override
        public void run() {
            try {
                System.out.println("DeadLockThread start.");
                Future future = EXECUTOR_SERVICE.submit(new DeadLockThread2());
                future.get();
                System.out.println("DeadLockThread finished.");
            } catch (Exception e) {

            }
        }
    }

    private static class DeadLockThread2 extends Thread {

        @Override
        public void run() {
            try {
                System.out.println("DeadLockThread2 start.");
                Thread.sleep(1000 * 10);
                System.out.println("DeadLockThread2 finished.");
            } catch (Exception e) {

            }
        }
    }
}

输出结果为:

Main Thread start.
Main Thread finished.
DeadLockThread start.

对于多个线程的线程池,如果所有正在执行的线程都因为等待处于工作队列中的任务执行而阻塞,那么就会发生线程饥饿死锁。

当往线程池中提交有依赖的任务时,应清楚的知道可能会出现的线程饥饿死锁风险。==应考虑是否将依赖的task提交到不同的线程池中==
或者使用无界的线程池。

==只有当任务相对独立时,设置线程池大小和工作队列的大小才是合理的,否则有可能会出现线程饥饿死锁==

2.任务运行时间过长
任务执行时间过长会影响线程池的响应时间,当运行时间长的任务远大于线程池线程的个数时,会出现所有线程都在执行运行时间长的任务,从而影响对其他任务的响应。

解决办法:

1.通过限定任务等待的时长,而不要无限期等待下去,当等待超时的时候,可以将任务标记为失败,或者重新放到线程池中。

2.当线程池中阻塞任务过多的时,应该考虑扩大线程池的大小

4.线程池大小的设置

线程池的大小依赖于提交任务的类型以及服务器的可用资源,线程池的大小应该避免设置过大或者过小,当线程设置过打的时候可能会有资源耗尽的风险,线程池设置过小会有可用cpu空闲从而影响系统吞吐量。

影响线程池大小的资源有很多,比如CPU、内存、数据库链接池等,只需要计算资源可用总资源 / 每个任务需要的资源,取最小值,即可得出线程池的上限。
线程池的最小值应该大于可用的CPU数量。

4.java中常用线程池源码分析-ThreadPoolExecutor

ThreadPoolExecutor线程池是比较常用的一个线程池实现类,通过Executors工具类创建的线程池中,其具体实现类是ThreadPoolExecutor。首先我们可以看下ThreadPoolExecutor的构造函数如下:

public ThreadPoolExecutor(
    int corePoolSize,
   int maximumPoolSize,
   long keepAliveTime,
   TimeUnit unit,
   BlockingQueue<Runnable> workQueue,
   ThreadFactory threadFactory,
   RejectedExecutionHandler handler)

下面分别对构造函数中的各个参数对应的策略进行分析:

1.线程的创建与销毁

首先构造函数中corePoolSize、maximumPoolSize、keepAliveTime和unit参数影响线程的创建和销毁。其中corePoolSize为核心线程数,当第一次提交任务的时候如果正在执行的线程数小于corePoolSize,则新建一个线程执行task,如果已经超过corePoolSize,则将任务放到任务队列中等待执行。当任务队列的个数到达上限的时候,并且工作线程数量小于maximumPoolSize,则继续创建线程执行工作队列中的任务。当任务的个数小于maximumPoolSize的时候,将会把空闲的线程标记为可回收的垃圾线程。对于以下代码段测试此功能:

public class ThreadPoolTest {

    private static ThreadPoolExecutor executorService = new ThreadPoolExecutor(3, 6,100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3));

    public static void main(String[] args) throws Exception {
        for (int i = 0; i< 9; i++) {
            executorService.submit(new Task());
            System.out.println("Active thread:" + executorService.getActiveCount() + ".Task count:" + executorService.getTaskCount() + ".TaskQueue size:" + executorService.getQueue().size());
        }
    }

    private static class Task extends Thread {

        @Override
        public void run() {
            try {
                Thread.sleep(1000 * 100);
            } catch (Exception e) {

            }
        }
    }

}

输出结果为:

Active thread:1.Task count:1.TaskQueue size:0
Active thread:2.Task count:2.TaskQueue size:0
Active thread:3.Task count:3.TaskQueue size:0
Active thread:3.Task count:4.TaskQueue size:1
Active thread:3.Task count:5.TaskQueue size:2
Active thread:3.Task count:6.TaskQueue size:3
Active thread:4.Task count:7.TaskQueue size:3
Active thread:5.Task count:8.TaskQueue size:3
Active thread:6.Task count:9.TaskQueue size:3

2.任务队列

在ThreadPoolExecutor的构造函数中可以传入保存任务的队列,当新提交的任务没有空闲线程执行时候,会将task保存到此队列中。保存的顺序是根据插入的顺序或者Comparator来排序的。

3.饱和策略

ThreadPoolExecutor.AbortPolicy
抛出RejectedExecutionException

ThreadPoolExecutor.CallerRunsPolicy
将任务的执行交给调用者,即将本该异步执行的任务变成同步执行。

4.线程工厂

当线程池需要创建线程的时候,默认是使用线程工厂方法来创建线程的,通常情况下我们通过指定线程工厂的方式来为线程命名,便于出现线程安全问题时候来定位问题。

5.线程池最佳实现

1.项目中所有的线程应该都有线程池来提供,不允许自行创建线程

2.尽量不要用Executors来创建线程,而是使用ThreadPoolExecutor来创建
Executors有以下问题:

1)FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

以上是一篇文章讓你看懂JDK線程池分析和使用的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn