首頁  >  文章  >  Java  >  關於Java中線程池與Executor原理詳解

關於Java中線程池與Executor原理詳解

黄舟
黄舟原創
2017-07-30 10:43:001277瀏覽

這篇文章主要介紹了詳解Java線程池和Executor原理的分析的相關資料,這裡提供實例及分析原理幫助大家理解這部分知識,需要的朋友可以參考下

詳解Java線程池和Executor原理的分析

線程池作用與基本知識

在開始之前,我們先來討論下「線程池”這個概念。 “線程池”,顧名思義就是一個線程快取。它是一個或多個執行緒的集合,使用者可以把需要執行的任務簡單地丟給執行緒池,而不用過多的糾結與執行的細節。那麼線程池有哪些作用呢?或者說與直接用Thread相比,有什麼優勢?我簡單總結了以下幾點:

減少線程創建和銷毀帶來的消耗

對於Java Thread的實現,我在前面的一篇blog中進行了分析。 Java Thread與核心執行緒是1:1(Linux)的,再加上Thread在Java層與C++層都有不少成員數據,所以Java Thread其實是比較重的。創建和銷毀一個Java Thread需要OS和JVM都做不少工作,因此如果將Java Thread緩存起來,可以實現一定的效率提升。

更方便且透明的實作運算資源控制

討論這一條,可能需要舉一些例子。以非常聞名的web伺服器Nginx為例,Nginx以強大的並發能力和低資源消耗而聞名。 Nginx為了實現這些嚴格的要求,它嚴格地限定了工作執行緒的數目(worker執行緒一般等於CPU數目)。這種設計的重點就是降低執行緒切換帶來的效能損失,而這條優化方式對Java同樣適用。倘若,每來一個任務就新建一個Thread來運算,那最終的結果就是程式資源難以控制(某個功能把CPU跑滿了),而且整體的執行速度也比較慢。 而Java執行緒池提供了FixedThreadPool,你可以用它來實現執行緒最大數目的控制。

上面說了這麼多的“廢話”,還是來結合Java線程池的實作來分析一下吧! Java的線程池有幾個實作:

cached ThreadPool

#快取執行緒池的特點是它會快取之前的線程,新提交的任務可以運行在快取的執行緒中,即實現了前文所述的第一個優勢。

fixed ThreadPool

cachedThreadPool的一個特點是-新提交的任務沒有空閒執行緒可以執行了,就會建立一個新的執行緒。而fixedThreadPool不會這樣,它會將任務儲存起來,等到有空閒執行緒再執行。即實現了前文所述的第二個優勢。

scheduled ThreadPool

scheduled ThreadPool的特點是可以實現任務的調度,例如任務的延遲執行和週期執行。

出了上面三種,Java也實作了newWorkStealingPool,這個是基於Fo​​rk/Join框架的。目前我還沒研究這個,所以就先不管它了。 Java的同時支援中,使用了Executor來包裝各種線程池,「執行器」這個名稱其實挺貼切的,線程池可不就是個執行器嘛!

1.cached ThreadPool、fixed ThreadPool的實作

從前文的描述就可以看出,這兩個執行緒池非常類似。的確是這樣,事實上它們是同時實現的,不行我們來看實際範例:


ThreadPoolExecutor executor1 = (ThreadPoolExecutor)Executors.newCachedThreadPool();


ThreadPoolExecutor executor2 = (ThreadPoolExecutor)Executors.newFixedThreadPool(4);

這是兩個線程池的新建方法,看起來很像!如果你不這麼認為,我只能讓你看看真相了。


public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                 60L, TimeUnit.SECONDS,
                 new SynchronousQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                 0L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<Runnable>());
}

是的,它們呼叫了同一個建構函數,只是參數略有不同。那我們來看看這些參數的意義,以及兩組參數的差異。首先還是要貼一下ThreadPoolExecutor的建構子了。


public ThreadPoolExecutor(int corePoolSize,
             int maximumPoolSize,
             long keepAliveTime,
             TimeUnit unit,
             BlockingQueue<Runnable> workQueue) {
  this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
     Executors.defaultThreadFactory(), defaultHandler);
}

為了看起來清爽,再一層的建構子我就不貼了,而且那個建構子也只是簡單的賦值而已。這裡的函數原型已經能給我們很多很多資訊了,不得不說JDK的程式碼命名確實好,簡直就像註解一樣。

maximumPoolSize就是線程池的最大線程數;對於cached ThreadPool來說,這個值是Integer.MAX_VALUE,基本上相當於無窮大了,什麼樣的機器可以跑幾十億線程! !對於fixed ThreadPool來講,這個值就是使用者設定的線程池的數目。
keepAliveTime和unit決定了線程的快取過期時間;對於cached ThreadPool來講,線程的快取過期時間是一分鐘,換言之,一個工作線程如果一分鐘都無事可做,就把它撤銷掉以節省資源。 fixed ThreadPool傳入的時間是0,這裡的意思是fixed ThreadPool中的工作執行緒是永遠不會過期的。

corePoolSize是线程池的最小线程数;对于cached ThreadPool,这个值为0,因为在完全没有任务的情况下,cached ThreadPool的确会成为“光杆司令”。至于fixed ThreadPool,这个fixed已经表明corePoolSize是等于线程总数的。
接下来,我们根据一个简单的使用例子,来看看一下cached ThreadPool的流程。


public class Task implements Callable<String> {

private String name;
public Task(String name) {
  this.name = name;
}
@Override
public String call() throws Exception {
  System.out.printf("%s: Starting at : %s\n", this.name, new Date());
  return "hello, world";
}
public static void main(String[] args) {
  ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
  Task task = new Task("test");
  Future<String> result = executor.submit(task);
  try {
    System.out.printf("%s\n", result.get());
  } catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
  }
  executor.shutdown();
  System.out.printf("Main ends at : %s\n", new Date());
}
}

首先,来看看executor.submit(task),这其实调用了ThreadPoolExecutor.execute(Runnable command)方法,这个方法的代码如下,整段代码的逻辑是这样的。首先检查线程池的线程数是否不够corePoolSize,如果不够就直接新建线程并把command添加进去;如果线程数已经够了或者添加失败(多个线程增加添加的情况),就尝试把command添加到队列中(workQueue.offer(command)),如果添加失败了,就reject掉cmd。大体的逻辑是这样的,这段代码有很多基于线程安全的设计,这里为了不跑题,就先忽略细节了。


public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  int c = ctl.get();
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;
    c = ctl.get();
  }
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
  }
  else if (!addWorker(command, false))
    reject(command);
}

到这里,看起来线程池实现的整体思路其实也没多么复杂。但是还有一个问题——一个普通的Thread在执行完自己的run方法后会自动退出。那么线程池是如何实现Worker线程不断的干活,甚至在没有任务的时候。其实答案很简单,就是Worker其实在跑大循环,Worker实际运行方法如下:


final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  w.unlock(); // allow interrupts
  boolean completedAbruptly = true;
  try {
    while (task != null || (task = getTask()) != null) {
      w.lock();
  /***/
      try {
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
          task.run();
        /***/
        } finally {
          afterExecute(task, thrown);
        }
      } finally {
        task = null;
        w.completedTasks++;
        w.unlock();
      }
    }
    completedAbruptly = false;
  } finally {
    processWorkerExit(w, completedAbruptly);
  }
}

关键就在这个while的判断条件,对于需要cached线程的情况下,getTask()会阻塞起来,如果缓存的时间过期,就会返回一个null,然后Worker就退出了,也就结束了它的服役周期。而在有任务的情况下,Woker会把task拿出来,然后调用task.run()执行任务,并通过Future通知客户线程(即future.get()返回)。这样一个简单的线程池使用过程就完了。。。

当然,线程池的很多精髓知识——基于线程安全的设计,我都没有分析。有兴趣可以自己分析一下,也可以和我讨论。此外Scheduled ThreadPool这里也没有分析,它的要点其实是调度,主要是根据时间最小堆来驱动的。

以上是關於Java中線程池與Executor原理詳解的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn