ホームページ >Java >&#&チュートリアル >SpringBootスレッドプールとJavaスレッドプールの使い方

SpringBootスレッドプールとJavaスレッドプールの使い方

PHPz
PHPz転載
2023-05-18 12:46:361164ブラウズ

    SpringBoot スレッド プールと Java スレッド プールの使用法と実装の原則

    デフォルトのスレッド プールを使用する

    方法 1: @Async を使用するアノテーション呼び出し

    public class AsyncTest {
        @Async
        public void async(String name) throws InterruptedException {
            System.out.println("async" + name + " " + Thread.currentThread().getName());
            Thread.sleep(1000);
        }
    }

    は、@EnableAsync アノテーションを使用してスタートアップ クラスに追加する必要があります。追加しないと有効になりません。

    @SpringBootApplication
    //@EnableAsync
    public class Test1Application {
       public static void main(String[] args) throws InterruptedException {
          ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args);
          AsyncTest bean = run.getBean(AsyncTest.class);
          for(int index = 0; index <= 10; ++index){
             bean.async(String.valueOf(index));
          }
       }
    }

    方法 2: ThreadPoolTask​​Executor を直接挿入する

    追加する必要はありません @EnableAsyncAnnotation

    @SpringBootTest
    class Test1ApplicationTests {
    
       @Resource
       ThreadPoolTaskExecutor threadPoolTaskExecutor;
    
       @Test
       void contextLoads() {
          Runnable runnable = () -> {
             System.out.println(Thread.currentThread().getName());
          };
    
          for(int index = 0; index <= 10; ++index){
             threadPoolTaskExecutor.submit(runnable);
          }
       }
    
    }

    スレッド プールのデフォルト構成情報

    SpringBoot スレッド プールの一般的な構成:

    spring:
      task:
        execution:
          pool:
            core-size: 8
            max-size: 16                          # 默认是 Integer.MAX_VALUE
            keep-alive: 60s                       # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止
            allow-core-thread-timeout: true       # 是否允许核心线程超时,默认true
            queue-capacity: 100                   # 线程队列的大小,默认Integer.MAX_VALUE
          shutdown:
            await-termination: false              # 线程关闭等待
          thread-name-prefix: task-               # 线程名称的前缀

    SpringBoot スレッド プール

    TaskExecutionAutoConfiguration の実装原則は、内部実装であるクラス ThreadPoolTask​​Executor で定義されています。このクラスのこれも、Java のネイティブ ThreadPoolExecutor クラスに基づいています。 initializeExecutor()メソッドは親クラスで呼び出されますが、親クラスでは RejectedExecutionHandlerprivate RejectedExecutionHandler requestedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); として定義されています。そして、initialize() メソッドを通じて AbortPolicyinitializeExecutor() に渡します。

    TaskExecutionAutoConfiguration クラスでは、ThreadPoolTask​​Executor クラスの Bean 名は、applicationTaskExecutor および taskExecutor であることに注意してください。

    // TaskExecutionAutoConfiguration#applicationTaskExecutor()
    @Lazy
    @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
          AsyncAnnotationBeanPostProcessor.DEFAUL
              T_TASK_EXECUTOR_BEAN_NAME })
    @ConditionalOnMissingBean(Executor.class)
    public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
       return builder.build();
    }
    // ThreadPoolTaskExecutor#initializeExecutor()
    @Override
    protected ExecutorService initializeExecutor(
          ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
    
       BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
    
       ThreadPoolExecutor executor;
       if (this.taskDecorator != null) {
          executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler) {
             @Override
             public void execute(Runnable command) {
                Runnable decorated = taskDecorator.decorate(command);
                if (decorated != command) {
                   decoratedTaskMap.put(decorated, command);
                }
                super.execute(decorated);
             }
          };
       }
       else {
          executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler);
    
       }
    
       if (this.allowCoreThreadTimeOut) {
          executor.allowCoreThreadTimeOut(true);
       }
    
       this.threadPoolExecutor = executor;
       return executor;
    }
    // ExecutorConfigurationSupport#initialize()
    public void initialize() {
       if (logger.isInfoEnabled()) {
          logger.info("Initializing ExecutorService" + (this.beanName != null ? " &#39;" + this.beanName + "&#39;" : ""));
       }
       if (!this.threadNamePrefixSet && this.beanName != null) {
          setThreadNamePrefix(this.beanName + "-");
       }
       this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
    }

    デフォルトのスレッド プールをオーバーライドします

    デフォルトの taskExecutor オブジェクトをオーバーライドします。Bean の戻り値の型は、ThreadPoolTask​​Executor または Executor です。

    @Configuration
    public class ThreadPoolConfiguration {
    
        @Bean("taskExecutor")
        public ThreadPoolTaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            //设置线程池参数信息
            taskExecutor.setCorePoolSize(10);
            taskExecutor.setMaxPoolSize(50);
            taskExecutor.setQueueCapacity(200);
            taskExecutor.setKeepAliveSeconds(60);
            taskExecutor.setThreadNamePrefix("myExecutor--");
            taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
            taskExecutor.setAwaitTerminationSeconds(60);
            //修改拒绝策略为使用当前线程执行
            taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            //初始化线程池
            taskExecutor.initialize();
            return taskExecutor;
        }
    }

    複数のスレッド プールの管理

    複数のスレッド プールがある場合、たとえば、別のスレッド プール taskExecutor2 を定義すると、直接実行するとエラーが報告されます。このとき、Bean の名前を指定する必要があります。

    @Bean("taskExecutor2")
    public ThreadPoolTaskExecutor taskExecutor2() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor2--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }

    スレッド プールを参照する場合、名前で検索されるように、変数名を Bean の名前に変更する必要があります。

    @Resource
    ThreadPoolTaskExecutor taskExecutor2;

    @Async アノテーションを使用したマルチスレッドの場合は、アノテーションで Bean 名を指定するだけです。

    @Async("taskExecutor2")
        public void async(String name) throws InterruptedException {
            System.out.println("async" + name + " " + Thread.currentThread().getName());
            Thread.sleep(1000);
        }

    スレッド プールの 4 つの拒否戦略

    JAVA で一般的に使用される 4 つのスレッド プール

    ThreadPoolExecutor クラスのコンストラクターは次のとおりです。

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    newCachedThreadPool

    スレッドの最大数を制限しません (maximumPoolSize=Integer.MAX_VALUE)。アイドル状態のスレッドが必要以上にある場合はリサイクルされ、そうでない場合は既存のスレッドになります。スレッドは再利用されます。

    new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());

    newFixedThreadPool

    固定長のスレッド プール。スレッド数を超えるタスクはキューで待機します。

    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());

    newScheduledThreadPool

    newCachedThreadPool と同様に、スレッド数に上限はありませんが、corePoolSize を指定できます。遅延実行と定期実行が可能です。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    定期実行:

    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
    scheduledThreadPool.scheduleAtFixedRate(()->{
       System.out.println("rate");
    }, 1, 1, TimeUnit.SECONDS);

    遅延実行:

    scheduledThreadPool.schedule(()->{
       System.out.println("delay 3 seconds");
    }, 3, TimeUnit.SECONDS);

    newSingleThreadExecutor

    シングルスレッド スレッド プールは、スレッドの順次実行を実現できます。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    Java スレッド プールの 4 つの拒否ポリシー

    • CallerRunsPolicy: スレッド プールは呼び出し元の実行を許可します。

    • AbortPolicy: スレッド プールがタスクを拒否した場合、エラーが直接報告されます。

    • DiscardPolicy: スレッド プールがタスクを拒否した場合、そのタスクは直接破棄されます。

    • DiscardOldestPolicy: スレッド プールがタスクを拒否した場合、スレッド プール内の最も古い未実行タスクが直接破棄され、新しいタスクがキューに入れられます。

    CallerRunsPolicy

    run メソッドはメイン スレッドで直接実行されます。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
     
        public CallerRunsPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    効果は次のようになります。

    Runnable thread = ()->{
       System.out.println(Thread.currentThread().getName());
       try {
          Thread.sleep(0);
       } catch (InterruptedException e) {
          throw new RuntimeException(e);
       }
    };
    
    thread.run();

    AbortPolicy

    RejectedExecutionException 例外を直接スローし、タスク情報とスレッド プール情報を示します。 ,

    public static class AbortPolicy implements RejectedExecutionHandler {
     
        public AbortPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    DiscardPolicy

    は何も行いません。

    public static class DiscardPolicy implements RejectedExecutionHandler {
     
        public DiscardPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    DiscardOldestPolicy

    • ##e.getQueue().poll(): キューから最も古いタスクを削除します。

    • e.execute(r) : 現在のタスクがキューに入れられます。

    • public static class DiscardOldestPolicy implements RejectedExecutionHandler {
       
          public DiscardOldestPolicy() { }
       
          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
              if (!e.isShutdown()) {
                  e.getQueue().poll();
                  e.execute(r);
              }
          }
      }
    Java スレッド再利用の原則

    javaスレッド プールは java.util.concurrent.ThreadPoolExecutor.Worker# に保存されます## オブジェクト。private Final HashSet7b8f3a9cc722774601ac1c36ccacd929 works = new HashSet7b8f3a9cc722774601ac1c36ccacd929(); で維持されます。 workQueue は実行するタスクを格納するキューで、スレッドプールに新しいタスクが追加されると、そのタスクは workQueue キューに追加されます。 <pre class="brush:java;">private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() &gt;= 0 &amp;&amp; (t = thread) != null &amp;&amp; !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }</pre>作業オブジェクトの実行は、

    runWorker()

    に依存します。これは、通常作成するスレッドとは異なります。このスレッドはループ内にあり、キューから新しいタスクの実行を継続的に取得します。 . .したがって、通常使用するスレッドのように実行後に終了するのではなく、スレッド プール内のスレッドを再利用できます。 rree

    以上がSpringBootスレッドプールとJavaスレッドプールの使い方の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明:
    この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。