Maison >Java >javaDidacticiel >Comment implémenter la politique de rejet de Java ThreadPoolExecutor ?

Comment implémenter la politique de rejet de Java ThreadPoolExecutor ?

WBOY
WBOYavant
2023-05-08 15:34:08866parcourir

    Principe de base du pool de threads

    Le principe du pool de threads est le suivant :

    #🎜🎜 #Comment implémenter la politique de rejet de Java ThreadPoolExecutor ?

    Description :

    • Si le nombre de threads en cours d'exécution est inférieur à corePoolSize, créez un nouveau thread pour effectuer la tâche .

    • Si les threads en cours d'exécution sont égaux ou supérieurs à corePoolSize, la tâche est ajoutée à la file d'attente.

    • Lorsque la file d'attente des tâches est pleine, un nouveau thread est créé dans le non-corePool pour traiter la tâche.

    • La création d'un nouveau thread entraînera un dépassement du nombre de threads en cours d'exécution maximumPoolSize, la tâche sera rejetée et la méthode RejectedExecutionHandler.rejectedExecution() sera appelée.

    Stratégie de rejet du pool de threads

    Le pool de threads nous propose quatre stratégies de rejet : CallerRunsPolicy, AbortPolicy, DiscardPolicy, DiscardOldestPolicy#🎜 🎜## 🎜🎜#AbortPolicy

    La stratégie de rejet par défaut dans ThreadPoolExecutor est qu'AbortPolicy lève directement une exception. L'implémentation spécifique est la suivante

    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    Explication : Cette stratégie est très simple et grossière. , direct Si une RejectedExecutionException est levée, les tâches suivantes ne seront pas exécutées.

    Exemple de description :

    public class ThreadPoolTest
    {
        public static void main(String[] args)
        {
            ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(
                    2,
                    5,
                    10,
                    TimeUnit.MICROSECONDS,
                    new LinkedBlockingDeque<>(1),
                    new ThreadPoolExecutor.AbortPolicy());
            
            //异步执行
            for(int i=0; i<10;i++)
            {
              System.out.println("添加第"+i+"个任务");
              threadPoolExecutor.execute(new TestThread("线程"+i));
            }        
        }
    }
    
    public class TestThread implements Runnable
    {
        private String name;
        public TestThread(String name){
            this.name=name;
        }
        
        @Override
        public void run()
        {
            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            System.out.println("thread name:"+Thread.currentThread().getName()+",执行:"+name);
        }
    }

    Résultat de l'exécution :

    Exception dans le thread "main" java.util.concurrent.RejectedExecutionException : Tâche com.skywares.fw.juc.thread.TestThread@55f96302 rejeté de java.util.concurrent.ThreadPoolExecutor@3d4eac69[En cours d'exécution, taille du pool = 5, threads actifs = 5, tâches en file d'attente = 1, tâches terminées = 0]#🎜🎜 # sur java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)

    sur java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    sur java.util . concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)

    at com.skywares.fw.juc.thread.ThreadPoolTest.main(ThreadPoolTest.java:26)
    nom du thread:pool-1-thread- 5. Exécution : thread 5
    nom du thread : pool-1-thread-2, exécution : thread 1
    nom du thread : pool-1-thread-4, exécution : thread 4
    nom du thread :pool-1-thread-3, exécution : thread 3
    nom du thread : pool-1-thread-1, exécution : thread 0
    nom du thread : pool-1-thread-5, exécution : thread 2


    Nous savons d'après les résultats d'exécution que lorsque la stratégie AbortPolicy est utilisée, une erreur sera signalée directement lorsque la tâche est exécutée à la septième tâche, ce qui empêchera la logique métier ultérieure exécuté.

    CallerRunsPolicy

    CallerRunsPolicy utilisera le thread supérieur qui appelle la fonction d'exécution pour exécuter la tâche rejetée une fois la tâche rejetée.

    Exemples associés

    public class ThreadPoolTest
    {
        public static void main(String[] args)
        {
            ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(
                    2,
                    5,
                    10,
                    TimeUnit.MICROSECONDS,
                    new LinkedBlockingDeque<>(1),
                    new ThreadPoolExecutor.CallerRunsPolicy());
            
            //异步执行
            for(int i=0; i<10;i++)
            {
              System.out.println("添加第"+i+"个任务");
              threadPoolExecutor.execute(new TestThread("线程"+i));
            }
        }
    }

    Résultats d'exécution :

    Ajouter la 0ème tâche

    Ajouter la 1ère tâche#🎜 🎜# Ajouter une 2ème tâche

    Ajouter une 3ème tâche
    Ajouter une 4ème tâche

    Ajouter une 5ème tâche
    Ajouter une 6ème tâche
    nom du thread : principal, exécution : thread 6
    nom du thread : pool-1-thread-3, exécution : thread 3
    nom du thread : pool-1-thread-1, exécution : thread 0
    nom du thread :pool-1-thread-4,Execute:Thread 4
    nom du fil : pool-1-thread-2,Exécuter :Thread 1
    nom du fil :pool-1-thread-5,Exécuter :Thread 5
    Ajouter la 7ème tâche#🎜🎜 #Ajouter la 8ème tâche
    thread name:main, exécution : thread 8
    thread name : pool-1-thread-1, exécution : thread 7
    thread name:pool-1-thread- 3, exécution : thread 2
    Ajouter la 9ème tâche
    nom du thread : pool-1-thread-1, exécution : thread 9#🎜 🎜#

    À partir des résultats de l'exécution, nous pouvons savoir que lorsque la 7ème tâche est exécutée, en raison de la politique de rejet du pool de threads, cette tâche est exécutée par le thread principal. Lorsque le pool de threads est inactif, il continuera à effectuer d'autres tâches.
    Cette stratégie risque donc de bloquer le fil principal.


    DiscardPolicy

    Cette stratégie de rejet est relativement simple. Les tâches rejetées par le pool de threads sont directement rejetées sans lever d'exceptions ni exécuter

    #🎜 🎜#. ExempleModifiez le code ci-dessus et changez la politique de rejet en DiscardPolicy

     ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(
                    2,
                    5,
                    10,
                    TimeUnit.MICROSECONDS,
                    new LinkedBlockingDeque<>(1),
                    new ThreadPoolExecutor.CallerRunsPolicy());

    Résultat d'exécution

    invoke dealStock success#🎜 🎜# marchandisesId : téléphone mobile

    nom du thread : pool-1-thread-1, exécution : thread 0

    nom du thread : pool-1-thread-4, exécution : thread 4

    nom du thread : pool -1-thread-5, exécution : thread 5

    nom du thread : pool-1-thread-3, exécution : thread 3

    nom du thread : pool-1-thread-2, exécution : thread 1 # 🎜🎜#nom du thread : pool-1-thread-1, exécution : thread 2

    D'après les résultats de l'exécution, seules 6 tâches ont été exécutées et d'autres tâches ont été abandonnées.

    DiscardOldestPolicy

    DiscardOldestPolicy Lorsqu'une tâche refuse d'être ajoutée, la tâche qui a été ajoutée en premier à la file d'attente sera ignorée et la nouvelle tâche sera ajoutée.

    Exemple de description

     ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(
                    1,
                    2,
                    10,
                    TimeUnit.MICROSECONDS,
                    new LinkedBlockingDeque<>(2),
                    new ThreadPoolExecutor.CallerRunsPolicy());

    Résultat de l'exécution :

    Ajouter la 0ème tâche

    Ajouter la 1ère tâche#🎜 🎜# Ajouter une 2ème tâche

    Ajouter une 3ème tâche

    Ajouter une 4ème tâche

    Ajouter une 5ème tâche

    invoke dealStock success

    goodsId:mobile phone#🎜 🎜#thread name:pool-1- thread-2,exécution:thread 3

    nom du thread:pool-1-thread-1,exécution:thread 0

    nom du thread:pool-1-thread-1 ,Exécution : thread 2
    nom du fil : pool-1-thread-2, Exécution : fil 1

    自定义拒绝策略

    当线程池提供的拒绝策略无法满足要求时,我们可以采用自定义的拒绝策略,只需要实现RejectedExecutionHandler接口即可

    public class CustRejectedExecutionHandler implements RejectedExecutionHandler
    {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
        {
            new Thread(r,"线程:"+new Random().nextInt(10)).start();
        }
    }
    
      ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(
                    1,
                    2,
                    10,
                    TimeUnit.MICROSECONDS,
                    new LinkedBlockingDeque<>(2),
                    new CustRejectedExecutionHandler());

    执行结果:

    thread name:客户线程:6,执行:线程5
    thread name:pool-1-thread-1,执行:线程0
    thread name:客户线程:8,执行:线程4
    thread name:pool-1-thread-2,执行:线程3
    thread name:pool-1-thread-1,执行:线程1
    thread name:pool-1-thread-2,执行:线程2

    从执行的结果来看,被拒绝的任务都在客户的新线程中执行。

    小结

    • AbortPolicy:直接抛出异常,后续的任务不会执行

    • CallerRunsPolicy:子任务执行的时间过长,可能会阻塞主线程。

    • DiscardPolicy:不抛异常,任务直接丢弃

    • DiscardOldestPolicy;丢弃最先加入队列的任务

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

    Déclaration:
    Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer