這篇文章主要介紹了詳解Java線程池和Executor原理的分析的相關資料,這裡提供實例及分析原理幫助大家理解這部分知識,需要的朋友可以參考下
詳解Java線程池和Executor原理的分析
線程池作用與基本知識
在開始之前,我們先來討論下「線程池”這個概念。 “線程池”,顧名思義就是一個線程快取。它是一個或多個執行緒的集合,使用者可以把需要執行的任務簡單地丟給執行緒池,而不用過多的糾結與執行的細節。那麼線程池有哪些作用呢?或者說與直接用Thread相比,有什麼優勢?我簡單總結了以下幾點:
減少線程創建和銷毀帶來的消耗
對於Java Thread的實現,我在前面的一篇blog中進行了分析。 Java Thread與核心執行緒是1:1(Linux)的,再加上Thread在Java層與C++層都有不少成員數據,所以Java Thread其實是比較重的。創建和銷毀一個Java Thread需要OS和JVM都做不少工作,因此如果將Java Thread緩存起來,可以實現一定的效率提升。
更方便且透明的實作運算資源控制
討論這一條,可能需要舉一些例子。以非常聞名的web伺服器Nginx為例,Nginx以強大的並發能力和低資源消耗而聞名。 Nginx為了實現這些嚴格的要求,它嚴格地限定了工作執行緒的數目(worker執行緒一般等於CPU數目)。這種設計的重點就是降低執行緒切換帶來的效能損失,而這條優化方式對Java同樣適用。倘若,每來一個任務就新建一個Thread來運算,那最終的結果就是程式資源難以控制(某個功能把CPU跑滿了),而且整體的執行速度也比較慢。 而Java執行緒池提供了FixedThreadPool,你可以用它來實現執行緒最大數目的控制。
上面說了這麼多的“廢話”,還是來結合Java線程池的實作來分析一下吧! Java的線程池有幾個實作:
cached ThreadPool
#快取執行緒池的特點是它會快取之前的線程,新提交的任務可以運行在快取的執行緒中,即實現了前文所述的第一個優勢。
fixed ThreadPool
cachedThreadPool的一個特點是-新提交的任務沒有空閒執行緒可以執行了,就會建立一個新的執行緒。而fixedThreadPool不會這樣,它會將任務儲存起來,等到有空閒執行緒再執行。即實現了前文所述的第二個優勢。
scheduled ThreadPool
scheduled ThreadPool的特點是可以實現任務的調度,例如任務的延遲執行和週期執行。
出了上面三種,Java也實作了newWorkStealingPool,這個是基於Fork/Join框架的。目前我還沒研究這個,所以就先不管它了。 Java的同時支援中,使用了Executor來包裝各種線程池,「執行器」這個名稱其實挺貼切的,線程池可不就是個執行器嘛!
1.cached ThreadPool、fixed ThreadPool的實作
從前文的描述就可以看出,這兩個執行緒池非常類似。的確是這樣,事實上它們是同時實現的,不行我們來看實際範例:
ThreadPoolExecutor executor1 = (ThreadPoolExecutor)Executors.newCachedThreadPool();
ThreadPoolExecutor executor2 = (ThreadPoolExecutor)Executors.newFixedThreadPool(4);
這是兩個線程池的新建方法,看起來很像!如果你不這麼認為,我只能讓你看看真相了。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
是的,它們呼叫了同一個建構函數,只是參數略有不同。那我們來看看這些參數的意義,以及兩組參數的差異。首先還是要貼一下ThreadPoolExecutor的建構子了。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
為了看起來清爽,再一層的建構子我就不貼了,而且那個建構子也只是簡單的賦值而已。這裡的函數原型已經能給我們很多很多資訊了,不得不說JDK的程式碼命名確實好,簡直就像註解一樣。
maximumPoolSize就是線程池的最大線程數;對於cached ThreadPool來說,這個值是Integer.MAX_VALUE,基本上相當於無窮大了,什麼樣的機器可以跑幾十億線程! !對於fixed ThreadPool來講,這個值就是使用者設定的線程池的數目。
keepAliveTime和unit決定了線程的快取過期時間;對於cached ThreadPool來講,線程的快取過期時間是一分鐘,換言之,一個工作線程如果一分鐘都無事可做,就把它撤銷掉以節省資源。 fixed ThreadPool傳入的時間是0,這裡的意思是fixed ThreadPool中的工作執行緒是永遠不會過期的。
corePoolSize是线程池的最小线程数;对于cached ThreadPool,这个值为0,因为在完全没有任务的情况下,cached ThreadPool的确会成为“光杆司令”。至于fixed ThreadPool,这个fixed已经表明corePoolSize是等于线程总数的。
接下来,我们根据一个简单的使用例子,来看看一下cached ThreadPool的流程。
public class Task implements Callable<String> { private String name; public Task(String name) { this.name = name; } @Override public String call() throws Exception { System.out.printf("%s: Starting at : %s\n", this.name, new Date()); return "hello, world"; } public static void main(String[] args) { ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool(); Task task = new Task("test"); Future<String> result = executor.submit(task); try { System.out.printf("%s\n", result.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } executor.shutdown(); System.out.printf("Main ends at : %s\n", new Date()); } }
首先,来看看executor.submit(task),这其实调用了ThreadPoolExecutor.execute(Runnable command)方法,这个方法的代码如下,整段代码的逻辑是这样的。首先检查线程池的线程数是否不够corePoolSize,如果不够就直接新建线程并把command添加进去;如果线程数已经够了或者添加失败(多个线程增加添加的情况),就尝试把command添加到队列中(workQueue.offer(command)),如果添加失败了,就reject掉cmd。大体的逻辑是这样的,这段代码有很多基于线程安全的设计,这里为了不跑题,就先忽略细节了。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
到这里,看起来线程池实现的整体思路其实也没多么复杂。但是还有一个问题——一个普通的Thread在执行完自己的run方法后会自动退出。那么线程池是如何实现Worker线程不断的干活,甚至在没有任务的时候。其实答案很简单,就是Worker其实在跑大循环,Worker实际运行方法如下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); /***/ try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); /***/ } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
关键就在这个while的判断条件,对于需要cached线程的情况下,getTask()会阻塞起来,如果缓存的时间过期,就会返回一个null,然后Worker就退出了,也就结束了它的服役周期。而在有任务的情况下,Woker会把task拿出来,然后调用task.run()执行任务,并通过Future通知客户线程(即future.get()返回)。这样一个简单的线程池使用过程就完了。。。
当然,线程池的很多精髓知识——基于线程安全的设计,我都没有分析。有兴趣可以自己分析一下,也可以和我讨论。此外Scheduled ThreadPool这里也没有分析,它的要点其实是调度,主要是根据时间最小堆来驱动的。
以上是關於Java中線程池與Executor原理詳解的詳細內容。更多資訊請關注PHP中文網其他相關文章!