Maison  >  Article  >  Java  >  Comment utiliser la classe ThreadPoolExecutor du pool de threads Java

Comment utiliser la classe ThreadPoolExecutor du pool de threads Java

王林
王林avant
2023-04-26 13:31:151483parcourir

Il est souligné dans le "Manuel de développement Java d'Alibaba" que les ressources de thread doivent être fournies via le pool de threads et que la création de threads n'est pas autorisée à être affichée dans l'application. plus standardisé et l'ouverture des threads peut être raisonnablement contrôlée. En revanche, la gestion détaillée des threads est confiée au pool de threads, ce qui optimise la surcharge des ressources. Le pool de threads ne peut pas être créé à l'aide d'Executors, mais via ThreadPoolExecutor. En effet, bien que le framework Executor dans jdk fournisse des méthodes telles que newFixedThreadPool(), newSingleThreadExecutor(), newCachedThreadPool(), etc. Ses limites ne sont pas assez flexibles ; de plus, puisque les méthodes précédentes sont également implémentées en interne via ThreadPoolExecutor, l'utilisation de ThreadPoolExecutor peut vous aider à clarifier les règles de fonctionnement du pool de threads, à créer un pool de threads qui répond aux besoins de vos propres scénarios métier, et éviter le risque d’épuisement des ressources.

Ci-dessous, nous donnerons un aperçu détaillé de la façon d'utiliser ThreadPoolExecutor.

Premier coup d'oeil au constructeur de ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

La signification des paramètres du constructeur est la suivante :

corePoolSize : précise le nombre de threads dans le pool de threads, Son numéro détermine si la tâche ajoutée ouvrira un nouveau thread pour exécution ou sera placée dans la file d'attente des tâches workQueue ; Le type de file d'attente des tâches workQueue utilisé détermine le nombre maximum de threads que le pool de threads ouvrira ; 🎜🎜#

keepAliveTime : Lorsque le nombre de threads inactifs dans le pool de threads dépasse corePoolSize, combien de temps faudra-t-il pour que les threads excédentaires soient détruits ;

unit : unité de keepAliveTime# ? 🎜🎜#

workQueue : file d'attente de tâches, tâches qui sont ajoutées au pool de threads mais qui n'ont pas encore été exécutées ; elle est généralement divisée en file d'attente de soumission directe, file d'attente de tâches limitée, file d'attente de tâches illimitée, file d'attente de tâches prioritaires ; 🎜#

threadFactory : Thread factory, utilisée pour créer des threads, utilise généralement la valeur par défaut

handler : Comment rejeter les tâches lorsqu'il y a trop de tâches à traiter ; #

Ensuite, nous comprendrons mieux les paramètres les plus importants :

1. file d'attente des tâches workQueue#🎜🎜 #

Nous l'avons déjà présenté ci-dessus Elle est généralement divisée. dans la file d'attente de soumission directe, la file d'attente de tâches limitée, la file d'attente de tâches illimitée et la file d'attente de tâches prioritaires ;

1 File d'attente de soumission directe : définie sur SynchronousQueue Queue, SynchronousQueue est une file d'attente de blocage spéciale. Elle n'a aucune capacité et bloquera. avant d'effectuer une opération d'insertion. Il doit effectuer une autre opération de suppression avant d'être réveillé. Au contraire, chaque opération de suppression doit attendre l'opération d'insertion correspondante.

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        for(int i=0;i<3;i++) {
            pool.execute(new ThreadTask());
        }   
    }
}

public class ThreadTask implements Runnable{
    
    public ThreadTask() {
        
    public void run() {
        System.out.println(Thread.currentThread().getName());

Le résultat de sortie est

pool-1-thread-1

pool-1-thread-2

Exception dans le fil "main" java.util.concurrent.RejectedExecutionException : tâche com.hhxx.test.ThreadTask@55f96302 rejetée de java.util.concurrent.ThreadPoolExecutor@3d4eac69[En cours d'exécution, taille du pool = 2, threads actifs = 0, tâches en file d'attente = 0, tâches terminées = 2]

sur java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Source inconnue)

sur java.util.concurrent.ThreadPoolExecutor.reject(Source inconnue)

sur java. util .concurrent.ThreadPoolExecutor.execute(Unknown Source)
at com.hhxx.test.ThreadPool.main(ThreadPool.java:17)



Vous pouvez le voir lorsque le file d'attente des tâches Pour SynchronousQueue, lorsque le nombre de threads créés est supérieur à maximumPoolSize, la politique de rejet est directement exécutée et une exception est levée.

En utilisant la file d'attente SynchronousQueue, les tâches soumises ne seront pas enregistrées et seront toujours soumises pour exécution immédiatement. Si le nombre de threads utilisés pour effectuer les tâches est inférieur à maximumPoolSize, essayez de créer un nouveau processus. Si la valeur maximale définie par maximumPoolSize est atteinte, la politique de rejet est exécutée selon le gestionnaire que vous avez défini. Par conséquent, les tâches que vous soumettez de cette manière ne seront pas mises en cache, mais seront exécutées immédiatement. Dans ce cas, vous devez avoir une évaluation précise de la simultanéité de votre programme afin de définir le montant maximumPoolSize approprié, sinon ce sera le cas. très difficile. Il est facile d'exécuter la politique de rejet ;

2 : La file d'attente de tâches limitée peut être implémentée à l'aide de ArrayBlockingQueue, comme indiqué ci-dessous

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
Utilisez ArrayBlockingQueue borné. file d'attente des tâches, si de nouvelles tâches doivent être exécutées, le pool de threads créera de nouveaux threads lorsque le nombre de threads créés atteint corePoolSize, les nouvelles tâches seront ajoutées à la file d'attente. Si la file d'attente est pleine, c'est-à-dire qu'elle dépasse la capacité initiale d'ArrayBlockingQueue, continuez à créer des threads jusqu'à ce que le nombre de threads atteigne le nombre maximum de threads défini par maximumPoolSize. S'il est supérieur à maximumPoolSize, la politique de rejet sera exécutée. . Dans ce cas, la limite supérieure du nombre de threads est directement liée à l'état de la file d'attente de tâches limitée. Si la capacité initiale de la file d'attente limitée est grande ou n'a pas atteint un état de surcharge, le nombre de threads sera toujours maintenu. ci-dessous corePoolSize Sinon, lorsque la file d'attente des tâches est pleine, maximumPoolSize sera utilisé comme limite supérieure du nombre maximum de threads.

3. File d'attente de tâches illimitée : la file d'attente de tâches limitée peut être implémentée à l'aide de LinkedBlockingQueue, comme indiqué ci-dessous

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。

4、优先任务队列:优先任务队列通过PriorityBlockingQueue实现,下面我们通过一个例子演示下

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //优先任务队列
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
          
        for(int i=0;i<20;i++) {
            pool.execute(new ThreadTask(i));
        }    
    }
}

public class ThreadTask implements Runnable,Comparable<ThreadTask>{
    
    private int priority;
    public int getPriority() {
        return priority;
    public void setPriority(int priority) {
        this.priority = priority;
    public ThreadTask() {
        
    public ThreadTask(int priority) {
    //当前对象和其他对象做比较,当前优先级大就返回-1,优先级小就返回1,值越小优先级越高
    public int compareTo(ThreadTask o) {
         return  this.priority>o.priority?-1:1;
    public void run() {
        try {
            //让线程阻塞,使后续任务进入缓存队列
            Thread.sleep(1000);
            System.out.println("priority:"+this.priority+",ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

我们来看下执行的结果情况

priority:0,ThreadName:pool-1-thread-1
priority:9,ThreadName:pool-1-thread-1
priority:8,ThreadName:pool-1-thread-1
priority:7,ThreadName:pool-1-thread-1
priority:6,ThreadName:pool-1-thread-1
priority:5,ThreadName:pool-1-thread-1
priority:4,ThreadName:pool-1-thread-1
priority:3,ThreadName:pool-1-thread-1
priority:2,ThreadName:pool-1-thread-1
priority:1,ThreadName:pool-1-thread-1

大家可以看到除了第一个任务直接创建线程执行外,其他的任务都被放入了优先任务队列,按优先级进行了重新排列执行,且线程池的线程数一直为corePoolSize,也就是只有一个。

通过运行的代码我们可以看出PriorityBlockingQueue它其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。

二、拒绝策略

一般我们创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列,但种模式下如果出现任务队列已满且线程池创建的线程数达到你设置的最大线程数时,这时就需要你指定ThreadPoolExecutor的RejectedExecutionHandler参数即合理的拒绝策略,来处理线程池"超载"的情况。ThreadPoolExecutor自带的拒绝策略如下:

1、AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作;

2、CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行;

3、DiscardOledestPolicy策略:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交;

4、DiscardPolicy策略:该策略会默默丢弃无法处理的任务,不予任何处理。当然使用此策略,业务场景中需允许任务的丢失;

以上内置的策略均实现了RejectedExecutionHandler接口,当然你也可以自己扩展RejectedExecutionHandler接口,定义自己的拒绝策略,我们看下示例代码:

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //自定义拒绝策略
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(r.toString()+"执行了拒绝策略");
                
            }
        });
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask());
        }    
    }
}

public class ThreadTask implements Runnable{    
    public void run() {
        try {
            //让线程阻塞,使后续任务进入缓存队列
            Thread.sleep(1000);
            System.out.println("ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

输出结果:

com.hhxx.test.ThreadTask@33909752执行了拒绝策略
com.hhxx.test.ThreadTask@55f96302执行了拒绝策略
com.hhxx.test.ThreadTask@3d4eac69执行了拒绝策略
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1

可以看到由于任务加了休眠阻塞,执行需要花费一定时间,导致会有一定的任务被丢弃,从而执行自定义的拒绝策略;

三、ThreadFactory自定义线程创建

线程池中线程就是通过ThreadPoolExecutor中的ThreadFactory,线程工厂创建的。那么通过自定义ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,如命名、优先级等,下面代码我们通过ThreadFactory对线程池中创建的线程进行记录与命名

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //自定义线程工厂
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
            public Thread newThread(Runnable r) {
                System.out.println("线程"+r.hashCode()+"创建");
                //线程命名
                Thread th = new Thread(r,"threadPool"+r.hashCode());
                return th;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy());
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask());
        }    
    }
}

public class ThreadTask implements Runnable{    
    public void run() {
        //输出执行线程的名称
        System.out.println("ThreadName:"+Thread.currentThread().getName());

我们看下输出结果

线程118352462创建
线程1550089733创建
线程865113938创建
ThreadName:threadPool1550089733
ThreadName:threadPool118352462
线程1442407170创建
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool865113938
ThreadName:threadPool865113938
ThreadName:threadPool118352462
ThreadName:threadPool1550089733
ThreadName:threadPool1442407170

可以看到线程池中,每个线程的创建我们都进行了记录输出与命名。

四、ThreadPoolExecutor扩展

ThreadPoolExecutor扩展主要是围绕beforeExecute()、afterExecute()和terminated()三个接口实现的,

1、beforeExecute:线程池中任务运行前执行

2、afterExecute:线程池中任务运行完毕后执行

3、terminated:线程池退出后执行

通过这三个接口我们可以监控每个任务的开始和结束时间,或者其他一些功能。下面我们可以通过代码实现一下

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args ) throws InterruptedException
    {
        //实现自定义接口
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
            public Thread newThread(Runnable r) {
                System.out.println("线程"+r.hashCode()+"创建");
                //线程命名
                Thread th = new Thread(r,"threadPool"+r.hashCode());
                return th;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy()) {
    
            protected void beforeExecute(Thread t,Runnable r) {
                System.out.println("准备执行:"+ ((ThreadTask)r).getTaskName());
            }
            
            protected void afterExecute(Runnable r,Throwable t) {
                System.out.println("执行完毕:"+((ThreadTask)r).getTaskName());
            }
            
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask("Task"+i));
        }    
        pool.shutdown();
    }
}

public class ThreadTask implements Runnable{    
    private String taskName;
    public String getTaskName() {
        return taskName;
    }
    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }
    public ThreadTask(String name) {
        this.setTaskName(name);
    }
    public void run() {
        //输出执行线程的名称
        System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName());
    }
}

我看下输出结果

线程118352462创建
线程1550089733创建
准备执行:Task0
准备执行:Task1
TaskNameTask0---ThreadName:threadPool118352462
线程865113938创建
执行完毕:Task0
TaskNameTask1---ThreadName:threadPool1550089733
执行完毕:Task1
准备执行:Task3
TaskNameTask3---ThreadName:threadPool1550089733
执行完毕:Task3
准备执行:Task2
准备执行:Task4
TaskNameTask4---ThreadName:threadPool1550089733
执行完毕:Task4
准备执行:Task5
TaskNameTask5---ThreadName:threadPool1550089733
执行完毕:Task5
准备执行:Task6
TaskNameTask6---ThreadName:threadPool1550089733
执行完毕:Task6
准备执行:Task8
TaskNameTask8---ThreadName:threadPool1550089733
执行完毕:Task8
准备执行:Task9
TaskNameTask9---ThreadName:threadPool1550089733
准备执行:Task7
执行完毕:Task9
TaskNameTask2---ThreadName:threadPool118352462
TaskNameTask7---ThreadName:threadPool865113938
执行完毕:Task7
执行完毕:Task2
线程池退出

可以看到通过对beforeExecute()、afterExecute()和terminated()的实现,我们对线程池中线程的运行状态进行了监控,在其执行前后输出了相关打印信息。另外使用shutdown方法可以比较安全的关闭线程池,当线程池调用该方法后,线程池中不再接受后续添加的任务。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。

五、线程池线程数量

线程吃线程数量的设置没有一个明确的指标,根据实际情况,只要不是设置的偏大和偏小都问题不大,结合下面这个公式即可

/**
             * Nthreads=CPU数量
             * Ucpu=目标CPU的使用率,0<=Ucpu<=1
             * W/C=任务等待时间与任务计算时间的比率
             */
            Nthreads = Ncpu*Ucpu*(1+W/C)

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer