>Java >java지도 시간 >SpringBoot 스레드 풀과 Java 스레드 풀을 사용하는 방법

SpringBoot 스레드 풀과 Java 스레드 풀을 사용하는 방법

PHPz
PHPz앞으로
2023-05-18 12:46:361174검색

    SpringBoot 스레드 풀과 Java 스레드 풀의 사용 및 구현 원칙

    기본 스레드 풀 사용

    방법 1: @Async 주석을 통해 호출

    public class AsyncTest {
        @Async
        public void async(String name) throws InterruptedException {
            System.out.println("async" + name + " " + Thread.currentThread().getName());
            Thread.sleep(1000);
        }
    }

    @EnableAsync를 추가해야 합니다. 시작 클래스 >Annotation, 그렇지 않으면 적용되지 않습니다. @EnableAsync注解,否则不会生效。

    @SpringBootApplication
    //@EnableAsync
    public class Test1Application {
       public static void main(String[] args) throws InterruptedException {
          ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args);
          AsyncTest bean = run.getBean(AsyncTest.class);
          for(int index = 0; index <= 10; ++index){
             bean.async(String.valueOf(index));
          }
       }
    }

    方式二:直接注入 ThreadPoolTaskExecutor

    此时可不加 @EnableAsync注解

    @SpringBootTest
    class Test1ApplicationTests {
    
       @Resource
       ThreadPoolTaskExecutor threadPoolTaskExecutor;
    
       @Test
       void contextLoads() {
          Runnable runnable = () -> {
             System.out.println(Thread.currentThread().getName());
          };
    
          for(int index = 0; index <= 10; ++index){
             threadPoolTaskExecutor.submit(runnable);
          }
       }
    
    }

    线程池默认配置信息

    SpringBoot线程池的常见配置:

    spring:
      task:
        execution:
          pool:
            core-size: 8
            max-size: 16                          # 默认是 Integer.MAX_VALUE
            keep-alive: 60s                       # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止
            allow-core-thread-timeout: true       # 是否允许核心线程超时,默认true
            queue-capacity: 100                   # 线程队列的大小,默认Integer.MAX_VALUE
          shutdown:
            await-termination: false              # 线程关闭等待
          thread-name-prefix: task-               # 线程名称的前缀

    SpringBoot 线程池的实现原理

    TaskExecutionAutoConfiguration 类中定义了 ThreadPoolTaskExecutor,该类的内部实现也是基于java原生的 ThreadPoolExecutor类。initializeExecutor()方法在其父类中被调用,但是在父类中 RejectedExecutionHandler 被定义为了 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); ,并通过initialize()方法将AbortPolicy传入initializeExecutor()中。

    注意在TaskExecutionAutoConfiguration 类中,ThreadPoolTaskExecutor类的bean的名称为: applicationTaskExecutor 和 taskExecutor

    // TaskExecutionAutoConfiguration#applicationTaskExecutor()
    @Lazy
    @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
          AsyncAnnotationBeanPostProcessor.DEFAUL
              T_TASK_EXECUTOR_BEAN_NAME })
    @ConditionalOnMissingBean(Executor.class)
    public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
       return builder.build();
    }
    // ThreadPoolTaskExecutor#initializeExecutor()
    @Override
    protected ExecutorService initializeExecutor(
          ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
    
       BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
    
       ThreadPoolExecutor executor;
       if (this.taskDecorator != null) {
          executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler) {
             @Override
             public void execute(Runnable command) {
                Runnable decorated = taskDecorator.decorate(command);
                if (decorated != command) {
                   decoratedTaskMap.put(decorated, command);
                }
                super.execute(decorated);
             }
          };
       }
       else {
          executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler);
    
       }
    
       if (this.allowCoreThreadTimeOut) {
          executor.allowCoreThreadTimeOut(true);
       }
    
       this.threadPoolExecutor = executor;
       return executor;
    }
    // ExecutorConfigurationSupport#initialize()
    public void initialize() {
       if (logger.isInfoEnabled()) {
          logger.info("Initializing ExecutorService" + (this.beanName != null ? " &#39;" + this.beanName + "&#39;" : ""));
       }
       if (!this.threadNamePrefixSet && this.beanName != null) {
          setThreadNamePrefix(this.beanName + "-");
       }
       this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
    }

    覆盖默认的线程池

    覆盖默认的 taskExecutor对象,bean的返回类型可以是ThreadPoolTaskExecutor也可以是Executor

    @Configuration
    public class ThreadPoolConfiguration {
    
        @Bean("taskExecutor")
        public ThreadPoolTaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            //设置线程池参数信息
            taskExecutor.setCorePoolSize(10);
            taskExecutor.setMaxPoolSize(50);
            taskExecutor.setQueueCapacity(200);
            taskExecutor.setKeepAliveSeconds(60);
            taskExecutor.setThreadNamePrefix("myExecutor--");
            taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
            taskExecutor.setAwaitTerminationSeconds(60);
            //修改拒绝策略为使用当前线程执行
            taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            //初始化线程池
            taskExecutor.initialize();
            return taskExecutor;
        }
    }

    管理多个线程池

    如果出现了多个线程池,例如再定义一个线程池 taskExecutor2,则直接执行会报错。此时需要指定bean的名称即可。

    @Bean("taskExecutor2")
    public ThreadPoolTaskExecutor taskExecutor2() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor2--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }

    引用线程池时,需要将变量名更改为bean的名称,这样会按照名称查找。

    @Resource
    ThreadPoolTaskExecutor taskExecutor2;

    对于使用@Async注解的多线程则在注解中指定bean的名字即可。

    @Async("taskExecutor2")
        public void async(String name) throws InterruptedException {
            System.out.println("async" + name + " " + Thread.currentThread().getName());
            Thread.sleep(1000);
        }

    线程池的四种拒绝策略

    JAVA常用的四种线程池

    ThreadPoolExecutor 类的构造函数如下:

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    newCachedThreadPool

    不限制最大线程数(maximumPoolSize=Integer.MAX_VALUE),如果有空闲的线程超过需要,则回收,否则重用已有的线程。

    new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());

    newFixedThreadPool

    定长线程池,超出线程数的任务会在队列中等待。

    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());

    newScheduledThreadPool

    类似于newCachedThreadPool,线程数无上限,但是可以指定corePoolSize。可实现延迟执行、周期执行。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    周期执行:

    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
    scheduledThreadPool.scheduleAtFixedRate(()->{
       System.out.println("rate");
    }, 1, 1, TimeUnit.SECONDS);

    延时执行:

    scheduledThreadPool.schedule(()->{
       System.out.println("delay 3 seconds");
    }, 3, TimeUnit.SECONDS);

    newSingleThreadExecutor

    单线程线程池,可以实现线程的顺序执行。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    Java 线程池中的四种拒绝策略

    • CallerRunsPolicy:线程池让调用者去执行。

    • AbortPolicy:如果线程池拒绝了任务,直接报错。

    • DiscardPolicy:如果线程池拒绝了任务,直接丢弃。

    • DiscardOldestPolicy:如果线程池拒绝了任务,直接将线程池中最旧的,未运行的任务丢弃,将新任务入队。

    CallerRunsPolicy

    直接在主线程中执行了run方法。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
     
        public CallerRunsPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    效果类似于:

    Runnable thread = ()->{
       System.out.println(Thread.currentThread().getName());
       try {
          Thread.sleep(0);
       } catch (InterruptedException e) {
          throw new RuntimeException(e);
       }
    };
    
    thread.run();

    AbortPolicy

    直接抛出RejectedExecutionException异常,并指示任务的信息,线程池的信息。、

    public static class AbortPolicy implements RejectedExecutionHandler {
     
        public AbortPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    DiscardPolicy

    什么也不做。

    public static class DiscardPolicy implements RejectedExecutionHandler {
     
        public DiscardPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    DiscardOldestPolicy

    • e.getQueue().poll() : 取出队列最旧的任务。

    • e.execute(r) : 当前任务入队。

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
     
        public DiscardOldestPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

    Java 线程复用的原理

    java的线程池中保存的是 java.util.concurrent.ThreadPoolExecutor.Worker 对象,该对象在 被维护在private final HashSet7b8f3a9cc722774601ac1c36ccacd929 workers = new HashSet7b8f3a9cc722774601ac1c36ccacd929();workQueue是保存待执行的任务的队列,线程池中加入新的任务时,会将任务加入到workQueue队列中。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
    
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
    
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
    
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
    
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
    
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
    
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

    work对象的执行依赖于 runWorker()

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

    방법 2: ThreadPoolTaskExecutor 직접 삽입🎜🎜지금은 @EnableAsync 주석을 추가할 필요가 없습니다🎜rrreee🎜스레드 풀 기본 구성 정보🎜🎜SpringBoot 스레드 풀의 일반적인 구성:🎜 rrreee🎜SpringBoot 스레드 풀 원리 구현 🎜🎜TaskExecutionAutoConfiguration 클래스는 ThreadPoolTaskExecutor를 정의하고 이 클래스의 내부 구현도 Java의 기본 ThreadPoolExecutor를 기반으로 합니다. 수업. initializeExecutor() 메서드는 상위 클래스에서 호출되지만 상위 클래스에서는 RejectedExecutionHandlerprivate RejectedExecutionHandler 거부된ExecutionHandler = new ThreadPoolExecutor.AbortPolicy();, <code>initialize() 메서드를 통해 AbortPolicyinitializeExecutor()에 전달합니다. 🎜🎜TaskExecutionAutoConfiguration 클래스에서 ThreadPoolTaskExecutor 클래스의 Bean 이름은 applicationTaskExecutortaskExecutor입니다. 🎜rrreeerrreeerrreee🎜기본 스레드 풀 재정의🎜🎜기본 taskExecutor 개체를 재정의하세요. Bean의 반환 유형은 ThreadPoolTaskExecutor 또는 Executor일 수 있습니다. 🎜rrreee🎜여러 스레드 풀 관리🎜🎜여러 스레드 풀이 있는 경우(예: 다른 스레드 풀 taskExecutor2 정의) 직접 실행하면 오류가 보고됩니다. 이때 Bean의 이름을 지정해 주어야 합니다. 🎜rrreee🎜스레드풀 참조시 변수명을 해당 빈의 이름으로 변경해야 이름으로 검색이 됩니다. 🎜rrreee🎜@Async 주석을 사용하는 멀티스레딩의 경우 주석에 Bean 이름을 지정하기만 하면 됩니다. 🎜rrreee🎜스레드 풀의 네 가지 거부 전략🎜🎜JAVA에서 일반적으로 사용되는 네 가지 스레드 풀🎜🎜ThreadPoolExecutor 클래스 생성자는 다음과 같습니다. 🎜rrreee🎜newCachedThreadPool🎜🎜최대 개수에는 제한이 없습니다. 스레드( maximumPoolSize=Integer.MAX_VALUE), 필요한 것보다 더 많은 유휴 스레드가 있으면 재활용되고, 그렇지 않으면 기존 스레드가 재사용됩니다. 🎜rrreee🎜newFixedThreadPool🎜🎜고정 길이 스레드 풀, 스레드 수를 초과하는 작업은 대기열에서 대기합니다. 🎜rrreee🎜newScheduledThreadPool🎜🎜 newCachedThreadPool과 유사하며 스레드 수에는 상한이 없지만 corePoolSize를 지정할 수 있습니다. 지연 실행과 주기적 실행이 가능합니다. 🎜rrreee🎜정기적 실행: 🎜rrreee🎜지연 실행: 🎜rrreee🎜newSingleThreadExecutor🎜🎜스레드의 순차적 실행을 실현할 수 있는 단일 스레드 스레드 풀입니다. 🎜rrreee🎜Java 스레드 풀의 네 가지 거부 전략🎜
    • 🎜CallerRunsPolicy: 스레드 풀을 통해 호출자가 실행될 수 있습니다. 🎜
    • 🎜AbortPolicy: 스레드 풀이 작업을 거부하면 오류가 직접 보고됩니다. 🎜
    • 🎜DiscardPolicy: 스레드 풀이 작업을 거부하면 해당 작업이 바로 삭제됩니다. 🎜
    • 🎜DiscardOldestPolicy: 스레드 풀이 작업을 거부하면 스레드 풀에서 가장 오래되고 실행되지 않은 작업이 바로 삭제되고 새 작업이 대기열에 추가됩니다. 🎜
    • 🎜🎜CallerRunsPolicy🎜🎜는 메인 스레드에서 직접 run 메소드를 실행합니다. 🎜rrreee🎜효과는 다음과 유사합니다. 🎜rrreee🎜AbortPolicy🎜🎜는 RejectedExecutionException 예외를 직접 발생시키고 작업 정보와 스레드 풀 정보를 나타냅니다. , 🎜rrreee🎜DiscardPolicy🎜🎜는 아무 작업도 수행하지 않습니다. 🎜rrreee🎜DiscardOldestPolicy🎜
      • 🎜e.getQueue().poll() : 가장 오래된 정책을 제거합니다. 큐 태스크. 🎜
      • 🎜e.execute(r) : 현재 작업이 대기열에 추가됩니다. 🎜
      • 🎜rrreee🎜Java 스레드 재사용의 원칙🎜🎜 java의 스레드 풀은 java.util.concurrent.ThreadPoolExecutor.Worker 객체를 저장합니다. private final HashSet<worker> Workers = new HashSet<worker>();</worker></worker>에서 유지됩니다. workQueue는 실행할 작업을 저장하는 대기열입니다. 스레드 풀에 새 작업이 추가되면 해당 작업이 workQueue 대기열에 추가됩니다. 🎜rrreee🎜작업 개체의 실행은 runWorker()에 의존합니다. 우리가 일반적으로 작성하는 스레드와 달리 이 스레드는 루프에 있으며 실행을 위해 대기열에서 지속적으로 새 작업을 얻습니다. 따라서 우리가 일반적으로 사용하는 스레드처럼 실행 후 종료되는 대신 스레드 풀에 있는 스레드를 재사용할 수 있습니다. 🎜아아아아

    위 내용은 SpringBoot 스레드 풀과 Java 스레드 풀을 사용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

    성명:
    이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제