首頁  >  文章  >  Java  >  Java中ThreadPoolExecutor執行緒池的用法介紹

Java中ThreadPoolExecutor執行緒池的用法介紹

不言
不言轉載
2019-04-04 09:55:363665瀏覽

這篇文章帶給大家的內容是關於Java中ThreadPoolExecutor線程池的用法介紹,有一定的參考價值,有需要的朋友可以參考一下,希望對你有幫助。

Executors

Executors 是一個Java中的工具類別. 提供工廠方法來建立不同類型的執行緒池.

Java中ThreadPoolExecutor執行緒池的用法介紹

從上圖也可以看出, Executors的創建線程池的方法, 創建出來的線程池都實現了 ExecutorService接口. 常用方法有以下幾個:

newFixedThreadPool(int Threads) : 創建固定數目線程的線程池, 超出的線程會在隊列中等待.

newCachedThreadPool(): 創建一個可緩存線程池, 如果線程池長度超過處理需要, 可靈活回收空閒線程(60秒), 若無可回收,則新建執行緒.

newSingleThreadExecutor(): 建立一個單執行緒化的執行緒池, 它只會用唯一的工作執行緒來執行任務, 保證所有任務按照指定順序( FIFO, LIFO, 優先權)執行. 如果某一個任務執行出錯, 將有另一個執行緒來繼續執行.

newScheduledThreadPool(int corePoolSize): 建立一個支援定時及週期性的任務執行的執行緒池, 多數情況下可用來替代Timer類別.

Executors 範例

newCachedThreadPool

線程最大數為 Integer.MAX_VALUE, 當我們在執行緒池中新增了n個任務, 這n 個任務都是一起執行的.

        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

newFixedThreadPool

        ExecutorService cachedThreadPool = Executors.newFixedThreadPool(1);
        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

newScheduledThreadPool

三秒執行一次, 只有執行完這一次後, 才會執行.

        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(2000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, 3, TimeUnit.SECONDS);

newSingleThreadExecutor

順序執行各個任務, 第一個任務執行完, 才會執行下一個.

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        System.out.println(Thread.currentThread().getName());
                        Thread.currentThread().sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        System.out.println(Thread.currentThread().getName());
                        Thread.currentThread().sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

Executors存在什麼問題

Java中ThreadPoolExecutor執行緒池的用法介紹

#在阿里巴巴Java開發手冊中提到,使用Executors創建線程池可能會導致OOM(OutOfMemory ,內存溢出),但是並沒有說明為什麼,那麼接下來我們就來看一下到底為什麼不允許使用Executors?

我們先來一個簡單的例子,模擬一下使用Executors導致OOM的情況.

/**
 * @author Hollis
 */
public class ExecutorsDemo {
    private static ExecutorService executor = Executors.newFixedThreadPool(15);
    public static void main(String[] args) {
        for (int i = 0; i <p>透過指定JVM參數:-Xmx8m -Xms8m 運行以上程式碼,會拋出OOM:</p><pre class="brush:php;toolbar:false">Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
    at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)

以上程式碼指出,ExecutorsDemo.java 的第16行,就是程式碼中的 executor.execute(new SubThread());

##Java中的 BlockingQueue 主要有兩種實作, 分別是 ArrayBlockingQueue和 LinkedBlockingQueue.

ArrayBlockingQueue 是一個用數組實現的有界阻塞隊列, 必須設定容量.

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity LinkedBlockingQueue 是一個用鍊錶實現的有界阻塞隊列, 容量可以選擇進行設定,不設定的話, 將是一個無邊界的阻塞隊列, 最大長度為 Integer.MAX_VALUE.<p></p><pre class="brush:php;toolbar:false">public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
這裡的問題就出在如果我們不設定 LinkedBlockingQueue 的容量的話, 其預設容量將會是 Integer. MAX_VALUE.

而 newFixedThreadPool 中創建 LinkedBlockingQueue 時, 並未指定容量. 此時, LinkedBlockingQueue 就是一個無邊界隊列, 對於一個無邊界隊列來說, 是可以不斷的向隊列中加入任務的, 這種情況下就有可能因為任務過多而導致內存溢出問題.

newCachedThreadPool 和 newScheduledThreadPool 這兩種方式創建的最大線程數可能是Integer.MAX_VALUE, 而創建這麼多線程, 必然就有可能導致OOM.

ThreadPoolExecutor 創建線程池

避免使用 Executors 創建線程池, 主要是避免使用其中的默認實現, 那麼我們可以自己直接調用 ThreadPoolExecutor 的構造函數來自己創建線程池. 在創建的同時, 給 BlockQueue 指定容量就可以了.

ExecutorService executor = new ThreadPoolExecutor(10, 10,
        60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue(10));
這種情況下, 一旦提交的線程數超過當前可用線程數時, 就會拋出 java.util.concurrent.RejectedExecutionException,這是因為目前執行緒池使用的佇列是有邊界佇列, 佇列已經滿了便無法繼續處理新的請求.

#除了自己定義 ThreadPoolExecutor 外. 還有其他方法. 如apache和guava等.

四個建構函式

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<runnable> workQueue)
             
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<runnable> workQueue,
                              ThreadFactory threadFactory)
             
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<runnable> workQueue,
                              RejectedExecutionHandler handler)
             
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)</runnable></runnable></runnable></runnable>
int corePoolSize => 該執行緒池中核心執行緒數最大值

執行緒池新執行緒的時候,如果目前執行緒總數小於corePoolSize, 則新建的是核心執行緒, 如果超過corePoolSize, 則新建的是非核心執行緒

核心執行緒預設會一直存活在執行緒池中, 即使這個核心執行緒啥也不乾(閒置狀態).

如果指定 ThreadPoolExecutor 的 allowCoreThreadTimeOut 这个属性为 true, 那么核心线程如果不干活(闲置状态)的话, 超过一定时间(时长下面参数决定), 就会被销毁掉

很好理解吧, 正常情况下你不干活我也养你, 因为我总有用到你的时候, 但有时候特殊情况(比如我自己都养不起了), 那你不干活我就要把你干掉了

int maximumPoolSize
该线程池中线程总数最大值

线程总数 = 核心线程数 + 非核心线程数.

long keepAliveTime
该线程池中非核心线程闲置超时时长

一个非核心线程, 如果不干活(闲置状态)的时长超过这个参数所设定的时长, 就会被销毁掉

如果设置 allowCoreThreadTimeOut = true, 则会作用于核心线程

TimeUnit unit

keepAliveTime的单位, TimeUnit是一个枚举类型, 其包括:

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒

BlockingQueue workQueue

一个阻塞队列, 用来存储等待执行的任务. 也就是说现在有10个任务, 核心线程 有四个, 非核心线程有六个, 那么这六个线程会被添加到 workQueue 中, 等待执行.

这个参数的选择也很重要, 会对线程池的运行过程产生重大影响, 一般来说, 这里的阻塞队列有以下几种选择:

SynchronousQueue: 这个队列接收到任务的时候, 会直接提交给线程处理, 而不保留它, 如果所有线程都在工作怎么办? 那就*新建一个线程来处理这个任务!所以为了保证不出现的错误, 使用这个类型队列的时候, maximumPoolSize 一般指定成 Integer.MAX_VALUE, 即无限大.

LinkedBlockingQueue: 这个队列接收到任务的时候, 如果当前线程数小于核心线程数, 则核心线程处理任务; 如果当前线程数等于核心线程数, 则进入队列等待. 由于这个队列最大值为 Integer.MAX_VALUE , 即所有超过核心线程数的任务都将被添加到队列中,这也就导致了 maximumPoolSize 的设定失效, 因为总线程数永远不会超过 corePoolSize.

ArrayBlockingQueue: 可以限定队列的长度, 接收到任务的时候, 如果没有达到 corePoolSize 的值, 则核心线程执行任务, 如果达到了, 则入队等候, 如果队列已满, 则新建线程(非核心线程)执行任务, 又如果总线程数到了maximumPoolSize, 并且队列也满了, 则发生错误.

DelayQueue: 队列内元素必须实现 Delayed 接口, 这就意味着你传进去的任务必须先实现Delayed接口. 这个队列接收到任务时, 首先先入队, 只有达到了指定的延时时间, 才会执行任务.

ThreadFactory threadFactory

它是ThreadFactory类型的变量, 用来创建新线程.

默认使用 Executors.defaultThreadFactory() 来创建线程. 使用默认的 ThreadFactory 来创建线程时, 会使新创建的线程具有相同的 NORM_PRIORITY 优先级并且是非守护线程, 同时也设置了线程的名称.

RejectedExecutionHandler handler

表示当拒绝处理任务时的策略, 有以下四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常(默认).
ThreadPoolExecutor.DiscardPolicy:直接丢弃任务, 但是不抛出异常.
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务, 然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:用调用者所在的线程来执行任务.

【相关推荐:Java视频教程

以上是Java中ThreadPoolExecutor執行緒池的用法介紹的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:segmentfault.com。如有侵權,請聯絡admin@php.cn刪除