首頁  >  文章  >  Java  >  Java 執行緒池詳解及實例程式碼

Java 執行緒池詳解及實例程式碼

高洛峰
高洛峰原創
2017-01-23 16:21:381558瀏覽

執行緒池的技術背景

在物件導向程式設計中,建立和銷毀物件是很費時的,因為建立一個物件要取得記憶體資源或其它更多資源。在Java中更是如此,虛擬機器將試圖追蹤每個對象,以便能夠在對象銷毀後進行垃圾回收。

所以提高服務程序效率的一個手段就是盡可能減少創建和銷毀對象的次數,特別是一些很耗資源的對象創建和銷毀。如何利用已有物件來服務就是一個需要解決的關鍵問題,其實這就是一些」池化資源」技術產生的原因。

例如Android中常見到的很多通用組件一般都離不開」池」的概念,如各種圖片加載庫,網絡請求庫,即使Android的消息傳遞機制中的Meaasge當使用Meaasge.obtain()就是使用的Meaasge池中的對象,因此這個概念很重要。本文將介紹的線程池技術同樣符合這一想法。

執行緒池的優點:

1.重用執行緒池中的執行緒,減少因物件建立,銷毀所帶來的效能開銷;

2.能有效的控制執行緒的最大並發數,提高系統資源利用率,同時避免過多的資源競爭,避免堵塞;

3.能夠多執行緒進行簡單的管理,使執行緒的使用簡單、有效率。

執行緒池框架Executor

java中的執行緒池是透過Executor框架實現的,Executor 框架包括類別:Executor,Executors,ExecutorService,ThreadPoolExecutor ,Callable和Future、FutureTask的使用等。

Java 线程池详解及实例代码

Executor: 所有執行緒池的介面,只有一個方法。

public interface Executor { 
 void execute(Runnable command); 
}

ExecutorService: 增加Executor的行為,是Executor實現類別的最直接介面。

Executors: 提供了一系列工廠方法用於創先線程池,返回的線程池都實作了ExecutorService 介面。

ThreadPoolExecutor:線程池的具體實作類別,一般用的各種執行緒池都是基於這個類別實現的。 建構方法如下:

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
 
Executors.defaultThreadFactory(), defaultHandler);
 
}

corePoolSize:在執行緒池的核心執行緒數,執行緒池中執行的執行緒數也永遠不會超過 corePoolSize 個,預設可以一直存活。可以透過設定allowCoreThreadTimeOut為True,此時 核心執行緒數就是0,此時keepAliveTime控制所有執行緒的逾時時間。

maximumPoolSize:執行緒池允許的最大執行緒數;

keepAliveTime: 指的是空閒執行緒結束的超時時間;

unit :是一個枚舉,表示keepAliveTime 的單位; Runnable佇列。

BlockingQueue:阻塞佇列(BlockingQueue)是java.util.concurrent下的主要用來控制執行緒同步的工具。如果BlockQueue是空的,從BlockingQueue取東西的操作將會被阻斷進入等待狀態,直到BlockingQueue進了東西才會被喚醒。同樣,如果BlockingQueue是滿的,任何試圖往裡存東西的操作也會被阻斷進入等待狀態,直到BlockingQueue裡有空間才會被喚醒繼續操作。 阻塞隊列常用於生產者和消費者的場景,生產者是往隊列裡添加元素的線程,消費者是從隊列裡拿元素的線程。阻塞佇列就是生產者存放元素的容器,而消費者也只從容器裡拿元素。具體的實作類別有LinkedBlockingQueue,ArrayBlockingQueued等。一般其內部的都是透過Lock和Condition(顯示鎖定(Lock)及Condition的學習與使用)來實現阻塞和喚醒。

執行緒池的工作過程如下:

執行緒池剛建立時,裡面沒有一個執行緒。任務隊列是作為參數傳進來的。不過,就算隊列裡面有任務,執行緒池也不會馬上執行它們。

當呼叫execute() 方法新增一個任務時,執行緒池會做以下判斷:

如果正在執行的執行緒數量小於corePoolSize,那麼馬上建立執行緒執行這個任務;

如果正在執行的執行緒數量大於或等於corePoolSize,那麼將這個任務放入隊列;

如果這時候隊列滿了,而且正在運行的線程數量小於maximumPoolSize,那麼還是要創建非核心線程立刻運行這個任務;

如果隊列滿了,而且正在運行的執行緒數量大於或等於maximumPoolSize,那麼執行緒池就會拋出異常RejectExecutionException。

當一個執行緒完成任務時,它會從佇列中取下一個任務來執行。

當一個執行緒無事可做,超過一定的時間(keepAliveTime)時,執行緒池會判斷,如果目前執行的執行緒數大於 corePoolSize,那麼這個執行緒就會被停掉。所以在線程池的所有任務完成後,它最終會收縮到 corePoolSize 的大小。

執行緒池的建立和使用


產生執行緒池採用了工具類別Executors的靜態方法,以下是幾個常見的執行緒池。

SingleThreadExecutor:單一後台執行緒 (其緩衝佇列是無界的)

public static ExecutorService newSingleThreadExecutor() { 
 return new FinalizableDelegatedExecutorService (
  new ThreadPoolExecutor(1, 1,        
  0L, TimeUnit.MILLISECONDS,        
  new LinkedBlockingQueue<Runnable>()));
}

创建一个单线程的线程池。这个线程池只有一个核心线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

FixedThreadPool:只有核心线程的线程池,大小固定 (其缓冲队列是无界的) 。

public static ExecutorService newFixedThreadPool(int nThreads) {         
        return new ThreadPoolExecutor(nThreads, nThreads,                                       
            0L, TimeUnit.MILLISECONDS,                                         
            new LinkedBlockingQueuedc2a42176698ed27c08b14a3979e5f11());     
}
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

CachedThreadPool:无界线程池,可以进行自动线程回收。

public static ExecutorService newCachedThreadPool() {  
 return new ThreadPoolExecutor(0,Integer.MAX_VALUE,          
   60L, TimeUnit.SECONDS,         
   new SynchronousQueue<Runnable>()); 
}

如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。SynchronousQueue是一个是缓冲区为1的阻塞队列。

ScheduledThreadPool:核心线程池固定,大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

public static ExecutorService newScheduledThreadPool(int corePoolSize) {  
 return new ScheduledThreadPool(corePoolSize,
    Integer.MAX_VALUE,            
    DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,            
    new DelayedWorkQueue());
}

创建一个周期性执行任务的线程池。如果闲置,非核心线程池会在DEFAULT_KEEPALIVEMILLIS时间内回收。

线程池最常用的提交任务的方法有两种:

execute:

ExecutorService.execute(Runnable runable);

submit:

FutureTask task = ExecutorService.submit(Runnable runnable);
FutureTask8742468051c85b06f0a0af9e3e506b5c task = ExecutorService.submit(Runnable runnable,T Result);

FutureTask8742468051c85b06f0a0af9e3e506b5c task = ExecutorService.submit(Callable8742468051c85b06f0a0af9e3e506b5c callable);

submit(Callable callable)的实现,submit(Runnable runnable)同理。

public <T> Future<T> submit(Callable<T> task) {
 if (task == null) throw new NullPointerException();
 FutureTask<T> ftask = newTaskFor(task);
 execute(ftask);
 return ftask;
}

可以看出submit开启的是有返回结果的任务,会返回一个FutureTask对象,这样就能通过get()方法得到结果。submit最终调用的也是execute(Runnable runable),submit只是将Callable对象或Runnable封装成一个FutureTask对象,因为FutureTask是个Runnable,所以可以在execute中执行。关于Callable对象和Runnable怎么封装成FutureTask对象,见Callable和Future、FutureTask的使用。

线程池实现的原理

如果只讲线程池的使用,那这篇博客没有什么大的价值,充其量也就是熟悉Executor相关API的过程。线程池的实现过程没有用到Synchronized关键字,用的都是Volatile,Lock和同步(阻塞)队列,Atomic相关类,FutureTask等等,因为后者的性能更优。理解的过程可以很好的学习源码中并发控制的思想。

在开篇提到过线程池的优点是可总结为以下三点:

线程复用

控制最大并发数

管理线程

1.线程复用过程

理解线程复用原理首先应了解线程生命周期。

Java 线程池详解及实例代码

在线程的生命周期中,它要经过新建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)和死亡(Dead)5种状态。

Thread通过new来新建一个线程,这个过程是是初始化一些线程信息,如线程名,id,线程所属group等,可以认为只是个普通的对象。调用Thread的start()后Java虚拟机会为其创建方法调用栈和程序计数器,同时将hasBeenStarted为true,之后调用start方法就会有异常。

处于这个状态中的线程并没有开始运行,只是表示该线程可以运行了。至于该线程何时开始运行,取决于JVM里线程调度器的调度。当线程获取cpu后,run()方法会被调用。不要自己去调用Thread的run()方法。之后根据CPU的调度在就绪——运行——阻塞间切换,直到run()方法结束或其他方式停止线程,进入dead状态。

所以实现线程复用的原理应该就是要保持线程处于存活状态(就绪,运行或阻塞)。接下来来看下ThreadPoolExecutor是怎么实现线程复用的。

在ThreadPoolExecutor主要Worker类来控制线程的复用。看下Worker类简化后的代码,这样方便理解:

private final class Worker implements Runnable {
final Thread thread;
 
Runnable firstTask;
 
Worker(Runnable firstTask) {
 
this.firstTask = firstTask;
 
this.thread = getThreadFactory().newThread(this);
 
}
 
public void run() {
 
runWorker(this);
 
}
 
final void runWorker(Worker w) {
 
Runnable task = w.firstTask;
 
w.firstTask = null;
 
while (task != null || (task = getTask()) != null){
 
task.run();
 
}
 
}

   

Worker是一个Runnable,同时拥有一个thread,这个thread就是要开启的线程,在新建Worker对象时同时新建一个Thread对象,同时将Worker自己作为参数传入TThread,这样当Thread的start()方法调用时,运行的实际上是Worker的run()方法,接着到runWorker()中,有个while循环,一直从getTask()里得到Runnable对象,顺序执行。getTask()又是怎么得到Runnable对象的呢?

依旧是简化后的代码:

private Runnable getTask() {
 if(一些特殊情况) {
  return null;
 }
Runnable r = workQueue.take();
 
return r;
 
}

   

这个workQueue就是初始化ThreadPoolExecutor时存放任务的BlockingQueue队列,这个队列里的存放的都是将要执行的Runnable任务。因为BlockingQueue是个阻塞队列,BlockingQueue.take()得到如果是空,则进入等待状态直到BlockingQueue有新的对象被加入时唤醒阻塞的线程。所以一般情况Thread的run()方法就不会结束,而是不断执行从workQueue里的Runnable任务,这就达到了线程复用的原理了。

2.控制最大并发数

那Runnable是什么时候放入workQueue?Worker又是什么时候创建,Worker里的Thread的又是什么时候调用start()开启新线程来执行Worker的run()方法的呢?有上面的分析看出Worker里的runWorker()执行任务时是一个接一个,串行进行的,那并发是怎么体现的呢?

很容易想到是在execute(Runnable runnable)时会做上面的一些任务。看下execute里是怎么做的。

execute:

简化后的代码

public void execute(Runnable command) {
 if (command == null)
  throw new NullPointerException();
int c = ctl.get();
 
// 当前线程数 < corePoolSize
 
if (workerCountOf(c) < corePoolSize) {
 
// 直接启动新的线程。
 
if (addWorker(command, true))
 
return;
 
c = ctl.get();
 
}
 
// 活动线程数 >= corePoolSize
 
// runState为RUNNING && 队列未满
 
if (isRunning(c) && workQueue.offer(command)) {
 
int recheck = ctl.get();
 
// 再次检验是否为RUNNING状态
 
// 非RUNNING状态 则从workQueue中移除任务并拒绝
 
if (!isRunning(recheck) && remove(command))
 
reject(command);// 采用线程池指定的策略拒绝任务
 
// 两种情况:
 
// 1.非RUNNING状态拒绝新的任务
 
// 2.队列满了启动新的线程失败(workCount > maximumPoolSize)
 
} else if (!addWorker(command, false))
 
reject(command);
 
}

   

addWorker:

简化后的代码

private boolean addWorker(Runnable firstTask, boolean core) {
int wc = workerCountOf(c);
 
if (wc >= (core ? corePoolSize : maximumPoolSize)) {
 
return false;
 
}
 
w = new Worker(firstTask);
 
final Thread t = w.thread;
 
t.start();
 
}

   

根据代码再来看上面提到的线程池工作过程中的添加任务的情况:

* 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;   
* 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
* 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
* 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常RejectExecutionException。

这就是Android的AsyncTask在并行执行是在超出最大任务数是抛出RejectExecutionException的原因所在,详见基于最新版本的AsyncTask源码解读及AsyncTask的黑暗面

通过addWorker如果成功创建新的线程成功,则通过start()开启新线程,同时将firstTask作为这个Worker里的run()中执行的第一个任务。

虽然每个Worker的任务是串行处理,但如果创建了多个Worker,因为共用一个workQueue,所以就会并行处理了。

所以根据corePoolSize和maximumPoolSize来控制最大并发数。大致过程可用下图表示。

Java 线程池详解及实例代码

上面的讲解和图来可以很好的理解的这个过程。

如果是做Android开发的,并且对Handler原理比较熟悉,你可能会觉得这个图挺熟悉,其中的一些过程和Handler,Looper,Meaasge使用中,很相似。Handler.send(Message)相当于execute(Runnuble),Looper中维护的Meaasge队列相当于BlockingQueue,只不过需要自己通过同步来维护这个队列,Looper中的loop()函数循环从Meaasge队列取Meaasge和Worker中的runWork()不断从BlockingQueue取Runnable是同样的道理。

3.管理线程

通过线程池可以很好的管理线程的复用,控制并发数,以及销毁等过程,线程的复用和控制并发上面已经讲了,而线程的管理过程已经穿插在其中了,也很好理解。

在ThreadPoolExecutor有个ctl的AtomicInteger变量。通过这一个变量保存了两个内容:

所有线程的数量 每个线程所处的状态 其中低29位存线程数,高3位存runState,通过位运算来得到不同的值。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//得到线程的状态
 
private static int runStateOf(int c) {
 
return c & ~CAPACITY;
 
}
 
//得到Worker的的数量
 
private static int workerCountOf(int c) {
 
return c & CAPACITY;
 
}
 
// 判断线程是否在运行
 
private static boolean isRunning(int c) {
 
return c < SHUTDOWN;
 
}

   

這裡主要透過shutdown和shutdownNow()來分析執行緒池的關閉過程。首先線程池有五種狀態來控制任務添加與執行。主要介紹以下三種:

RUNNING狀態:線程池正常運行,可以接受新的任務並處理隊列中的任務;

SHUTDOWN狀態:不再接受新的任務,但是會執行隊列中的任務;

STOP狀態:不再接受新任務,不處理佇列中的任務shutdown這個方法會將runState置為SHUTDOWN,會終止所有空閒的線程,而仍在工作的線程不受影響,所以隊列中的任務人會被執行。

shutdownNow方法將runState置為STOP。和shutdown方法的區別,這個方法會終止所有的線程,所以佇列中的任務也不會被執行了。

總結
透過對ThreadPoolExecutor原始碼的分析,從總體上了解了線程池的創建,任務的添加,執行等過程,熟悉這些過程,使用線程池就會更輕鬆了。

而從中學到的一些對同時控制,以及生產者——消費者模型任務處理的使用,對以後理解或解決其他相關問題會有很大的幫助。例如Android中的Handler機制,而Looper中的Messager佇列用一個BlookQueue來處理同樣是可以的,這寫就是讀源碼的收穫吧。

以上就是對Java 執行緒池的資料整理,後續繼續補充相關資料,謝謝大家對本站的支持!

更多Java 執行緒池詳解及實例程式碼相關文章請關注PHP中文網!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn