Maison  >  Article  >  Java  >  Explication détaillée du pool de threads Java et création d'exemples simples

Explication détaillée du pool de threads Java et création d'exemples simples

高洛峰
高洛峰original
2017-02-07 14:44:521372parcourir

Java Thread Pool

J'ai récemment amélioré la fonction de concurrence du projet, mais le développement a été cahoteux. Après avoir lu de nombreuses informations, j'ai finalement approfondi ma compréhension. J'ai donc prévu de vérifier ensemble le code source et de résumer les principes de la programmation concurrente.

Soyez prêt à commencer avec le pool de threads le plus utilisé et à comprendre les principes de mise en œuvre de l'ensemble du cycle de vie du pool de threads autour de la création, de l'exécution et de l'arrêt. Plus tard, nous étudierons des sujets tels que les variables atomiques, les conteneurs concurrents, les files d'attente de blocage, les outils de synchronisation, les verrous, etc. Les outils de concurrence dans java.util.concurrent ne sont pas difficiles à utiliser, mais vous ne pouvez pas simplement les utiliser, nous devons lire le putain de code source, haha. Au fait, le JDK que j'utilise est la version 1.8.

Framework Executor

Executor est un framework de gestion de pool de threads. Il n'y a qu'une seule méthode d'exécution dans l'interface, qui exécute les tâches Runnable. L'interface ExecutorService étend Executor, ajoute la gestion du cycle de vie des threads et fournit des méthodes telles que l'arrêt des tâches et le retour des résultats des tâches. AbstractExecutorService implémente ExecutorService et fournit une logique d'implémentation par défaut telle que la méthode de soumission.

Ensuite, le sujet d'aujourd'hui, ThreadPoolExecutor, hérite d'AbstractExecutorService et fournit l'implémentation spécifique du pool de threads.

Méthode constructeur

Ce qui suit est le constructeur le plus courant de ThreadPoolExecutor, avec jusqu'à sept paramètres. Je ne publierai pas le code spécifique, juste quelques instructions pour la vérification et le réglage des paramètres.

public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue<Runnable> workQueue,
               ThreadFactory threadFactory,
               RejectedExecutionHandler handler) {
  }

corePoolSize est la taille cible du pool de threads, qui est la taille lorsque le pool de threads vient d'être créé et qu'il n'y a aucune tâche à exécuter. maximumPoolSize est la limite supérieure maximale du pool de threads. keepAliveTime est le temps de survie du thread Lorsque le nombre de threads dans le pool de threads est supérieur à corePoolSize, les threads inactifs qui dépassent le temps de survie seront recyclés. Inutile de dire unité, les trois paramètres restants seront analysés plus tard.

Pool de threads personnalisé par défaut

ThreadPoolExecutor prédéfinit certains pools de threads personnalisés, créés par la méthode d'usine dans Executors. Analysons les paramètres de création de newSingleThreadExecutor, newFixedThreadPool et newCachedThreadPool.

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                   0L, TimeUnit.MILLISECONDS,
                   new LinkedBlockingQueue<Runnable>());
  }

corePoolSize et maximumPoolSize de newFixedThreadPool sont tous deux définis sur le nombre fixe entrant et keepAliveTim est défini sur 0. Une fois le pool de threads créé, le nombre de threads sera fixe, ce qui convient aux situations où la stabilité des threads est requise.

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
      (new ThreadPoolExecutor(1, 1,
                  0L, TimeUnit.MILLISECONDS,
                  new LinkedBlockingQueue<Runnable>()));
  }

newSingleThreadExecutor est une version de newFixedThreadPool avec un nombre fixe de threads de 1, assurant la sérialisation des tâches dans le pool. Notez que FinalizingDelegatedExecutorService est renvoyé. Jetons un coup d'œil au code source :

static class FinalizableDelegatedExecutorService
    extends DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
      super(executor);
    }
    protected void finalize() {
      super.shutdown();
    }
  }

FinalizingDelegatedExecutorService hérite de DelegatedExecutorService et ajoute uniquement l'opération de fermeture du pool de threads pendant gc. :

static class DelegatedExecutorService extends AbstractExecutorService {
    private final ExecutorService e;
    DelegatedExecutorService(ExecutorService executor) { e = executor; }
    public void execute(Runnable command) { e.execute(command); }
    public void shutdown() { e.shutdown(); }
    public List<Runnable> shutdownNow() { return e.shutdownNow(); }
    public boolean isShutdown() { return e.isShutdown(); }
    public boolean isTerminated() { return e.isTerminated(); }
    //...
  }

Le code est très simple. DelegatedExecutorService enveloppe ExecutorService pour qu'il n'expose que les méthodes d'ExecutorService, donc les paramètres du pool de threads ne peuvent plus être configurés. À l'origine, les paramètres créés par le pool de threads peuvent être ajustés et ThreadPoolExecutor fournit la méthode set. Le but de l'utilisation de newSingleThreadExecutor est de générer un pool de threads série à thread unique. Ce serait ennuyeux si la taille du pool de threads pouvait également être configurée.

Executors fournit également la méthode unconfigurableExecutorService, qui encapsule le pool de threads ordinaire dans un pool de threads non configurable. Si vous ne souhaitez pas que le pool de threads soit modifié par des générations futures inconnues, vous pouvez appeler cette méthode.

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                   60L, TimeUnit.SECONDS,
                   new SynchronousQueue<Runnable>());
  }

newCachedThreadPool génère un pool de threads mis en cache. Le nombre de threads peut aller de 0 à Integer.MAX_VALUE et le délai d'attente est de 1 minute. L'effet de l'utilisation du pool de threads est le suivant : s'il y a un thread inactif, le thread sera réutilisé ; s'il n'y a pas de thread inactif, un nouveau thread sera créé ; si le thread est inactif pendant plus d'une minute, il le sera ; recyclé.

newScheduledThreadPool

newScheduledThreadPool créera un pool de threads pouvant exécuter des tâches régulièrement. Il n’est pas prévu d’en discuter dans cet article et le sera plus tard dans un article séparé.

File d'attente

La limite de threads de newCachedThreadPool est presque égale à illimitée, mais les ressources système sont limitées et la vitesse de traitement de la tâche peut ne pas être aussi rapide que la vitesse de soumission du tâche. Par conséquent, une file d'attente de blocage peut être fournie pour ThreadPoolExecutor afin de sauvegarder les tâches exécutables en attente en raison d'un nombre insuffisant de threads. Il s'agit de BlockingQueue.

JDK fournit plusieurs méthodes d'implémentation pour BlockingQueue. Les méthodes couramment utilisées sont :

ArrayBlockingQueue : file d'attente de blocage de la structure de tableau

LinkedBlockingQueue : file d'attente de blocage de la structure de liste chaînée

PriorityBlockingQueue : file d'attente de blocage prioritaire

SynchronousQueue : file d'attente de blocage qui ne stocke pas d'éléments

newFixedThreadPool et newSingleThreadExecutor utilisent par défaut une LinkedBlockingQueue illimitée. Il convient de noter que si les tâches sont soumises tout le temps, mais que le pool de threads ne peut pas les traiter à temps, la file d'attente s'allongera indéfiniment et il y aura toujours un moment où les ressources système seront épuisées. Par conséquent, il est recommandé d’utiliser une file d’attente limitée pour éviter l’épuisement des ressources. Mais résoudre un problème entraînera de nouveaux problèmes : une fois la file d'attente remplie et de nouvelles tâches arrivées, que devons-nous faire à ce moment-là ? La manière de gérer la saturation des files d’attente sera présentée plus tard.

newCachedThreadPool使用的SynchronousQueue十分有趣,看名称是个队列,但它却不能存储元素。要将一个任务放进队列,必须有另一个线程去接收这个任务,一个进就有一个出,队列不会存储任何东西。因此,SynchronousQueue是一种移交机制,不能算是队列。newCachedThreadPool生成的是一个没有上限的线程池,理论上提交多少任务都可以,使用SynchronousQueue作为等待队列正合适。

饱和策略

当有界的等待队列满了之后,就需要用到饱和策略去处理,ThreadPoolExecutor的饱和策略通过传入RejectedExecutionHandler来实现。如果没有为构造函数传入,将会使用默认的defaultHandler。

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
    }
  }

AbortPolicy是默认的实现,直接抛出一个RejectedExecutionException异常,让调用者自己处理。除此之外,还有几种饱和策略,来看一下:

public static class DiscardPolicy implements RejectedExecutionHandler {
   public DiscardPolicy() { }
   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
   }
 }

DiscardPolicy的rejectedExecution直接是空方法,什么也不干。如果队列满了,后续的任务都抛弃掉。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
   public DiscardOldestPolicy() { }
   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
     if (!e.isShutdown()) {
       e.getQueue().poll();
       e.execute(r);
     }
   }
 }

DiscardOldestPolicy会将等待队列里最旧的任务踢走,让新任务得以执行。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
   public CallerRunsPolicy() { }
   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
     if (!e.isShutdown()) {
       r.run();
     }
   }
 }

最后一种饱和策略是CallerRunsPolicy,它既不抛弃新任务,也不抛弃旧任务,而是直接在当前线程运行这个任务。当前线程一般就是主线程啊,让主线程运行任务,说不定就阻塞了。如果不是想清楚了整套方案,还是少用这种策略为妙。

ThreadFactory

每当线程池需要创建一个新线程,都是通过线程工厂获取。如果不为ThreadPoolExecutor设定一个线程工厂,就会使用默认的defaultThreadFactory:

public static ThreadFactory defaultThreadFactory() {
  return new DefaultThreadFactory();
}
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
 
    DefaultThreadFactory() {
      SecurityManager s = System.getSecurityManager();
      group = (s != null) ? s.getThreadGroup() :
                 Thread.currentThread().getThreadGroup();
      namePrefix = "pool-" +
             poolNumber.getAndIncrement() +
            "-thread-";
    }
 
    public Thread newThread(Runnable r) {
      Thread t = new Thread(group, r,
                 namePrefix + threadNumber.getAndIncrement(),
                 0);
      if (t.isDaemon())
        t.setDaemon(false);
      if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
      return t;
    }
  }

   

平时打印线程池里线程的name时,会输出形如pool-1-thread-1之类的名称,就是在这里设置的。这个默认的线程工厂,创建的线程是普通的非守护线程,如果需要定制,实现ThreadFactory后传给ThreadPoolExecutor即可。

不看代码不总结不会知道,光是线程池的创建就可以引出很多学问。别看平时创建线程池是一句代码的事,其实ThreadPoolExecutor提供了很灵活的定制方法。

感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

更多Java 线程池详解及创建简单实例相关文章请关注PHP中文网!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn