首頁 >Java >java教程 >ThreadPoolExecutor執行緒池原理及其execute方法

ThreadPoolExecutor執行緒池原理及其execute方法

巴扎黑
巴扎黑原創
2017-06-26 11:27:471608瀏覽

jdk1.7.0_79 

  對於執行緒池大部分人可能會用,也知道為什麼用。無非就是任務需要非同步執行,再者就是執行緒需要統一管理起來。對於從線程池中獲取線程,大部分人可能只知道,我現在需要一個線程來執行一個任務,那我就把任務丟到線程池裡,線程池裡有空閒的線程就執行,沒有空閒的線程就等待。實際上對於線程池的執行原理遠不止這麼簡單。

  在Java並發套件中提供了線程池類別——ThreadPoolExecutor,實際上更多的我們可能用到的是Executors工廠類別為我們提供的執行緒池newFixedThreadPool、newSingleThreadPool、newCachedThreadPool,這三個執行緒池並不是ThreadPoolExecutor 的子類,關於這幾者之間的關係,我們先來查看ThreadPoolExecutor,查看源碼發現其一共有4 個建構方法。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

  首先就從這幾個參數開始來了解執行緒池ThreadPoolExecutor的執行原理

  corePoolSize:核心執行緒池的執行緒數量

  maximumPoolSize:最大的執行緒池執行緒數量

  keepAliveTime:執行緒活動保持時間,在執行緒池的工作執行緒空閒後,保持在執行緒後,保持在執行緒狀態存活的時間。

  unit:執行緒活動保持時間的單位。

  workQueue:指定任務佇列所使用的阻塞佇列

  corePoolSizemaximumPoolSize  corePoolSizemaximumPoolSize都在指定執行緒池中的線程數量,好像平常用到線程池的時候最多就只需要傳遞一個線程池大小的參數就能創建一個線程池啊,Java為我們提供了一些常用的線程池類就是上面提到的new

FixedThreadPool、

newSingleThreadExecutor、newCachedThreadPool,當然如果我們想要自己發揮創建自訂的執行緒池就得自己來「配置」有關線程池的一些參數。

  當把一個任務交給執行緒池來處理的時候,執行緒池的執行原理如下圖所示參考自《

Java並發程式設計的藝術》

  ①

#首先會判斷核心執行緒池裡是否有執行緒可執行

,有空閒執行緒則建立一個執行緒來執行任務。

  ②當核心執行緒池裡已經沒有執行緒可執行的時候,此時將任務丟到任務佇列中去。

  ③如果任務佇列(有界)也已經滿了的話,但運行的執行緒數小於最大執行緒池的數量的時候,此時將會新建一個執行緒用於執行任務,但如果執行的執行緒數已經達到最大執行緒池的數量的時候,此時將無法建立執行緒執行任務。

  所以實際上對於線程池不僅是單純地將任務丟到線程池,線程池中有線程就執行任務,沒有線程就等待。

  為鞏固一下線程池的原理,現在再來了解上面提到的常用的3個線程池:

 
 Executors.newFixedThreadPool
: ###建立一個固定數量執行緒的執行緒池。 #########
// Executors#newFixedThreadPoolpublic static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
####

  可以看到newFixedThreadPool中调用的是ThreadPoolExecutor,传递的参数corePoolSize= maximumPoolSize=nThread。回顾线程池的执行原理,当一个任务提交到线程池中,首先判断核心线程池里有没有空闲线程,有则创建线程,没有则将任务放到任务队列(这里是有界阻塞队列LinkedBlockingQueue中,如果任务队列已经满了的话,对于newFixedThreadPool来说,它的最大线程池数量=核心线程池数量,此时任务队列也满了,将不能扩展创建新的线程来执行任务。

  Executors.newSingleThreadExecutor:创建只包含一个线程的线程池。  

//Executors# newSingleThreadExecutorpublic static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegateExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

  只有一个线程的线程池好像有点奇怪,并且并没有直接将返回ThreadPoolExecutor,甚至也没有直接将线程池数量1传递给newFixedThreadPool返回。那就说明这个只含有一个线程的线程池,或许并没有只包含一个线程那么简单。在其源码注释中这么写到:创建只有一个工作线程的线程池用于操作一个无界队列(如果由于前驱节点的执行被终止结束了,一个新的线程将会继续执行后继节点线程)任务得以继续执行,不同于newFixedThreadPool(1)不会有额外的线程来重新继续执行后继节点。也就是说newSingleThreadExecutor自始至终都只有一个线程在执行,这和newFixedThreadPool一样,但如果线程终止结束过后newSingleThreadExecutor则会重新创建一个新的线程来继续执行任务队列中的线程,而newFixedThreaPool则不会。

  Executors.newCachedThreadPool:根据需要创建新线程的线程池。

//Executors#newCachedThreadPoolpublic static ExecutorService newCachedThreadPool() {
  return new ThreadPooExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

  可以看到newCachedThread返回的是ThreadPoolExecutor,其参数核心线程池corePoolSize = 0, maximumPoolSize = Integer.MAX_VALUE,这也就是说当任务被提交到newCachedThread线程池时,将会直接把任务放到SynchronousQueue任务队列中,maximumPool从任务队列中获取任务。注意SynchronousQueue是一个没有容量的队列,也就是说每个入队操作必须等待另一个线程的对应出队操作,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,newCachedThreadPool会不断创建线程,线程多并不是一件好事,严重会耗尽CPU和内存资源。

  题外话:newFixedThreadPoolnewSingleThreadExecutornewCachedThreadPool,这三者都直接或间接调用了ThreadPoolExecutor,为什么它们三者没有直接是其子类,而是通过Executors来实例化呢?这是所采用的静态工厂方法java.util.Connections接口中同样也是采用的静态工厂方法来创建相关的类这样有很多好处,静态工厂方法是用来产生对象的,产生什么对象没关系,只要返回原返回类型或原返回类型的子类型都可以降低API数目和使用难度,Effective Java》中的第1条就是静态工厂方法。


  回到ThreadPoolExecutor,首先来看它的继承关系:

  ThreadPoolExecutor它的顶级父类是Executor接口,只包含了一个方法——execute,这个方法也就是线程池的“执行”。

//Executor#executepublic interface Executor {void execute(Runnable command);
}

  Executor#execute的实现则是在ThreadPoolExecutor中实现的

//ThreadPoolExecutor#executepublic void execute(Runnable command) {  if (command == null) throw new NullPointerException();
  int c = ctl.get();    
    …
}

  一来就碰到个不知所云的ctl变量它的定义

private final AtomicInteger ctl = new AtlmicInteger(ctlOf(RUNNING, 0));

  这个变量使用来干嘛的呢?它的作用有点类似我们在《7.ReadWriteLock接口及其实现ReentrantReadWriteLock》中提到的读写锁有读、写两个同步状态,而AQS则只提供了state一个int型变量,此时将state16位表示为读状态,低16位表示为写状态。这里的clt同样也是,它表示了两个概念:

  1. workerCount:当前有效的线程数

  2. runState:当前线程池的五种状态,Running、Shutdown、Stop、Tidying、Terminate。

  int型变量一共有32,线程池的五种状态runState至少需要3位来表示,故workCount只能有29位,所以代码中规定线程池的有效线程数最多为229-1。

//ThreadPoolExecutorprivate static final int COUNT_BITS = Integer.SIZE – 3;     //32-3=29,线程数量所占位数private static final int CAPACITY = (1 << COUNT_BITS) – 1;    //低29位表示最大线程数,229-1//五种线程池状态private static final int RUNNING = -1 << COUNT_BITS;    /int型变量高3位(含符号位)101表RUNINGprivate static final int SHUTDOWN = 0 << COUNT_BITS;    //高3位000private static final int STOP = 1 << COUNT_BITS;    //高3位001private static final int TIDYING = 2 << COUNT_BITS;    //高3位010private static final int TERMINATED = 3 << COUNT_BITS;    //高3位011

  再次回到ThreadPoolExecutor#execute方法:

 1 //ThreadPoolExecutor#execute 2 public void execute(Runnable command) { 3     if (command == null) 
 4         throw new NullPointerException(); 5    int c = ctl.get();    //由它可以获取到当前有效的线程数和线程池的状态 6 /*1.获取当前正在运行线程数是否小于核心线程池,是则新创建一个线程执行任务,否则将任务放到任务队列中*/ 7     if (workerCountOf(c) < corePoolSize){ 8         if (addWorker(command, tre))     //在addWorker中创建工作线程执行任务 9             return ;10         c = ctl.get();11     }12 /*2.当前核心线程池中全部线程都在运行workerCountOf(c) >= corePoolSize,所以此时将线程放到任务队列中*/13     if (isRunning(c) && workQueue.offer(command))    {    //线程池是否处于运行状态,且是否任务插入任务队列成功14         int recheck = ctl.get();15      if (!isRunning(recheck) && remove(command))        //线程池是否处于运行状态,如果不是则使刚刚的任务出队16        reject(command);    //抛出RejectedExceptionException异常17      else if (workerCountOf(recheck) == 0)18        addWorker(null, false);19   }20 /*3.插入队列不成功,且当前线程数数量小于最大线程池数量,此时则创建新线程执行任务,创建失败抛出异常*/21   else if (!addWorker(command, false)){22     reject(command);    //抛出RejectedExceptionException异常23   }24 }

  上面代码注释第7行的即判断当前核心线程池里是否有空闲线程,有则通过addWorker方法创建工作线程执行任务。addWorker方法较长,筛选出重要的代码来解析。 

 1 //ThreadPoolExecutor#addWorker 2 private boolean addWorker(Runnable firstTask, boolean core) { 3 /*首先会再次检查线程池是否处于运行状态,核心线程池中是否还有空闲线程,都满足条件过后则会调用compareAndIncrementWorkerCount先将正在运行的线程数+1,数量自增成功则跳出循环,自增失败则继续从头继续循环*/ 4   ... 5   if (compareAndIncrementWorkerCount(c)) 6     break retry; 7   ... 8 /*正在运行的线程数自增成功后则将线程封装成工作线程Worker*/ 9   boolean workerStarted = false;10   boolean workerAdded = false;11   Worker w = null;12   try {13     final ReentrantLock mainLock = this.mainLock;        //全局锁14     w = new Woker(firstTask);        //将线程封装为Worker工作线程15     final Thread t = w.thread;16     if (t != null) {17       mainLock.lock();    //获取全局锁18 /*当持有了全局锁的时候,还需要再次检查线程池的运行状态等*/19       try {20         int c = clt.get();21         int rs = runStateOf(c);        //线程池运行状态22         if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)){        //线程池处于运行状态,或者线程池关闭且任务线程为空23           if (t.isAlive())    //线程处于活跃状态,即线程已经开始执行或者还未死亡,正确的应线程在这里应该是还未开始执行的24             throw new IllegalThreadStateException();25           workers.add(w);    //private final HashSet<Worker> wokers = new HashSet<Worker>();包含线程池中所有的工作线程,只有在获取了全局的时候才能访问它。将新构造的工作线程加入到工作线程集合中26           int s = worker.size();    //工作线程数量27           if (s > largestPoolSize)28             largestPoolSize = s;29           workerAdded = true;    //新构造的工作线程加入成功30         }31       } finally {32         mainLock.unlock();33       }34       if (workerAdded) {35         t.start();    //在被构造为Worker工作线程,且被加入到工作线程集合中后,执行线程任务,注意这里的start实际上执行Worker中run方法,所以接下来分析Worker的run方法36         workerStarted = true;37       }38     }39   } finally {40     if (!workerStarted)    //未能成功创建执行工作线程41       addWorkerFailed(w);    //在启动工作线程失败后,将工作线程从集合中移除42   }43   return workerStarted;44 }

  在上面第35代码中,工作线程被成功添加到工作线程集合中后,则开始start执行,这里start执行的是Worker工作线程中的run方法。

//ThreadPoolExecutor$Worker,它继承了AQS,同时实现了Runnable,所以它具备了这两者的所有特性private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  final Thread thread;
  Runnable firstTask;
  public Worker(Runnable firstTask) {
    setState(-1);    //设置AQS的同步状态为-1,禁止中断,直到调用runWorker    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);    //通过线程工厂来创建一个线程,将自身作为Runnable传递传递  }
  public void run() {
    runWorker(this);    //运行工作线程  }
}

  ThreadPoolExecutor#runWorker,在此方法中,Worker在执行完任务后,还会循环获取任务队列里的任务执行(其中的getTask方法),也就是说Worker不仅仅是在执行完给它的任务就释放或者结束,它不会闲着,而是继续从任务队列中获取任务,直到任务队列中没有任务可执行时,它才退出循环完成任务。理解了以上的源码过后,往后线程池执行原理的第二步、第三步的理解实则水到渠成。

以上是ThreadPoolExecutor執行緒池原理及其execute方法的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn