Maison  >  Article  >  Java  >  Analyse du processus de création du pool de threads Java

Analyse du processus de création du pool de threads Java

黄舟
黄舟original
2017-02-23 10:44:401370parcourir

Récemment, j'ai amélioré la fonction de concurrence du projet, mais le développement a été cahoteux. Après avoir lu de nombreuses informations, j'ai finalement approfondi ma compréhension. J'ai donc prévu de vérifier ensemble le code source et de résumer les principes de la programmation concurrente.

Soyez prêt à commencer avec le pool de threads le plus utilisé et à comprendre les principes de mise en œuvre de l'ensemble du cycle de vie du pool de threads autour de la création, de l'exécution et de l'arrêt. Plus tard, nous étudierons des sujets tels que les variables atomiques, les conteneurs concurrents, les files d'attente de blocage, les outils de synchronisation, les verrous, etc. Les outils de concurrence dans java.util.concurrent ne sont pas difficiles à utiliser, mais vous ne pouvez pas simplement les utiliser, nous devons lire le putain de code source, haha. Au fait, le JDK que j'utilise est la version 1.8.

Framework Executor

Executor est un framework de gestion de pool de threads. Il n'y a qu'une seule méthode d'exécution dans l'interface, qui exécute les tâches Runnable. L'interface ExecutorService étend Executor, ajoute la gestion du cycle de vie des threads et fournit des méthodes telles que l'arrêt des tâches et le retour des résultats des tâches. AbstractExecutorService implémente ExecutorService et fournit une logique d'implémentation par défaut telle que la méthode de soumission.

Ensuite, le sujet d'aujourd'hui, ThreadPoolExecutor, hérite d'AbstractExecutorService et fournit l'implémentation spécifique du pool de threads.

Méthode de constructeur

Ce qui suit est le constructeur le plus courant de ThreadPoolExecutor, avec jusqu'à sept paramètres. Je ne publierai pas le code spécifique, juste quelques instructions pour la vérification et le réglage des paramètres.

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    }

corePoolSize est la taille cible du pool de threads, qui est la taille lorsque le pool de threads vient d'être créé et qu'il n'y a aucune tâche à exécuter. maximumPoolSize est la limite supérieure maximale du pool de threads. keepAliveTime est le temps de survie du thread Lorsque le nombre de threads dans le pool de threads est supérieur à corePoolSize, les threads inactifs qui dépassent le temps de survie seront recyclés. Inutile de dire unité, les trois paramètres restants seront analysés plus tard.

Pool de threads personnalisé par défaut

ThreadPoolExecutor prédéfinit certains pools de threads personnalisés, créés par la méthode d'usine dans Executors. Analysons les paramètres de création de newSingleThreadExecutor, newFixedThreadPool et newCachedThreadPool.

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

corePoolSize et maximumPoolSize de newFixedThreadPool sont tous deux définis sur le nombre fixe entrant et keepAliveTim est défini sur 0. Une fois le pool de threads créé, le nombre de threads sera fixe, ce qui convient aux situations où la stabilité des threads est requise.

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

newSingleThreadExecutor est une version de newFixedThreadPool avec un nombre fixe de threads de 1, assurant la sérialisation des tâches dans le piscine. Notez que FinalizingDelegatedExecutorService est renvoyé. Jetons un coup d'œil au code source :

static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown();
        }
    }

FinalizingDelegatedExecutorService hérite de DelegatedExecutorService et ajoute uniquement l'opération de fermeture du pool de threads pendant gc. au code source de DelegatedExecutorService :

 static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List<Runnable> shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        //...
    }

Le code est très simple. DelegatedExecutorService enveloppe ExecutorService afin qu'il n'expose que les méthodes d'ExecutorService, de sorte que les paramètres du pool de threads ne peuvent pas. ne soit plus configuré. À l'origine, les paramètres créés par le pool de threads peuvent être ajustés et ThreadPoolExecutor fournit la méthode set. Le but de l'utilisation de newSingleThreadExecutor est de générer un pool de threads série à thread unique. Ce serait ennuyeux si la taille du pool de threads pouvait également être configurée.

Executors fournit également la méthode unconfigurableExecutorService, qui encapsule le pool de threads ordinaire dans un pool de threads non configurable. Si vous ne souhaitez pas que le pool de threads soit modifié par des générations futures inconnues, vous pouvez appeler cette méthode.

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

newCachedThreadPool génère un pool de threads en cache. Le nombre de threads peut être compris entre 0 et Integer.MAX_VALUE, et le nombre de threads peut être compris entre 0 et Integer.MAX_VALUE. le délai d'attente est de 1 minute. L'effet de l'utilisation du pool de threads est le suivant : s'il y a un thread inactif, le thread sera réutilisé ; s'il n'y a pas de thread inactif, un nouveau thread sera créé ; si le thread est inactif pendant plus d'une minute, il le sera ; recyclé.

newScheduledThreadPool

newScheduledThreadPool créera un pool de threads qui pourra exécuter des tâches régulièrement. Il n’est pas prévu d’en discuter dans cet article et le sera plus tard dans un article séparé.

等待队列

newCachedThreadPool的线程上限几乎等同于无限,但系统资源是有限的,任务的处理速度总有可能比不上任务的提交速度。因此,可以为ThreadPoolExecutor提供一个阻塞队列来保存因线程不足而等待的Runnable任务,这就是BlockingQueue。

JDK为BlockingQueue提供了几种实现方式,常用的有:

ArrayBlockingQueue:数组结构的阻塞队列

LinkedBlockingQueue:链表结构的阻塞队列

PriorityBlockingQueue:有优先级的阻塞队列

SynchronousQueue:不会存储元素的阻塞队列

newFixedThreadPool和newSingleThreadExecutor在默认情况下使用一个无界的LinkedBlockingQueue。要注意的是,如果任务一直提交,但线程池又不能及时处理,等待队列将会无限制地加长,系统资源总会有消耗殆尽的一刻。所以,推荐使用有界的等待队列,避免资源耗尽。但解决一个问题,又会带来新问题:队列填满之后,再来新任务,这个时候怎么办?后文会介绍如何处理队列饱和。

newCachedThreadPool使用的SynchronousQueue十分有趣,看名称是个队列,但它却不能存储元素。要将一个任务放进队列,必须有另一个线程去接收这个任务,一个进就有一个出,队列不会存储任何东西。因此,SynchronousQueue是一种移交机制,不能算是队列。newCachedThreadPool生成的是一个没有上限的线程池,理论上提交多少任务都可以,使用SynchronousQueue作为等待队列正合适。

饱和策略

当有界的等待队列满了之后,就需要用到饱和策略去处理,ThreadPoolExecutor的饱和策略通过传入RejectedExecutionHandler来实现。如果没有为构造函数传入,将会使用默认的defaultHandler。

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
       public AbortPolicy() { }
       public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
           throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
       }
   }

AbortPolicy是默认的实现,直接抛出一个RejectedExecutionException异常,让调用者自己处理。除此之外,还有几种饱和策略,来看一下:

 

public static class DiscardPolicy implements RejectedExecutionHandler {
       public DiscardPolicy() { }
       public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
       }
   }

DiscardPolicy的rejectedExecution直接是空方法,什么也不干。如果队列满了,后续的任务都抛弃掉。

 

 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
       public DiscardOldestPolicy() { }
       public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
           if (!e.isShutdown()) {
               e.getQueue().poll();
               e.execute(r);
           }
       }
   }

DiscardOldestPolicy会将等待队列里最旧的任务踢走,让新任务得以执行。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

最后一种饱和策略是CallerRunsPolicy,它既不抛弃新任务,也不抛弃旧任务,而是直接在当前线程运行这个任务。当前线程一般就是主线程啊,让主线程运行任务,说不定就阻塞了。如果不是想清楚了整套方案,还是少用这种策略为妙。

ThreadFactory

每当线程池需要创建一个新线程,都是通过线程工厂获取。如果不为ThreadPoolExecutor设定一个线程工厂,就会使用默认的defaultThreadFactory:

public static ThreadFactory defaultThreadFactory() {
    return new DefaultThreadFactory();
}
static class DefaultThreadFactory implements ThreadFactory {
       private static final AtomicInteger poolNumber = new AtomicInteger(1);
       private final ThreadGroup group;
       private final AtomicInteger threadNumber = new AtomicInteger(1);
       private final String namePrefix;
       DefaultThreadFactory() {
           SecurityManager s = System.getSecurityManager();
           group = (s != null) ? s.getThreadGroup() :
                                 Thread.currentThread().getThreadGroup();
           namePrefix = "pool-" +
                         poolNumber.getAndIncrement() +
                        "-thread-";
       }
       public Thread newThread(Runnable r) {
           Thread t = new Thread(group, r,
                                 namePrefix + threadNumber.getAndIncrement(),
                                 0);
           if (t.isDaemon())
               t.setDaemon(false);
           if (t.getPriority() != Thread.NORM_PRIORITY)
               t.setPriority(Thread.NORM_PRIORITY);
           return t;
       }
   }

平时打印线程池里线程的name时,会输出形如pool-1-thread-1之类的名称,就是在这里设置的。这个默认的线程工厂,创建的线程是普通的非守护线程,如果需要定制,实现ThreadFactory后传给ThreadPoolExecutor即可。

不看代码不总结不会知道,光是线程池的创建就可以引出很多学问。别看平时创建线程池是一句代码的事,其实ThreadPoolExecutor提供了很灵活的定制方法。

欢迎留言和转发,下一篇打算分析线程池如何执行任务。

 以上就是Java 线程池的创建过程分析 的内容,更多相关内容请关注PHP中文网(www.php.cn)!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn