Maison >Java >javaDidacticiel >Comment utiliser l'exécuteur du pool de threads Java

Comment utiliser l'exécuteur du pool de threads Java

王林
王林avant
2023-04-28 10:01:061933parcourir

    Diagramme de classes de pool de threads

    Comment utiliser lexécuteur du pool de threads Java

    L'implémentation d'Executors la plus couramment utilisée pour créer un pool de threads et utiliser des threads utilise principalement les classes fournies dans le diagramme de classes ci-dessus. Le diagramme de classes ci-dessus comprend un framework Executor, qui est un framework qui planifie, exécute et contrôle des tâches asynchrones en fonction d'un ensemble d'appels de stratégie d'exécution. Le but est de fournir un mécanisme qui sépare la soumission des tâches de la façon dont la tâche est exécutée. Il contient trois interfaces d'exécuteur :

    • Executor : une interface simple pour exécuter de nouvelles tâches

    • ExecutorService : étend Executor, en ajoutant des méthodes pour gérer le cycle de vie de l'exécuteur et le cycle de vie des tâches

    • ScheduleExcutorService : Extended ExecutorService pour prendre en charge Future et exécution périodique de tâches

    Avantages du pool de threads

    • Réduire la consommation de ressources-Réutiliser les threads existants, réduire le coût de création et de mort d'objets et améliorer les performances

    • Vitesse de réponse - Il peut contrôlez efficacement le nombre maximum de threads simultanés, améliorez l'utilisation des ressources du système, tout en évitant une concurrence excessive entre les ressources et en évitant le blocage. Lorsqu'une tâche arrive, la tâche peut être exécutée immédiatement sans attendre la création du thread

    • Améliorer la gestion des threads-Fournir des fonctions telles que l'exécution planifiée, l'exécution périodique, le thread unique et le contrôle de concurrence.

    Inconvénients du nouveau Thread

    • Chaque fois qu'un nouveau Thread crée un nouvel objet, les performances sont médiocres

    • Manque de gestion unifiée des threads, il peut y avoir un nombre illimité de nouveaux threads, en concurrence les uns avec les autres, et peuvent occuper trop de ressources système et provoquer un crash ou un MOO (débordement de mémoire par manque de mémoire), la cause de ce problème n'est pas simplement un nouveau thread, mais peut être causée par des bogues de programme ou des défauts de conception qui conduisent à de nouveaux threads continus.

    • Il manque plus de fonctionnalités comme plus d'exécutions, des exécutions périodiques, des interruptions de thread.

    Classe principale du pool de threads-ThreadPoolExecutor

    Description du paramètre : ThreadPoolExecutor a un total de sept paramètres. Ces sept paramètres fonctionnent ensemble pour former la fonction puissante du pool de threads.

    corePoolSize : le nombre de threads principaux

    maximumPoolSize : le nombre maximum de threads

    workQueue : file d'attente bloquante, stockage des tâches en attente d'exécution, très important, cela aura un impact significatif sur le thread processus en cours d'exécution du pool

    Lorsque nous soumettons une nouvelle tâche au pool de threads, le pool de threads décidera comment gérer la tâche en fonction du nombre de threads actuellement en cours d'exécution dans le pool. Il existe trois méthodes de traitement :

    1. Commutation directe (SynchronusQueue)

    2. File d'attente illimitée (LinkedBlockingQueue) Le nombre maximum de threads pouvant être créés est corePoolSize, et maximumPoolSize ne fonctionnera pas pour le moment. Lorsque tous les threads principaux du pool de threads sont en cours d'exécution, les nouvelles soumissions de tâches seront placées dans la file d'attente.

    3. La taille maximale du pool de la file d'attente limitée (ArrayBlockingQueue) peut réduire la consommation de ressources, mais cette méthode rend la planification du pool de threads plus difficile. Parce que le pool de threads et la capacité de la file d’attente sont limités. Ainsi, si nous voulons que le débit du pool de threads et des tâches de traitement atteigne une plage raisonnable, et si nous voulons rendre notre planification de threads relativement simple et réduire autant que possible la consommation de ressources, nous devons raisonnablement limiter ces deux techniques d'allocation de quantité. : [ Si vous souhaitez réduire la consommation de ressources, notamment en réduisant l'utilisation du processeur, la consommation des ressources du système d'exploitation, la surcharge de changement de contexte, etc., vous pouvez définir une capacité de file d'attente plus grande et une capacité de pool de threads plus petite, ce qui réduira le débit du pool de threads. . Si les tâches que nous soumettons bloquent souvent, nous pouvons ajuster la taille maximale de PoolSize. Si notre capacité de file d’attente est petite, nous devons définir une taille de pool de threads plus grande, afin que l’utilisation du processeur soit relativement plus élevée. Cependant, si la capacité du pool de threads est trop grande et que le nombre de tâches est trop augmenté, le degré de concurrence augmentera, la planification entre les threads est donc un problème qui doit être pris en compte. Cela peut au contraire réduire le débit des tâches de traitement. ]

    keepAliveTime : Combien de temps un thread peut-il durer lorsqu'il n'y a aucune tâche à exécuter ? (Lorsque le nombre de threads dans le thread est supérieur à corePoolSize, si aucune nouvelle tâche n'est soumise à ce moment, les threads en dehors du thread principal ne sera pas détruit immédiatement, mais attendra. , jusqu'à ce qu'il dépasse keepAliveTime)

    unit : L'unité de temps de keepAliveTime

    threadFactory : Usine de threads, utilisée pour créer des threads, il existe une usine par défaut pour créer des threads, afin que les threads nouvellement créés aient la même priorité, s'agit-il d'un thread non-démon et le nom est défini)

    rejectHandler : La politique en cas de refus de traiter une tâche (la file d'attente de blocage est pleine) (la politique par défaut d'AbortPolicy lève une exception directement, CallerRunsPolicy utilise le thread de l'appelant pour exécuter la tâche et DiscardOldestPolicy supprime la file d'attente) La tâche principale et exécute la tâche en cours, DiscardPolicy supprime directement la tâche en cours)

    Comment utiliser lexécuteur du pool de threads Java

    La relation entre corePoolSize, maximumPoolSize et workQueue : si le nombre de threads en cours d'exécution est inférieur à corePoolSize, un nouveau thread sera créé directement pour gérer la tâche. Même si d’autres threads du pool de threads sont inactifs. Si le nombre de threads en cours d'exécution est supérieur à corePoolSize et inférieur à maximumPoolSize, un nouveau thread sera créé pour traiter la tâche uniquement lorsque la workQueue est pleine. Si corePoolSize et maximumPoolSize sont identiques, la taille du pool de threads créé est fixe. A ce moment, une nouvelle tâche est soumise, et lorsque la workQueue n'est pas pleine, la demande est placée dans la workQueue. Attendez que le thread vide supprime la tâche de workQueue. Si la workQueue est également pleine à ce moment-là, utilisez des paramètres de stratégie de rejet supplémentaires pour exécuter la stratégie de rejet.

    Méthode d'initialisation : composée de sept paramètres répartis en quatre méthodes d'initialisation

    Comment utiliser lexécuteur du pool de threads Java

    Autres méthodes :

    execute();	//提交任务,交给线程池执行	
    submit();//提交任务,能够返回执行结果 execute+Future
    shutdown();//关闭线程池,等待任务都执行完
    shutdownNow();//关闭线程池,不等待任务执行完
    getTaskCount();//线程池已执行和未执行的任务总数
    getCompleteTaskCount();//已完成的任务数量
    getPoolSize();//线程池当前的线程数量
    getActiveCount();//当前线程池中正在执行任务的线程数量

    Cycle de vie du pool de threads :

    Comment utiliser lexécuteur du pool de threads Java

    • en cours d'exécution : peut accepter les tâches nouvellement soumises et peut également les traiter. la file d'attente de blocage

    • arrêt : impossible de traiter de nouvelles tâches, mais peut continuer à traiter les tâches dans la file d'attente de blocage

    • stop : impossible de recevoir de nouvelles tâches, ni de traiter les tâches dans la file d'attente

    • rangement : si toutes les tâches ont été terminés, le nombre de threads effectifs est 0

    • terminé : état final

    Utilisez les exécuteurs pour créer un pool de threads

    À l'aide des exécuteurs, vous pouvez créer quatre pools de threads : correspondant au pool de quatre threads mentionné ci-dessus méthodes d'initialisation

    Executors.newCachedThreadPool

    newCachedThreadPool est un pool de threads qui crée de nouveaux threads selon les besoins, corePoolSize est 0 et aucun thread principal n'est créé. La file d'attente étant toujours pleine, des threads non essentiels sont finalement créés pour effectuer des tâches. Les threads non essentiels seront recyclés lorsqu'ils seront inactifs pendant 60 secondes. Étant donné que Integer.MAX_VALUE est très volumineux, on peut considérer que les threads peuvent être créés à l'infini, ce qui peut facilement provoquer des exceptions MOO lorsque les ressources sont limitées.

    //创建newCachedThreadPool线程池源码
    public static ExecutorService newCachedThreadPool() {
    		/**
            *corePoolSize: 0,核心线程池的数量为0
    		*maximumPoolSize:  Integer.MAX_VALUE,可以认为最大线程数是无限的
    		*keepAliveTime: 60L
    		*unit: 秒
    		*workQueue: SynchronousQueue
            **/
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }

    Cas d'utilisation :

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    log.info("task:{}",index);
                }
            });
        }
    }

    Il convient de noter que la valeur de retour de newCachedThreadPool est le type ExecutorService. Ce type ne contient que les méthodes de base du pool de threads, mais n'inclut pas les méthodes liées à la surveillance des threads. Par conséquent, lors de l'utilisation de la valeur de retour de. Les types de pool de threads ExecutorService doivent être pris en compte lors de la création de nouveaux threads.

    Comment utiliser lexécuteur du pool de threads Java

    Executors.newSingleThreadExecutor

    newSingleThreadExecutor est un pool de threads à un seul thread et utilise un seul thread partagé pour exécuter les tâches, garantissant que toutes les tâches sont exécutées dans l'ordre spécifié (FIFO, priorité. ..)

    //newSingleThreadExecutor创建线程池源码
    public static ExecutorService newSingleThreadExecutor() {
        /**
          *  corePoolSize : 1,核心线程池的数量为1
    
          *  maximumPoolSize : 1,只可以创建一个非核心线程
    
          *  keepAliveTime : 0L
    
          *  unit => 秒
    
          *  workQueue => LinkedBlockingQueue
          **/
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }

    Lorsqu'une tâche est soumise, un thread principal sera d'abord créé pour exécuter la tâche si le nombre de threads principaux dépasse le nombre, il sera mis dans la file d'attente car LinkedBlockingQueue est une file d'attente d'une longueur. de Integer.MAX_VALUE, il peut être considéré comme une file d'attente illimitée, il est donc mis dans la file d'attente. Un nombre illimité de tâches peuvent être insérées, ce qui peut facilement provoquer des exceptions MOO lorsque les ressources sont limitées. file d'attente, les paramètres maximumPoolSize et keepAliveTime ne seront pas valides et les threads non principaux ne seront pas créés du tout.

    Executors.newFixedThreadPool

    Le pool de threads de longueur fixe, le nombre de threads principaux et le nombre maximum de threads sont transmis par l'utilisateur. Vous pouvez définir le nombre maximum de threads simultanés et le nombre maximum de threads attendront. la file d'attente

    //newFixedThreadPool创建线程池源码
    public static ExecutorService newFixedThreadPool(int nThreads) {
        	/**
              *  corePoolSize : 核心线程的数量为自定义输入nThreads
    
              *  maximumPoolSize : 最大线程的数量为自定义输入nThreads
    
              *  keepAliveTime : 0L
    
              *  unit : 秒
    
              *  workQueue : LinkedBlockingQueue
              **/
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }

    newFixedThreadPool est similaire à SingleThreadExecutor, la seule différence est le thread principal. Les nombres sont différents, et comme LinkedBlockingQueue est utilisé, il est facile de provoquer des exceptions MOO lorsque les ressources sont limitées.

    Executors.newScheduledThreadPool

    Pool de threads de longueur fixe, le nombre de threads principaux est transmis par l'utilisateur, prend en charge l'exécution de tâches planifiées et périodiques

    //newScheduledThreadPool创建线程池源码
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        /**
          *  corePoolSize : 核心线程的数量为自定义输入corePoolSize
    
          *  maximumPoolSize : 最大线程的数量为Integer.MAX_VALUE
    
          *  keepAliveTime : 0L
    
          *  unit : 纳秒
    
          *  workQueue : DelayedWorkQueue
          **/
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    Lorsqu'une tâche est soumise, corePoolSize est une entrée personnalisée, le thread principal est créé d'abord, et le thread principal est plein. Ensuite, des threads non principaux sont finalement créés pour effectuer des tâches. Les fils non essentiels seront recyclés après utilisation. Étant donné que Integer.MAX_VALUE est très volumineux, on peut considérer que les threads peuvent être créés à l'infini, ce qui peut facilement provoquer des exceptions MOO lorsque les ressources sont limitées. Parce que le DelayedWorkQueue utilisé peut implémenter des tâches planifiées et périodiques. ScheduledExecutorService propose trois méthodes qui peuvent être utilisées :

    Comment utiliser lexécuteur du pool de threads Java

    schedule : exécuter la tâche après un délai spécifié. planningAtFixedRate : exécuter la tâche à un rythme spécifié. planningWithFixedDelay : exécuter la tâche avec un délai spécifié. Cas d'utilisation :

        public static void main(String[] args) {
    
            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
    
    //        executorService.schedule(new Runnable() {
    //            @Override
    //            public void run() {
    //                log.warn("schedule run");
    //            }
    //         //延迟3秒后执行
    //        }, 3, TimeUnit.SECONDS);
            //        executorService.shutdown();
    
    //        executorService.scheduleWithFixedDelay(new Runnable() {
    //            @Override
    //            public void run() {
    //                log.warn("scheduleWithFixedDelay run");
    //            }
    //            //延迟一秒后每隔3秒执行
    //        }, 1, 3, TimeUnit.SECONDS);
            
            executorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    log.warn("schedule run");
                }
                //延迟一秒后每隔3秒执行
            }, 1, 3, TimeUnit.SECONDS);
    
            /**
             * 定时器调度,不推荐使用,推荐ScheduledExecutorService调度
             */
    //        Timer timer = new Timer();
    //        timer.schedule(new TimerTask() {
    //            @Override
    //            public void run() {
    //                log.warn("timer run");
    //            }
    //        //从当前时间每隔5秒执行
    //        }, new Date(), 5 * 1000);
        }

    Résumé

    • FixedThreadPool et SingleThreadExecutor La longueur de file d'attente de requêtes autorisée est Integer.MAX_VALUE Un grand nombre de requêtes peuvent s'accumuler, provoquant des exceptions MOO

      .
    • Le nombre de threads autorisés à être créés par CachedThreadPool et newScheduledThreadPool est Integer.MAX_VALUE, ce qui peut créer un grand nombre de threads, provoquant des exceptions MOO

    C'est pourquoi il est interdit d'utiliser des exécuteurs pour créer des pools de threads, mais il est recommandé de créer ThreadPoolExecutor vous-même. Raison

    Comment définir les paramètres du pool de threads

    CPU intensif : Il est recommandé que la taille du pool de threads soit égale au nombre de processeurs + 1. Le nombre de processeurs peut être obtenu. selon la méthode Runtime.availableProcessors IO-intensif : Nombre de processeurs * Utilisation du processeur * ( 1 + Temps d'attente du thread/Temps CPU du thread) Type hybride : Divisez les tâches en tâches gourmandes en CPU et en IO, puis utilisez différents pools de threads pour les traiter, afin que chaque pool de threads puisse les traiter en fonction de leurs charges de travail respectives Ajuster File d'attente de blocage : Il est recommandé d'utiliser une file d'attente limitée pour éviter l'épuisement des ressources : La valeur par défaut est. Stratégie de rejet AbortPolicy, qui lance RejectedExecutionException directement dans le programme [car il exécute une exception Time, pas de capture forcée], cette méthode de traitement n'est pas assez élégante. Les stratégies suivantes sont recommandées pour gérer les rejets :

    • Interceptez l'exception RejectedExecutionException dans le programme et traitez la tâche dans l'exception interceptée. Pour la politique de rejet par défaut

    • , utilisez la politique de rejet CallerRunsPolicy. Cette politique transmettra la tâche au thread qui appelle exécuter [généralement le thread principal pour le moment, le thread principal ne pourra soumettre aucune tâche]. pendant un certain temps, permettant ainsi au thread de travail de gérer les tâches en cours. À ce stade, le thread soumis sera enregistré dans la file d'attente TCP. Si la file d'attente TCP est pleine, cela affectera le client. Pour personnaliser la stratégie de rejet, il vous suffit d'implémenter l'interface RejectedExecutionHandler.

    • Si la tâche n'est pas particulièrement importante, il est également possible d'utiliser les politiques de rejet DiscardPolicy et DiscardOldestPolicy pour supprimer la tâche

    • Si vous utilisez la méthode statique des Executors pour créer un objet ThreadPoolExecutor, vous pouvez utiliser Semaphore pour limiter l'exécution de la tâche et éviter les exceptions MOO

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

    Déclaration:
    Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer