Maison  >  Article  >  Java  >  Introduction à l'utilisation du pool de threads ThreadPoolExecutor en Java

Introduction à l'utilisation du pool de threads ThreadPoolExecutor en Java

不言
不言avant
2019-04-04 09:55:363615parcourir

Cet article vous présente une introduction à l'utilisation du pool de threads ThreadPoolExecutor en Java. Il a une certaine valeur de référence. Les amis dans le besoin peuvent s'y référer. J'espère qu'il vous sera utile.

Executors

Executors est une classe d'outils en Java. Fournit des méthodes d'usine pour créer différents types de pools de threads.

Introduction à lutilisation du pool de threads ThreadPoolExecutor en Java

Comme le montre la figure ci-dessus, la méthode Executors de création de pools de threads, les pools de threads créés implémentent tous l'interface ExecutorService. Les méthodes couramment utilisées sont les suivantes :

newFixedThreadPool(int Threads) : Create. un pool de threads avec un nombre fixe de threads, et les threads excédentaires attendront dans la file d'attente.

newCachedThreadPool() : Créez un pool de threads pouvant être mis en cache si la longueur du pool de threads dépasse les besoins de traitement, les threads inactifs peuvent être. recyclé de manière flexible (60 secondes), s'il n'y a pas de recyclage, créez un nouveau thread

newSingleThreadExecutor() : créez un pool de threads à un seul thread, qui n'utilisera que le seul thread de travail pour exécuter les tâches, en garantissant que toutes les tâches sont exécutées dans l'ordre spécifié (FIFO, LIFO, priorité). Si une erreur d'exécution de tâche se produit, un autre thread poursuivra l'exécution.

newScheduledThreadPool(int corePoolSize) : créez un pool de threads qui prend en charge les tâches planifiées et périodiques. exécution de tâche , peut être utilisé pour remplacer la classe Timer dans la plupart des cas

Executors Exemple

newCachedThreadPool

Le nombre maximum de threads est Integer.MAX_VALUE, lorsque nous ajoutons n aux tâches du pool de threads, ces n tâches sont toutes exécutées ensemble

        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

newFixedThreadPool

        ExecutorService cachedThreadPool = Executors.newFixedThreadPool(1);
        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

newScheduledThreadPool

est exécuté une fois toutes les trois secondes, et seulement après cette exécution, sera exécuté.

        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(2000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, 3, TimeUnit.SECONDS);

newSingleThreadExecutor

exécute chaque tâche dans l'ordre. Une fois la première tâche exécutée, la suivante sera exécutée

<.>
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        System.out.println(Thread.currentThread().getName());
                        Thread.currentThread().sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        System.out.println(Thread.currentThread().getName());
                        Thread.currentThread().sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
Les exécuteurs existent Quel est le problème

Introduction à lutilisation du pool de threads ThreadPoolExecutor en Java

Il est mentionné dans le manuel de développement Alibaba Java que l'utilisation d'exécuteurs pour créer un pool de threads peut provoquer un MOO (OutOfMemory, débordement de mémoire ), mais cela n'explique pas pourquoi, alors voyons ensuite pourquoi les exécuteurs ne sont pas autorisés à être utilisés ?

Commençons par un exemple simple pour simuler la situation dans laquelle l'utilisation d'exécuteurs provoque un MOO

/**
 * @author Hollis
 */
public class ExecutorsDemo {
    private static ExecutorService executor = Executors.newFixedThreadPool(15);
    public static void main(String[] args) {
        for (int i = 0; i Exécutez le code ci-dessus en spécifiant les paramètres JVM : -Xmx8m -Xms8m, le MOO sera généré : <p></p><pre class="brush:php;toolbar:false">Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
    at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)
Le code ci-dessus souligne que la ligne 16 de ExecutorsDemo.java est executor.execute(new SubThread()) dans le code ;

Il existe deux implémentations principales de BlockingQueue en Java , respectivement ArrayBlockingQueue et LinkedBlockingQueue.

ArrayBlockingQueue est une file d'attente de blocage limitée implémentée avec un tableau, la capacité doit être définie

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity LinkedBlockingQueue est une file d'attente de blocage limitée implémentée avec une liste chaînée, La capacité peut être facultativement définie. Si elle n'est pas définie, il s'agira d'une file d'attente de blocage illimitée d'une longueur maximale de Integer.MAX_VALUE.<p></p><pre class="brush:php;toolbar:false">public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
Le problème ici est que si nous ne définissons pas la capacité de LinkedBlockingQueue. , Sa capacité par défaut sera Integer.MAX_VALUE.

Lors de la création de LinkedBlockingQueue dans newFixedThreadPool, aucune capacité n'a été spécifiée pour le moment, LinkedBlockingQueue est une file d'attente illimitée. Pour une file d'attente illimitée, il peut s'agir d'un ajout continu de tâches au. file d'attente, dans ce cas, il peut y avoir des problèmes de dépassement de mémoire en raison d'un trop grand nombre de tâches.

newCachedThreadPool et newScheduledThreadPool Le nombre maximum de threads créés par ces deux méthodes peut être Integer.MAX_VALUE, et créer Avec autant de threads, il est inévitable de provoquer un MOO.

ThreadPoolExecutor crée un pool de threads

Évitez d'utiliser des exécuteurs pour créer un pool de threads, principalement pour éviter d'utiliser l'implémentation par défaut, nous pouvons alors appeler directement ThreadPoolExecutor nous-mêmes Constructeur pour créez vous-même le pool de threads. Lors de la création, spécifiez simplement la capacité de BlockQueue

ExecutorService executor = new ThreadPoolExecutor(10, 10,
        60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue(10));
Dans ce cas, une fois que le nombre de threads soumis dépasse le nombre actuel de threads disponibles, il lancera java.util. concurrent.RejectedExecutionException. En effet, la file d'attente actuellement utilisée par le pool de threads est une file d'attente limitée. Si la file d'attente est pleine, il ne peut pas continuer à traiter de nouvelles requêtes.

En plus de définir ThreadPoolExecutor vous-même. Autres méthodes. Comme Apache et Guava, etc.

Quatre constructeurs

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<runnable> workQueue)
             
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<runnable> workQueue,
                              ThreadFactory threadFactory)
             
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<runnable> workQueue,
                              RejectedExecutionHandler handler)
             
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)</runnable></runnable></runnable></runnable>
int corePoolSize => Le nombre maximum de threads principaux dans ce pool de threads

Le nombre de nouveaux threads dans le pool de threads À ce stade, si le nombre total actuel de threads est inférieur à corePoolSize, les threads principaux nouvellement créés seront nouveaux. S'il dépasse corePoolSize, les nouveaux nouveaux threads seront des threads non principaux
Par défaut, les threads principaux survivront toujours dans le pool de threads, même si le thread principal ne fait rien non plus (état inactif).

如果指定 ThreadPoolExecutor 的 allowCoreThreadTimeOut 这个属性为 true, 那么核心线程如果不干活(闲置状态)的话, 超过一定时间(时长下面参数决定), 就会被销毁掉

很好理解吧, 正常情况下你不干活我也养你, 因为我总有用到你的时候, 但有时候特殊情况(比如我自己都养不起了), 那你不干活我就要把你干掉了

int maximumPoolSize
该线程池中线程总数最大值

线程总数 = 核心线程数 + 非核心线程数.

long keepAliveTime
该线程池中非核心线程闲置超时时长

一个非核心线程, 如果不干活(闲置状态)的时长超过这个参数所设定的时长, 就会被销毁掉

如果设置 allowCoreThreadTimeOut = true, 则会作用于核心线程

TimeUnit unit

keepAliveTime的单位, TimeUnit是一个枚举类型, 其包括:

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒

BlockingQueue workQueue

一个阻塞队列, 用来存储等待执行的任务. 也就是说现在有10个任务, 核心线程 有四个, 非核心线程有六个, 那么这六个线程会被添加到 workQueue 中, 等待执行.

这个参数的选择也很重要, 会对线程池的运行过程产生重大影响, 一般来说, 这里的阻塞队列有以下几种选择:

SynchronousQueue: 这个队列接收到任务的时候, 会直接提交给线程处理, 而不保留它, 如果所有线程都在工作怎么办? 那就*新建一个线程来处理这个任务!所以为了保证不出现的错误, 使用这个类型队列的时候, maximumPoolSize 一般指定成 Integer.MAX_VALUE, 即无限大.

LinkedBlockingQueue: 这个队列接收到任务的时候, 如果当前线程数小于核心线程数, 则核心线程处理任务; 如果当前线程数等于核心线程数, 则进入队列等待. 由于这个队列最大值为 Integer.MAX_VALUE , 即所有超过核心线程数的任务都将被添加到队列中,这也就导致了 maximumPoolSize 的设定失效, 因为总线程数永远不会超过 corePoolSize.

ArrayBlockingQueue: 可以限定队列的长度, 接收到任务的时候, 如果没有达到 corePoolSize 的值, 则核心线程执行任务, 如果达到了, 则入队等候, 如果队列已满, 则新建线程(非核心线程)执行任务, 又如果总线程数到了maximumPoolSize, 并且队列也满了, 则发生错误.

DelayQueue: 队列内元素必须实现 Delayed 接口, 这就意味着你传进去的任务必须先实现Delayed接口. 这个队列接收到任务时, 首先先入队, 只有达到了指定的延时时间, 才会执行任务.

ThreadFactory threadFactory

它是ThreadFactory类型的变量, 用来创建新线程.

默认使用 Executors.defaultThreadFactory() 来创建线程. 使用默认的 ThreadFactory 来创建线程时, 会使新创建的线程具有相同的 NORM_PRIORITY 优先级并且是非守护线程, 同时也设置了线程的名称.

RejectedExecutionHandler handler

表示当拒绝处理任务时的策略, 有以下四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常(默认).
ThreadPoolExecutor.DiscardPolicy:直接丢弃任务, 但是不抛出异常.
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务, 然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:用调用者所在的线程来执行任务.

【相关推荐:Java视频教程

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer