Maison >Java >javaDidacticiel >Explication détaillée des principes du pool de threads et de l'exécuteur en Java

Explication détaillée des principes du pool de threads et de l'exécuteur en Java

黄舟
黄舟original
2017-07-30 10:43:001303parcourir

Cet article présente principalement des informations pertinentes qui expliquent en détail l'analyse du pool de threads Java et les principes d'Executor. Des exemples et des principes d'analyse sont fournis ici pour aider tout le monde à comprendre cette partie des connaissances. Les amis dans le besoin peuvent se référer à

Analyse détaillée du pool de threads Java et des principes d'Executor

Fonction du pool de threads et connaissances de base

Avant de commencer, discutons de « threads » Le concept de « pool ». "Thread pool", comme son nom l'indique, est un cache de threads. Il s'agit d'un ensemble d'un ou plusieurs threads. Les utilisateurs peuvent simplement lancer les tâches qui doivent être effectuées dans le pool de threads sans trop se soucier des détails de l'exécution. Alors, quelles sont les fonctions du pool de threads ? Ou quels sont les avantages par rapport à l’utilisation directe de Thread ? J'ai brièvement résumé les points suivants :

Réduire la consommation provoquée par la création et la destruction des threads

Pour la mise en œuvre de Java Thread, j'ai écrit dans l'article précédent Analysé dans ce blog. Java Thread et le thread du noyau sont 1:1 (Linux). De plus, Thread contient de nombreuses données membres dans la couche Java et la couche C++, donc Java Thread est en fait relativement lourd. La création et la destruction d'un thread Java nécessitent à la fois un travail important du système d'exploitation et de la JVM, donc si le thread Java est mis en cache, une certaine amélioration de l'efficacité peut être obtenue.

Mise en œuvre plus pratique et transparente du contrôle des ressources informatiques

Pour en discuter, vous devrez peut-être donner quelques exemples. Prenons l'exemple du très célèbre serveur Web Nginx. Nginx est connu pour ses puissantes capacités de concurrence et sa faible consommation de ressources. Afin de répondre à ces exigences strictes, Nginx limite strictement le nombre de threads de travail (les threads de travail sont généralement égaux au nombre de processeurs). L'objectif de cette conception est de réduire la perte de performances causée par le changement de thread. Cette méthode d'optimisation est également applicable à Java. Si un nouveau Thread est créé pour chaque tâche, le résultat final sera que les ressources du programme seront difficiles à contrôler (une certaine fonction remplit le CPU) et la vitesse d'exécution globale sera relativement lente. Le pool de threads Java fournit FixedThreadPool, que vous pouvez utiliser pour contrôler le nombre maximum de threads.

Avec tant de « bêtises » mentionnées ci-dessus, analysons-le en fonction de l'implémentation du pool de threads Java ! Le pool de threads de Java a plusieurs implémentations :

Pool de threads mis en cache

La caractéristique du pool de threads mis en cache est qu'il met en cache les threads précédents et les tâches nouvellement soumises. Peut s'exécuter dans un thread mis en cache, qui permet d'obtenir le premier avantage mentionné ci-dessus.

ThreadPool corrigé

Une fonctionnalité de cachedThreadPool est que s'il n'y a pas de thread inactif pour exécuter la tâche nouvellement soumise, un nouveau thread sera créé. FixedThreadPool ne fera pas cela. Il enregistrera la tâche et attendra qu'il y ait un thread inactif avant de l'exécuter. C'est-à-dire que le deuxième avantage mentionné ci-dessus est obtenu.

ThreadPool planifié

La caractéristique du ThreadPool planifié est qu'il peut réaliser la planification de tâches, telle que l'exécution retardée et l'exécution périodique de tâches.

En plus des trois ci-dessus, Java implémente également newWorkStealingPool, qui est basé sur le framework Fork/Join. Je n'ai pas encore étudié la question, je vais donc la laisser de côté. Dans la prise en charge de la concurrence par Java, Executor est utilisé pour regrouper divers pools de threads. Le nom « exécuteur » est en fait tout à fait approprié. Un pool de threads n'est qu'un exécuteur !

1. Implémentation du ThreadPool mis en cache et du ThreadPool fixe

Comme le montre la description précédente, ces deux pools de threads sont très similaires. C'est effectivement le cas. En fait, ils sont mis en œuvre en même temps. Sinon, regardons un exemple pratique :


ThreadPoolExecutor executor1 = (ThreadPoolExecutor)Executors.newCachedThreadPool();


Ceci sont deux méthodes pour créer de nouveaux pools de threads. Elles se ressemblent beaucoup ! Si vous ne le pensez pas, je ne peux que vous montrer la vérité.
ThreadPoolExecutor executor2 = (ThreadPoolExecutor)Executors.newFixedThreadPool(4);


Oui, ils appellent le même constructeur, juste avec des paramètres légèrement différents. Jetons donc un coup d'œil à la signification de ces paramètres et à la différence entre les deux ensembles de paramètres. Tout d'abord, vous devez toujours publier le constructeur de ThreadPoolExecutor.
public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                 60L, TimeUnit.SECONDS,
                 new SynchronousQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                 0L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<Runnable>());
}


Afin d'avoir l'air frais, je ne publierai pas le constructeur d'un autre calque, et ce constructeur n'est qu'une simple affectation. Le prototype de fonction ici peut déjà nous donner beaucoup d'informations. Je dois dire que le nom du code JDK est vraiment bon, tout comme les commentaires.
public ThreadPoolExecutor(int corePoolSize,
             int maximumPoolSize,
             long keepAliveTime,
             TimeUnit unit,
             BlockingQueue<Runnable> workQueue) {
  this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
     Executors.defaultThreadFactory(), defaultHandler);
}

maximumPoolSize est le nombre maximum de threads dans le pool de threads ; pour ThreadPool mis en cache, cette valeur est Integer.MAX_VALUE, ce qui est fondamentalement équivalent à l'infini. Quel type de machine peut exécuter des milliards de threads ! ! Pour ThreadPool fixe, cette valeur correspond au nombre de pools de threads définis par l'utilisateur.

keepAliveTime et unit déterminent le délai d'expiration du cache du thread ; pour le ThreadPool mis en cache, le délai d'expiration du cache du thread est d'une minute. En d'autres termes, si un thread de travail n'a rien à faire pendant une minute, il sera révoqué. pour économiser de l'argent. Le temps passé au ThreadPool fixe est de 0, ce qui signifie que le thread de travail dans le ThreadPool fixe n'expirera jamais.


corePoolSize是线程池的最小线程数;对于cached ThreadPool,这个值为0,因为在完全没有任务的情况下,cached ThreadPool的确会成为“光杆司令”。至于fixed ThreadPool,这个fixed已经表明corePoolSize是等于线程总数的。
接下来,我们根据一个简单的使用例子,来看看一下cached ThreadPool的流程。


public class Task implements Callable<String> {

private String name;
public Task(String name) {
  this.name = name;
}
@Override
public String call() throws Exception {
  System.out.printf("%s: Starting at : %s\n", this.name, new Date());
  return "hello, world";
}
public static void main(String[] args) {
  ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
  Task task = new Task("test");
  Future<String> result = executor.submit(task);
  try {
    System.out.printf("%s\n", result.get());
  } catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
  }
  executor.shutdown();
  System.out.printf("Main ends at : %s\n", new Date());
}
}

首先,来看看executor.submit(task),这其实调用了ThreadPoolExecutor.execute(Runnable command)方法,这个方法的代码如下,整段代码的逻辑是这样的。首先检查线程池的线程数是否不够corePoolSize,如果不够就直接新建线程并把command添加进去;如果线程数已经够了或者添加失败(多个线程增加添加的情况),就尝试把command添加到队列中(workQueue.offer(command)),如果添加失败了,就reject掉cmd。大体的逻辑是这样的,这段代码有很多基于线程安全的设计,这里为了不跑题,就先忽略细节了。


public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  int c = ctl.get();
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;
    c = ctl.get();
  }
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
  }
  else if (!addWorker(command, false))
    reject(command);
}

到这里,看起来线程池实现的整体思路其实也没多么复杂。但是还有一个问题——一个普通的Thread在执行完自己的run方法后会自动退出。那么线程池是如何实现Worker线程不断的干活,甚至在没有任务的时候。其实答案很简单,就是Worker其实在跑大循环,Worker实际运行方法如下:


final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  w.unlock(); // allow interrupts
  boolean completedAbruptly = true;
  try {
    while (task != null || (task = getTask()) != null) {
      w.lock();
  /***/
      try {
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
          task.run();
        /***/
        } finally {
          afterExecute(task, thrown);
        }
      } finally {
        task = null;
        w.completedTasks++;
        w.unlock();
      }
    }
    completedAbruptly = false;
  } finally {
    processWorkerExit(w, completedAbruptly);
  }
}

关键就在这个while的判断条件,对于需要cached线程的情况下,getTask()会阻塞起来,如果缓存的时间过期,就会返回一个null,然后Worker就退出了,也就结束了它的服役周期。而在有任务的情况下,Woker会把task拿出来,然后调用task.run()执行任务,并通过Future通知客户线程(即future.get()返回)。这样一个简单的线程池使用过程就完了。。。

当然,线程池的很多精髓知识——基于线程安全的设计,我都没有分析。有兴趣可以自己分析一下,也可以和我讨论。此外Scheduled ThreadPool这里也没有分析,它的要点其实是调度,主要是根据时间最小堆来驱动的。

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn