ホームページ  >  記事  >  Java  >  JAVAのThreadPoolExecutorスレッドプール原理とその実行メソッドの例の詳細な説明

JAVAのThreadPoolExecutorスレッドプール原理とその実行メソッドの例の詳細な説明

怪我咯
怪我咯オリジナル
2017-06-30 10:38:362573ブラウズ

次のエディターは、ThreadPoolExecutor スレッド プールの原理とその実行メソッド (詳細な説明) に関する記事を提供します。編集者はこれがとても良いものだと思ったので、皆さんの参考として今から共有します。編集者をフォローして、一緒に見てみましょう

jdk1.7.0_79

ほとんどの人はスレッド プールを使用している可能性があり、スレッド プールを使用する理由も知っています。タスクは非同期で実行する必要があり、スレッドは均一に管理する必要があるだけです。スレッド プールからスレッドを取得することについては、タスクを実行するためにスレッドが必要な場合、スレッド プールにアイドル状態のスレッドがあれば、そのスレッドが実行されることだけを知っている人が多いかもしれません。アイドル状態のスレッドがない場合は、そのまま実行されます。実際、スレッド プールの実行原理はそれほど単純ではありません。

スレッド プール クラス - ThreadPoolExecutor は Java 同時実行パッケージで提供されています。実際、多くの人は Executors ファクトリ クラスによって提供されるスレッド プール (newFixedThreadPool、newSingleThreadPool、newCachedThreadPool) を使用する可能性があります。これら 3 つのスレッド プールは ThreadPoolExecutor のサブクラスではありません。これらの関係については、まず ThreadPoolExecutor を見てみましょう。ソースコードを見ると、合計 4 つの構築メソッドがあることがわかります。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

まず、スレッド プール ThreadPoolExecutor の実行原理を理解するために、これらのパラメーターから始めましょう。

corePoolSize: コアスレッドプール内のスレッド数

maximumPoolSize: スレッドプールスレッドの最大数

keepAliveTime: スレッドアクティビティの保持時間、スレッド プールの作業時間 スレッドがアイドル状態になった後も存続する時間。

unit: スレッドアクティビティの保持時間の単位。

workQueue: タスクキューで使用するブロッキングキューを指定します

corePoolSizeとmaximumPoolSizeは両方ともスレッドプール内のスレッド数を指定するだけでよいようです。サイズ パラメータを使用してスレッド プールを作成できます。Java には、カスタム スレッドを作成する場合に、一般的に使用されるスレッド プール クラス (前述の newFixedThreadPool、newSingleThreadExecutor、および newCachedThreadPool) が用意されています。プールを自分で行うには、スレッド プールに関するいくつかのパラメータを自分で「設定」する必要があります。

タスクがスレッドプールに渡されて処理されるとき、スレッドプールの実行原理は下図の通りです。「The Art of Java Concurrent Programming」を参照してください

① まず、それを決定します。コア スレッド プールにスレッドがあるかどうかを確認し、アイドル状態のスレッドがある場合は、タスクを実行するスレッドを作成します。

②コアスレッドプールに実行可能なスレッドがない場合、タスクはタスクキューに投入されます。

③ タスクキュー(制限付き)も満杯だが、実行中のスレッド数がスレッドプールの最大数未満の場合、タスクを実行するために新しいスレッドが作成されますが、実行中のスレッド数が上限に達している場合は、スレッド プールの最大数に達すると、現時点ではタスクを実行するためのスレッドは作成されません。

つまり、実際には、スレッド プールは、スレッド プールにタスクを投げるだけではありません。スレッド プールにスレッドがあれば、タスクは実行され、スレッドがなければ、待機します。

スレッド プールの原則を統合するために、上記で一般的に使用される 3 つのスレッド プールについて学びましょう:

Executors.newFixedThreadPool: 固定数のスレッドでスレッド プールを作成します。

// Executors#newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

newFixedThreadPool で ThreadPoolExecutor クラスが呼び出され、パラメーター corePoolSize= minimumPoolSize=nThread が渡されたことがわかります。スレッド プールの実行原理を振り返ると、タスクがスレッド プールに送信されると、まずコア スレッド プールにアイドル状態のスレッドがあるかどうかが判断され、アイドル状態のスレッドが存在する場合はスレッドが作成されます。 、タスクはタスク キュー (ここでは境界付きブロッキング キュー LinkedBlockingQueue) に配置されます。タスク キューがいっぱいの場合、newFixedThreadPool の場合、その最大スレッド プール数 = コア スレッド プールの数になります。キューもいっぱいで、タスクを実行するために新しいスレッドを拡張できません。

Executors.newSingleThreadExecutor: スレッドを 1 つだけ含むスレッド プールを作成します。

//Executors# newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
 return new FinalizableDelegateExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

只有一个线程的线程池好像有点奇怪,并且并没有直接将返回ThreadPoolExecutor,甚至也没有直接将线程池数量1传递给newFixedThreadPool返回。那就说明这个只含有一个线程的线程池,或许并没有只包含一个线程那么简单。在其源码注释中这么写到:创建只有一个工作线程的线程池用于操作一个无界队列(如果由于前驱节点的执行被终止结束了,一个新的线程将会继续执行后继节点线程)任务得以继续执行,不同于newFixedThreadPool(1)不会有额外的线程来重新继续执行后继节点。也就是说newSingleThreadExecutor自始至终都只有一个线程在执行,这和newFixedThreadPool一样,但如果线程终止结束过后newSingleThreadExecutor则会重新创建一个新的线程来继续执行任务队列中的线程,而newFixedThreaPool则不会。

Executors.newCachedThreadPool:根据需要创建新线程的线程池。

//Executors#newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
  return new ThreadPooExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

可以看到newCachedThread返回的是ThreadPoolExecutor,其参数核心线程池corePoolSize = 0, maximumPoolSize = Integer.MAX_VALUE,这也就是说当任务被提交到newCachedThread线程池时,将会直接把任务放到SynchronousQueue任务队列中,maximumPool从任务队列中获取任务。注意SynchronousQueue是一个没有容量的队列,也就是说每个入队操作必须等待另一个线程的对应出队操作,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,newCachedThreadPool会不断创建线程,线程多并不是一件好事,严重会耗尽CPU和内存资源。

题外话:newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool,这三者都直接或间接调用了ThreadPoolExecutor,为什么它们三者没有直接是其子类,而是通过Executors来实例化呢?这是所采用的静态工厂方法,在java.util.Connections接口中同样也是采用的静态工厂方法来创建相关的类。这样有很多好处,静态工厂方法是用来产生对象的,产生什么对象没关系,只要返回原返回类型或原返回类型的子类型都可以,降低API数目和使用难度,在《Effective Java》中的第1条就是静态工厂方法。

回到ThreadPoolExecutor,首先来看它的继承关系:

ThreadPoolExecutor它的顶级父类是Executor接口,只包含了一个方法——execute,这个方法也就是线程池的“执行”。

//Executor#execute
public interface Executor {
 void execute(Runnable command);
}

Executor#execute的实现则是在ThreadPoolExecutor中实现的:

//ThreadPoolExecutor#execute
public void execute(Runnable command) {
  if (command == null) 
  throw new NullPointerException();
  int c = ctl.get(); 
 …
}

一来就碰到个不知所云的ctl变量它的定义:

private final AtomicInteger ctl = new AtlmicInteger(ctlOf(RUNNING, 0));

这个变量使用来干嘛的呢?它的作用有点类似我们在《ReadWriteLock接口及其实现ReentrantReadWriteLock》中提到的读写锁有读、写两个同步状态,而AQS则只提供了state一个int型变量,此时将state高16位表示为读状态,低16位表示为写状态。这里的clt同样也是,它表示了两个概念:

workerCount:当前有效的线程数

runState:当前线程池的五种状态,Running、Shutdown、Stop、Tidying、Terminate。

int型变量一共有32位,线程池的五种状态runState至少需要3位来表示,故workCount只能有29位,所以代码中规定线程池的有效线程数最多为229-1。

//ThreadPoolExecutor
private static final int COUNT_BITS = Integer.SIZE – 3;  //32-3=29,线程数量所占位数
private static final int CAPACITY = (1 << COUNT_BITS) – 1; //低29位表示最大线程数,229-1
//五种线程池状态
private static final int RUNNING = -1 << COUNT_BITS; /int型变量高3位(含符号位)101表RUNING
private static final int SHUTDOWN = 0 << COUNT_BITS; //高3位000
private static final int STOP = 1 << COUNT_BITS; //高3位001
private static final int TIDYING = 2 << COUNT_BITS; //高3位010
private static final int TERMINATED = 3 << COUNT_BITS; //高3位011

再次回到ThreadPoolExecutor#execute方法:

//ThreadPoolExecutor#execute
public void execute(Runnable command) {
 if (command == null) 
  throw new NullPointerException();
   int c = ctl.get(); //由它可以获取到当前有效的线程数和线程池的状态
/*1.获取当前正在运行线程数是否小于核心线程池,是则新创建一个线程执行任务,否则将任务放到任务队列中*/
 if (workerCountOf(c) < corePoolSize){
  if (addWorker(command, tre))  //在addWorker中创建工作线程执行任务
   return ;
  c = ctl.get();
 }
/*2.当前核心线程池中全部线程都在运行workerCountOf(c) >= corePoolSize,所以此时将线程放到任务队列中*/
 if (isRunning(c) && workQueue.offer(command)) { //线程池是否处于运行状态,且是否任务插入任务队列成功
  int recheck = ctl.get();
     if (!isRunning(recheck) && remove(command))  //线程池是否处于运行状态,如果不是则使刚刚的任务出队
       reject(command); //抛出RejectedExceptionException异常
     else if (workerCountOf(recheck) == 0)
       addWorker(null, false);
  }
/*3.插入队列不成功,且当前线程数数量小于最大线程池数量,此时则创建新线程执行任务,创建失败抛出异常*/
  else if (!addWorker(command, false)){
    reject(command); //抛出RejectedExceptionException异常
  }
}

上面代码注释第7行的即判断当前核心线程池里是否有空闲线程,有则通过addWorker方法创建工作线程执行任务。addWorker方法较长,筛选出重要的代码来解析。

//ThreadPoolExecutor#addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
/*首先会再次检查线程池是否处于运行状态,核心线程池中是否还有空闲线程,都满足条件过后则会调用compareAndIncrementWorkerCount先将正在运行的线程数+1,数量自增成功则跳出循环,自增失败则继续从头继续循环*/
  ...
  if (compareAndIncrementWorkerCount(c))
    break retry;
  ...
/*正在运行的线程数自增成功后则将线程封装成工作线程Worker*/
  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    final ReentrantLock mainLock = this.mainLock;  //全局锁
    w = new Woker(firstTask);  //将线程封装为Worker工作线程
    final Thread t = w.thread;
    if (t != null) {
      mainLock.lock(); //获取全局锁
/*当持有了全局锁的时候,还需要再次检查线程池的运行状态等*/
      try {
        int c = clt.get();
        int rs = runStateOf(c);  //线程池运行状态
        if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)){  //线程池处于运行状态,或者线程池关闭且任务线程为空
          if (t.isAlive()) //线程处于活跃状态,即线程已经开始执行或者还未死亡,正确的应线程在这里应该是还未开始执行的
            throw new IllegalThreadStateException();
          workers.add(w); //private final HashSet<Worker> wokers = new HashSet<Worker>();包含线程池中所有的工作线程,只有在获取了全局的时候才能访问它。将新构造的工作线程加入到工作线程集合中
          int s = worker.size(); //工作线程数量
          if (s > largestPoolSize)
            largestPoolSize = s;
          workerAdded = true; //新构造的工作线程加入成功
        }
      } finally {
        mainLock.unlock();
      }
      if (workerAdded) {
        t.start(); //在被构造为Worker工作线程,且被加入到工作线程集合中后,执行线程任务,注意这里的start实际上执行Worker中run方法,所以接下来分析Worker的run方法
        workerStarted = true;
      }
    }
  } finally {
    if (!workerStarted) //未能成功创建执行工作线程
      addWorkerFailed(w); //在启动工作线程失败后,将工作线程从集合中移除
  }
  return workerStarted;
}

在上面第35代码中,工作线程被成功添加到工作线程集合中后,则开始start执行,这里start执行的是Worker工作线程中的run方法。

//ThreadPoolExecutor$Worker,它继承了AQS,同时实现了Runnable,所以它具备了这两者的所有特性
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  final Thread thread;
  Runnable firstTask;
  public Worker(Runnable firstTask) {
    setState(-1); //设置AQS的同步状态为-1,禁止中断,直到调用runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this); //通过线程工厂来创建一个线程,将自身作为Runnable传递传递
  }
  public void run() {
    runWorker(this); //运行工作线程
  }
}

ThreadPoolExecutor#runWorker,在此方法中,Worker在执行完任务后,还会循环获取任务队列里的任务执行(其中的getTask方法),也就是说Worker不仅仅是在执行完给它的任务就释放或者结束,它不会闲着,而是继续从任务队列中获取任务,直到任务队列中没有任务可执行时,它才退出循环完成任务。理解了以上的源码过后,往后线程池执行原理的第二步、第三步的理解实则水到渠成。

以上がJAVAのThreadPoolExecutorスレッドプール原理とその実行メソッドの例の詳細な説明の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。