How to implement Java ThreadPoolExecutor's rejection policy?

WBOY
2023-05-08 15:34:08

    Basic principle of thread pool

    A ThreadPoolExecutor handles each submitted task as follows:

    • If the number of running threads is less than corePoolSize, a new core thread is created to run the task.

    • If the number of running threads is equal to or greater than corePoolSize, the task is added to the work queue.

    • If the work queue is full, a new non-core thread is created to run the task, as long as the total stays within maximumPoolSize.

    • If creating a new thread would push the number of running threads past maximumPoolSize, the task is rejected and RejectedExecutionHandler.rejectedExecution() is called.
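
    The four steps above can be observed directly. The following is a minimal sketch, not part of the original article: the class name SubmissionFlowDemo, the pool sizes (1 core thread, 2 threads at most, a one-slot queue) and the latch that keeps every task busy are all assumptions chosen so that each step happens exactly once.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SubmissionFlowDemo {

    /** Submits four blocking tasks and logs what the pool did with each one. */
    public static String run() {
        CountDownLatch release = new CountDownLatch(1);
        // corePoolSize = 1, maximumPoolSize = 2, queue capacity = 1, default AbortPolicy
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        // Every task waits on the latch, so no thread becomes free while we submit.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder log = new StringBuilder();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                log.append("task ").append(i)
                   .append(": poolSize=").append(pool.getPoolSize())
                   .append(", queued=").append(pool.getQueue().size())
                   .append('\n');
            } catch (RejectedExecutionException e) {
                log.append("task ").append(i).append(": rejected\n");
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.print(run());
        // task 1: poolSize=1, queued=0   (new core thread)
        // task 2: poolSize=1, queued=1   (core thread busy, task queued)
        // task 3: poolSize=2, queued=1   (queue full, non-core thread created)
        // task 4: rejected               (queue full and maximumPoolSize reached)
    }
}
```

    Because the tasks block until the latch is released, the pool state after each execute() call is fixed, which is why the log is reproducible.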

    Thread pool rejection policy

    ThreadPoolExecutor ships with four built-in rejection policies: CallerRunsPolicy, AbortPolicy, DiscardPolicy, and DiscardOldestPolicy.

    AbortPolicy

    AbortPolicy is the default rejection policy of ThreadPoolExecutor. It simply throws an exception. Its implementation is as follows:

    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    Description: This policy is simple and blunt. It throws a RejectedExecutionException straight back to the caller of execute(). If the caller does not catch the exception, no further tasks are submitted.

    Example:

    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class ThreadPoolTest
    {
        public static void main(String[] args)
        {
            // 2 core threads, at most 5 threads, and a queue that holds 1 task
            ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(
                    2,
                    5,
                    10,
                    TimeUnit.MICROSECONDS,
                    new LinkedBlockingDeque<>(1),
                    new ThreadPoolExecutor.AbortPolicy());
            
            // Submit the tasks asynchronously
            for(int i=0; i<10;i++)
            {
              System.out.println("Add task "+i);
              threadPoolExecutor.execute(new TestThread("thread "+i));
            }        
        }
    }
    
    // TestThread.java: a task that sleeps for one second, then reports which thread ran it
    public class TestThread implements Runnable
    {
        private String name;
        public TestThread(String name){
            this.name=name;
        }
        
        @Override
        public void run()
        {
            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            System.out.println("thread name:"+Thread.currentThread().getName()+", execution: "+name);
        }
    }

    Execution result:

    Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.skywares.fw.juc.thread.TestThread@55f96302 rejected from java.util.concurrent.ThreadPoolExecutor@3d4eac69[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
        at com.skywares.fw.juc.thread.ThreadPoolTest.main(ThreadPoolTest.java:26)
    thread name:pool-1-thread-5, execution: thread 5
    thread name:pool-1-thread-2, execution: thread 1
    thread name:pool-1-thread-4, execution: thread 4
    thread name:pool-1-thread-3, execution: thread 3
    thread name:pool-1-thread-1, execution: thread 0
    thread name:pool-1-thread-5, execution: thread 2

    The results show that with AbortPolicy, submitting the seventh task (task 6) throws an exception in the main thread. Because the exception is not caught, the loop ends there and the remaining tasks are never submitted. The six tasks the pool had already accepted still run to completion.
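
    If the submitting code should survive a rejection, one option is to catch RejectedExecutionException around execute(). The sketch below is an illustration rather than the article's code: the class name AbortPolicyCatchDemo and the latch that keeps the pool saturated are assumptions, while the pool parameters mirror the example above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AbortPolicyCatchDemo {

    /** Submits 10 blocking tasks and returns how many of them were rejected. */
    public static int countRejected() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 5, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        int rejected = 0;
        for (int i = 0; i < 10; i++) {
            try {
                // Each task waits on the latch, so the pool stays saturated.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // handle the rejection and keep submitting
            }
        }
        release.countDown();
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        // 5 threads + 1 queue slot accept 6 tasks; the other 4 are rejected.
        System.out.println("rejected = " + countRejected()); // prints "rejected = 4"
    }
}
```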

    CallerRunsPolicy

    When a task is rejected, CallerRunsPolicy runs it on the thread that called execute().

    Example:

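    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class ThreadPoolTest
    {
        public static void main(String[] args)
        {
            ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(
                    2,
                    5,
                    10,
                    TimeUnit.MICROSECONDS,
                    new LinkedBlockingDeque<>(1),
                    new ThreadPoolExecutor.CallerRunsPolicy());
            
            // Submit the tasks asynchronously
            for(int i=0; i<10;i++)
            {
              System.out.println("Add task "+i);
              threadPoolExecutor.execute(new TestThread("thread "+i));
            }
            // Let the JVM exit once all accepted tasks have finished
            threadPoolExecutor.shutdown();
        }
    }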

    Execution results:

    Add task 0
    Add task 1
    Add task 2
    Add task 3
    Add task 4
    Add task 5
    Add task 6
    thread name:main, execution: thread 6
    thread name:pool-1-thread-3, execution: thread 3
    thread name:pool-1-thread-1, execution: thread 0
    thread name:pool-1-thread-4, execution: thread 4
    thread name:pool-1-thread-2, execution: thread 1
    thread name:pool-1-thread-5, execution: thread 5
    Add task 7
    Add task 8
    thread name:main, execution: thread 8
    thread name:pool-1-thread-1, execution: thread 7
    thread name:pool-1-thread-3, execution: thread 2
    Add task 9
    thread name:pool-1-thread-1, execution: thread 9

    The results show that when the seventh task (task 6) is submitted, the pool rejects it and the main thread runs it instead. The remaining tasks are handed to the pool as its threads become free. Because the caller is busy while it runs a rejected task, this policy can block the main thread.
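
    To confirm that a rejected task really runs on the submitting thread, the task can compare Thread.currentThread() with the caller. This is a minimal sketch under assumed settings (one worker thread, a one-slot queue, and the hypothetical class name CallerRunsDemo), not the article's code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CallerRunsDemo {

    /** Returns how many tasks ended up running on the submitting thread. */
    public static int countRunsOnCaller() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Thread caller = Thread.currentThread();
        AtomicInteger ranOnCaller = new AtomicInteger();

        // Keeps the worker thread and the queue slot occupied.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // Records whether it was executed by the submitting thread.
        Runnable probe = () -> {
            if (Thread.currentThread() == caller) {
                ranOnCaller.incrementAndGet();
            }
        };

        pool.execute(blocker); // occupies the only worker thread
        pool.execute(blocker); // fills the one-slot queue
        pool.execute(probe);   // rejected, so it runs here on the caller
        pool.execute(probe);   // rejected again, runs on the caller again

        release.countDown();
        pool.shutdown();
        return ranOnCaller.get();
    }

    public static void main(String[] args) {
        System.out.println("ran on caller = " + countRunsOnCaller()); // prints "ran on caller = 2"
    }
}
```

    The caller cannot submit anything else while it is inside a rejected task, which acts as a simple form of back-pressure but also explains the blocking noted above.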

    DiscardPolicy

    This is the simplest rejection policy: tasks rejected by the thread pool are silently discarded. No exception is thrown and the task is never executed.

    Example:

    Modify the code above and change the rejection policy to DiscardPolicy:

     ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(
                    2,
                    5,
                    10,
                    TimeUnit.MICROSECONDS,
                    new LinkedBlockingDeque<>(1),
                    new ThreadPoolExecutor.DiscardPolicy());

    Execution result:

    thread name:pool-1-thread-1, execution: thread 0
    thread name:pool-1-thread-4, execution: thread 4
    thread name:pool-1-thread-5, execution: thread 5
    thread name:pool-1-thread-3, execution: thread 3
    thread name:pool-1-thread-2, execution: thread 1
    thread name:pool-1-thread-1, execution: thread 2

    The results show that only six of the ten tasks were executed. The other four were discarded without any error.
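
    Since DiscardPolicy gives no signal, the only way to notice the loss is to count what actually ran. The following sketch does that. The class name DiscardPolicyDemo, the counter and the latch are assumptions added for illustration, and the pool parameters match the example above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DiscardPolicyDemo {

    /** Submits 10 tasks to a saturated pool and returns how many actually ran. */
    public static int countExecuted() {
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger executed = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 5, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.DiscardPolicy());
        for (int i = 0; i < 10; i++) {
            // No exception reaches this loop, even for tasks that are dropped.
            pool.execute(() -> {
                try {
                    release.await();
                    executed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        // 5 threads + 1 queue slot accept 6 tasks; 4 vanish silently.
        System.out.println("executed = " + countExecuted()); // prints "executed = 6"
    }
}
```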

    DiscardOldestPolicy

    When a task is rejected, DiscardOldestPolicy discards the oldest task in the queue (the one that was added first) and then tries to submit the new task again.

    Example:

     ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(
                    1,
                    2,
                    10,
                    TimeUnit.MICROSECONDS,
                    new LinkedBlockingDeque<>(2),
                    new ThreadPoolExecutor.DiscardOldestPolicy());

    Execution result:

    Add task 0
    Add task 1
    Add task 2
    Add task 3
    Add task 4
    Add task 5
    thread name:pool-1-thread-2, execution: thread 3
    thread name:pool-1-thread-1, execution: thread 0
    thread name:pool-1-thread-1, execution: thread 2
    thread name:pool-1-thread-2, execution: thread 1
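
    Which tasks DiscardOldestPolicy drops is easiest to see with a single worker thread, because execution order is then fixed. The sketch below uses assumed settings that differ from the example above (one thread, a two-slot queue, five tasks, the hypothetical class name DiscardOldestDemo) and records the ids of the tasks that ran.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DiscardOldestDemo {

    /** Submits tasks 0..4 and returns the ids of the tasks that were executed. */
    public static List<Integer> executedIds() {
        CountDownLatch release = new CountDownLatch(1);
        List<Integer> executed = new CopyOnWriteArrayList<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        for (int i = 0; i < 5; i++) {
            final int id = i;
            pool.execute(() -> {
                try {
                    release.await(); // task 0 holds the worker until all tasks are submitted
                    executed.add(id);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed;
    }

    public static void main(String[] args) {
        // Task 0 runs on the worker; tasks 1 and 2 are queued.
        // Task 3 evicts task 1, and task 4 evicts task 2.
        System.out.println(executedIds()); // prints "[0, 3, 4]"
    }
}
```

    Under these assumptions the newest tasks survive and the longest-waiting queued tasks are the ones lost.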

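    Custom rejection policy

    When the rejection policies provided by the thread pool do not meet your requirements, you can write your own. You only need to implement the RejectedExecutionHandler interface: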

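    import java.util.Random;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;

    public class CustRejectedExecutionHandler implements RejectedExecutionHandler
    {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
        {
            // Run the rejected task on a brand-new thread outside the pool.
            // Note: this puts no limit on how many threads get created.
            new Thread(r,"custom thread:"+new Random().nextInt(10)).start();
        }
    }
    
      // Pass the custom handler to the pool as its rejection policy
      ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(
                    1,
                    2,
                    10,
                    TimeUnit.MICROSECONDS,
                    new LinkedBlockingDeque<>(2),
                    new CustRejectedExecutionHandler());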

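    Execution result:

    thread name:custom thread:6, execution: thread 5
    thread name:pool-1-thread-1, execution: thread 0
    thread name:custom thread:8, execution: thread 4
    thread name:pool-1-thread-2, execution: thread 3
    thread name:pool-1-thread-1, execution: thread 1
    thread name:pool-1-thread-2, execution: thread 2

    The results show that the rejected tasks (tasks 4 and 5) were each executed in a new custom thread.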
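
    Starting a new thread for every rejected task puts no bound on thread creation. One alternative, sketched here as an assumption rather than as the article's approach, is a handler that makes the submitting thread wait until the queue has room. The names BlockingHandlerDemo and BlockWhenFullHandler are hypothetical. Be aware that a task placed in the queue this way may never run if the pool is shut down abruptly while the caller is waiting.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingHandlerDemo {

    /** Hypothetical handler: the submitting thread waits for a free queue slot. */
    static class BlockWhenFullHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("pool is shut down");
            }
            try {
                executor.getQueue().put(r); // blocks until a worker frees a slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting for a queue slot", e);
            }
        }
    }

    /** Submits 6 short tasks to a small pool and returns how many ran. */
    public static int runAll() {
        AtomicInteger executed = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                new BlockWhenFullHandler());
        for (int i = 0; i < 6; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(50); // simulate a little work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                executed.incrementAndGet();
            });
        }
        pool.shutdown(); // queued tasks still run after shutdown()
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println("executed = " + runAll()); // prints "executed = 6"
    }
}
```

    Compared with CallerRunsPolicy, the caller only waits here instead of doing the work itself, so all tasks still run on pool threads.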

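    Summary

    • AbortPolicy: throws an exception directly; if it is not caught, later tasks are not submitted.

    • CallerRunsPolicy: the caller runs the rejected task, so long-running tasks may block the main thread.

    • DiscardPolicy: throws no exception; the rejected task is simply discarded.

    • DiscardOldestPolicy: discards the task that was added to the queue first.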

    Statement:
    This article is reproduced from yisu.com. If there is any infringement, please contact admin@php.cn to have it removed.