首頁  >  文章  >  Java  >  如何實作Java ThreadPoolExecutor的拒絕策略?

如何實作Java ThreadPoolExecutor的拒絕策略?

WBOY
WBOY轉載
2023-05-08 15:34:08719瀏覽

    執行緒池基本原理

    執行緒池的原理如下圖:

    如何實作Java ThreadPoolExecutor的拒絕策略?

    ##說明:

    • 目前執行的執行緒少於corePoolSize,則建立新執行緒來執行任務。

    • 執行的執行緒等於或多於corePoolSize,則將任務新增至佇列。

    • 當任務佇列已滿,則在非corePool中建立新的執行緒來處理任務。

    • 建立新執行緒將使目前執行的執行緒超出maximumPoolSize,任務將被拒絕,並呼叫RejectedExecutionHandler.rejectedExecution()方法。

    執行緒池拒絕策略

    執行緒池為我們提供了四種拒絕策略分別是:CallerRunsPolicy,AbortPolicy,DiscardPolicy,DiscardOldestPolicy

    AbortPolicy

    ThreadPoolExecutor中預設的拒絕策略就是AbortPolicy直接拋出異常,具體實現如下

    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    說明:這種策略非常簡單粗暴,直接拋出RejectedExecutionException異常,也不會執行後續的任務。

    範例說明:

    public class ThreadPoolTest
    {
        public static void main(String[] args)
        {
            ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(
                    2,
                    5,
                    10,
                    TimeUnit.MICROSECONDS,
                    new LinkedBlockingDeque<>(1),
                    new ThreadPoolExecutor.AbortPolicy());
            
            //异步执行
            for(int i=0; i<10;i++)
            {
              System.out.println("添加第"+i+"个任务");
              threadPoolExecutor.execute(new TestThread("线程"+i));
            }        
        }
    }
    
    public class TestThread implements Runnable
    {
        private String name;
        public TestThread(String name){
            this.name=name;
        }
        
        @Override
        public void run()
        {
            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            System.out.println("thread name:"+Thread.currentThread().getName()+",执行:"+name);
        }
    }

    執行結果:

    Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.skywares.fw.juc .thread.TestThread@55f96302 rejected from java.util.concurrent.ThreadPoolExecutor@3d4eac69[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tas. ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)

        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) > PoolExecutor.java: 1369)
        at com.skywares.fw.juc.thread.ThreadPoolTest.main(ThreadPoolTest.java:26)
    thread name:pool-1-thread-5,執行:執行緒5
    thread name:#thread name:pool-1-thread-5,執行:執行緒5
    thread name: pool-1-thread-2,執行:執行緒1
    thread name:pool-1-thread-4,執行:執行緒4
    thread name:pool-1-thread-3,執行:執行緒3
    thread name:pool-1-thread-1,執行:執行緒0
    thread name:pool-1-thread-5,執行:執行緒2

    從執行結果我們得知,採用AbortPolicy策略當任務執行到第七個任務時會直接報錯,導致後續的業務邏輯不會執行。

    CallerRunsPolicy

    CallerRunsPolicy在任務被拒絕加入後,會用呼叫execute函數的上層執行緒去執行被拒絕的任務。

    相關範例

    public class ThreadPoolTest
    {
        public static void main(String[] args)
        {
            ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(
                    2,
                    5,
                    10,
                    TimeUnit.MICROSECONDS,
                    new LinkedBlockingDeque<>(1),
                    new ThreadPoolExecutor.CallerRunsPolicy());
            
            //异步执行
            for(int i=0; i<10;i++)
            {
              System.out.println("添加第"+i+"个任务");
              threadPoolExecutor.execute(new TestThread("线程"+i));
            }
        }
    }

    執行結果:

    新增第0個任務
    新增第1個任務

    新增第2個任務
    新增第3個任務
    新增第4個任務
    新增第5個任務
    新增第6個任務
    thread name:main,執行:執行緒6
    thread name:pool- 1-thread-3,執行:執行緒3
    thread name:pool-1-thread-1,執行:執行緒0
    thread name:pool-1-thread-4,執行:執行緒4
    thread name:pool-1-thread-2,執行:執行緒1
    thread name:pool-1-thread-5,執行:執行緒5
    新增第7個任務
    新增第8個任務
    thread name:main,執行:執行緒8
    thread name:pool-1-thread-1,執行:執行緒7
    thread name:pool-1-thread-3,執行:執行緒2
    添加第9個任務
    thread name:pool-1-thread-1,執行:執行緒9

    從執行的結果我們可以得知,執行到第7個任務時,由於執行緒池拒絕策略,此任務由主執行緒執行,當執行緒池有空閒時,才繼續執行其他的任務。
    所以此策略可能會阻塞主執行緒。

    DiscardPolicy

    這種拒絕策略比較簡單,執行緒池拒絕的任務直接拋棄,不會拋棄異常也不會執行

    範例

    修改上述的程式碼,將拒絕策略修改為DiscardPolicy

     ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(
                    2,
                    5,
                    10,
                    TimeUnit.MICROSECONDS,
                    new LinkedBlockingDeque<>(1),
                    new ThreadPoolExecutor.CallerRunsPolicy());

    執行結果

    invoke dealStock success
    goodsId:手機

    thread name:pool-1- thread-1,執行:執行緒0
    thread name:pool-1-thread-4,執行:執行緒4
    thread name:pool-1-thread-5,執行:執行緒5
    thread name: pool-1-thread-3,執行:執行緒3
    thread name:pool-1-thread-2,執行:執行緒1
    thread name:pool-1-thread-1,執行:執行緒2

    從執行的結果來看只執行了6個任務,其他的任務都被拋棄了。

    DiscardOldestPolicy

    DiscardOldestPolicy 當任務拒絕新增時,會拋棄任務佇列中最先加入佇列的任務,再把新任務加入進去。

    範例說明

     ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(
                    1,
                    2,
                    10,
                    TimeUnit.MICROSECONDS,
                    new LinkedBlockingDeque<>(2),
                    new ThreadPoolExecutor.CallerRunsPolicy());

    執行結果:

    新增第0個任務
    新增第1個任務

    新增第2個任務
    新增第3個任務
    新增第4個任務
    新增第5個任務
    invoke dealStock success
    goodsId:手機
    thread name:pool-1-thread-2,執行:執行緒3
    thread name:pool-1-thread-1,執行:執行緒0
    thread name:pool-1-thread-1,執行:執行緒2
    thread name:pool-1-thread- 2,執行:線程1

    自定义拒绝策略

    当线程池提供的拒绝策略无法满足要求时,我们可以采用自定义的拒绝策略,只需要实现RejectedExecutionHandler接口即可

    public class CustRejectedExecutionHandler implements RejectedExecutionHandler
    {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
        {
            new Thread(r,"线程:"+new Random().nextInt(10)).start();
        }
    }
    
      ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(
                    1,
                    2,
                    10,
                    TimeUnit.MICROSECONDS,
                    new LinkedBlockingDeque<>(2),
                    new CustRejectedExecutionHandler());

    执行结果:

    thread name:客户线程:6,执行:线程5
    thread name:pool-1-thread-1,执行:线程0
    thread name:客户线程:8,执行:线程4
    thread name:pool-1-thread-2,执行:线程3
    thread name:pool-1-thread-1,执行:线程1
    thread name:pool-1-thread-2,执行:线程2

    从执行的结果来看,被拒绝的任务都在客户的新线程中执行。

    小结

    • AbortPolicy:直接抛出异常,后续的任务不会执行

    • CallerRunsPolicy:子任务执行的时间过长,可能会阻塞主线程。

    • DiscardPolicy:不抛异常,任务直接丢弃

    • DiscardOldestPolicy;丢弃最先加入队列的任务

    以上是如何實作Java ThreadPoolExecutor的拒絕策略?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

    陳述:
    本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除