>  기사  >  Java  >  Spring Boot가 스레드 풀을 사용하여 수만 개의 데이터 삽입 기능을 처리하는 방법

Spring Boot가 스레드 풀을 사용하여 수만 개의 데이터 삽입 기능을 처리하는 방법

WBOY
WBOY앞으로
2023-05-12 22:22:041207검색

# 서문

이틀 전 프로젝트를 진행하면서 테이블 삽입 성능 최적화를 개선하고 싶었습니다. 테이블이 두 개이기 때문에 이전 테이블을 먼저 삽입한 다음 새 테이블을 추가하면 조금 느려질 것입니다.

나중에 스레드 풀 ThreadPoolExecutor를 생각했는데 Spring Boot 프로젝트를 사용하면 Spring에서 제공하는 스레드 풀 ThreadPoolTaskExecutor를 사용하여 ThreadPoolExecutor를 캡슐화하고 주석을 직접 사용하여 활성화할 수 있습니다

# 사용 단계

먼저 스레드 풀 구성을 생성하고 Spring Boot에서 이를 로드하여 ThreadPoolTaskExecutor를 생성하는 방법을 정의합니다. 두 개의 주석 @Configuration 및 @EnableAsync를 사용하여 이것이 스레드 풀에 대한 구성 클래스임을 나타냅니다.

@Configuration
@EnableAsync
public class ExecutorConfig {
    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
    @Value("${async.executor.thread.core_pool_size}")    
    private int corePoolSize;   
    
    @Value("${async.executor.thread.max_pool_size}")    
    private int maxPoolSize;   
    
    @Value("${async.executor.thread.queue_capacity}")  
    private int queueCapacity;   
    
    @Value("${async.executor.thread.name.prefix}")  
    private String namePrefix;
    @Bean(name = "asyncServiceExecutor")    
    public Executor asyncServiceExecutor() {   
        logger.info("start asyncServiceExecutor");    
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
    
        //配置核心线程数       
        executor.setCorePoolSize(corePoolSize);   
    
        //配置最大线程数      
        executor.setMaxPoolSize(maxPoolSize);   
    
        //配置队列大小     
        executor.setQueueCapacity(queueCapacity);    
    
        //配置线程池中的线程的名称前缀        
        executor.setThreadNamePrefix(namePrefix);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务        
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行  
         
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());     
        //执行初始化      
        executor.initialize();   
        return executor;  
     }
}

@Value는 제가 애플리케이션 .properties에 구성한 것입니다. 구성을 참조하여 자유롭게 정의할 수 있습니다.

# 异步线程配置
# 配置核心线程数
async.executor.thread.core_pool_size = 5
# 配置最大线程数
async.executor.thread.max_pool_size = 5
# 配置队列大小
async.executor.thread.queue_capacity = 99999
# 配置线程池中的线程的名称前缀
async.executor.thread.name.prefix = async-service-

비동기 스레드의 인터페이스인 서비스 인터페이스를 생성합니다.

public interface AsyncService {   
    /**     
      * 执行异步任务     
      * 可以根据需求,自己加参数拟定,我这里就做个测试演示    
      */   
    void executeAsync();
}

구현 클래스

@Service
public class AsyncServiceImpl implements AsyncService {  
    private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);
    @Override 
    @Async("asyncServiceExecutor")    
    public void executeAsync() {    
        logger.info("start executeAsync");
        System.out.println("异步线程要做的事情");        
        System.out.println("可以在这里执行批量插入等耗时的事情");
        logger.info("end executeAsync");   
    }
}

주석 추가 @Async("asyncServiceExecutor")는 executorAsync() 메서드가 앞쪽에 있습니다. ExecutorConfig.java에 있는 메서드 이름은 ExecutorConfig.java에 있는 스레드 풀이 asyncServiceExecutor 메서드에 의해 생성되었음을 나타냅니다.

다음 단계는 다음과 같습니다. 컨트롤러 또는 어딘가에 @Autowired 주석을 통해 이 서비스를 주입합니다. startexecuteAsync

비동기 스레드가 해야 할 일

일괄 삽입 등 시간이 많이 걸리는 작업은 여기서 수행할 수 있습니다

2022-07 -16 22:15:47.655 INFO 10516 --- [async-service-5] c.u.d.e.executor.impl .AsyncServiceImpl : end installAsync
2022-07-16 22:15:47.770 INFO 10516 --- [async-service-1 ] c.u.d.e.executor.impl.AsyncServiceImpl : start executorAsync

비동기 스레드가 해야 할 일
시간을 수행할 수 있습니다. -여기서 일괄 삽입 등을 소비합니다
2022-07-16 22:15:47.770 INFO 10516 --- [async-service- 1] c.u.d.e.executor.impl.AsyncServiceImpl : end excuteAsync
2022-07-16 22:15: 47.816 INFO 10516 --- [async-service-2] c.u.d.e.executor.impl.AsyncServiceImpl : start executorAsync
비동기 스레드가 해야 할 일
일괄 삽입 등 시간이 많이 걸리는 작업은 여기서 수행할 수 있습니다
2022-07-16 22 :15:47.816 INFO 10516 --- [async-service-2] c.u.d.e.executor.impl.AsyncServiceImpl : end executorAsync
2022-07-16 22:15:48.833 INFO 10516 --- [async-service-3] c.u.d.e.executor .impl.AsyncServiceImpl : startexecuteAsync
비동기 스레드가 해야 할 일
일괄 삽입 등 시간이 많이 걸리는 작업은 여기서 수행할 수 있습니다
2022-07- 16 22:15:48.834 INFO 10516 --- [async-service-3 ] c.u.d.e.executor.impl.AsyncServiceImpl : end installAsync
2022-07-16 22:15:48.986 INFO 10516 --- [async-service-4] c.u.d.e.executor.impl.AsyncServiceImpl : start installAsync
비동기 스레드가 해야 할 일
일괄 삽입 등 시간이 많이 걸리는 작업은 여기서 수행할 수 있습니다
2022-07-16 22:15:48.987 INFO 10516 --- [async-service-4 ] c.u.d.e.executor.impl.AsyncServiceImpl : endexecuteAsync


From the 위의 로그에서 [async-service-]에는 여러 스레드가 있고 우리가 구성한 스레드 풀에서 분명히 실행되었음을 알 수 있으며 각 요청마다 컨트롤러의 시작 및 종료 로그가 연속적으로 인쇄되어 각 스레드가 요청에 대한 응답은 빠르게 이루어지며, 시간이 많이 걸리는 작업은 비동기 실행을 위해 스레드 풀에 있는 스레드에 맡깁니다.

우리는 스레드 풀을 사용했지만, 당시 스레드 풀의 상황은 아직 불분명합니다. 실행 중인 스레드 수와 대기열에서 대기 중인 스레드 수는 몇 개입니까? 여기에서는 스레드가 제출될 때마다 현재 스레드 풀의 실행 상태를 인쇄하는 ThreadPoolTaskExecutor의 하위 클래스를 만들었습니다.

@Autowiredprivate 
AsyncService asyncService;

@GetMapping("/async")
public void async(){  
    asyncService.executeAsync();
}
위에 표시된 것처럼 showThreadPoolInfo 메서드에는 총 작업 수, 완료된 작업 수, 활성 스레드 수, 큐 크기가 출력되고 상위 클래스의 실행, 제출 및 기타 메소드가 재정의되고 내부에서 showThreadPoolInfo 메소드가 호출되어 작업이 스레드에 제출될 때마다 pool을 실행하면 현재 Thread Pool의 기본 상황이 로그에 출력됩니다.

ExecutorConfig.java의 asyncServiceExecutor 메소드를 수정하고 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor()를 ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor()로 변경합니다

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;import java.util.concurrent.Future;import java.util.concurrent.ThreadPoolExecutor;
/** 
* @Author: 腾腾 
* @Date: 2022/7/16/0016 22:19 
*/
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);
    private void showThreadPoolInfo(String prefix) {        
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
        if (null == threadPoolExecutor) {    
            return;  
        }
        logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",                
        this.getThreadNamePrefix(),         
        prefix,           
        threadPoolExecutor.getTaskCount(),     
        threadPoolExecutor.getCompletedTaskCount(),  
        threadPoolExecutor.getActiveCount(),    
        threadPoolExecutor.getQueue().size());
     }
    @Override    
    public void execute(Runnable task) {    
        showThreadPoolInfo("1. do execute");       
        super.execute(task);   
    }
    @Override    
    public void execute(Runnable task, long startTimeout) {      
        showThreadPoolInfo("2. do execute");   
        super.execute(task, startTimeout);  
    }
    @Override  
    public Future<?> submit(Runnable task) {   
        showThreadPoolInfo("1. do submit");    
        return super.submit(task);   
    }
    @Override  
    public <T> Future<T> submit(Callable<T> task) {   
        showThreadPoolInfo("2. do submit");      
        return super.submit(task); 
    }
    @Override    
    public ListenableFuture<?> submitListenable(Runnable task) {    
        showThreadPoolInfo("1. do submitListenable");   
        return super.submitListenable(task);   
    }
    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {     
        showThreadPoolInfo("2. do submitListenable");     
        return super.submitListenable(task);  
    }
}

프로젝트 테스트를 다시 시작합니다

2022-07-16 22:23:30.951 INFO 14088 --- [nio-8087-exec-2] u.d.e.e.i.VisiableThreadPoolTaskExecutor: async-service-, 2. 제출하세요,taskCount [0],completeTaskCount[0], activeCount [0], queueSize [0]
2022-07-16 22:23:30.952 INFO 14088 --- [async-service-1] c.u.d.e.executor.impl.AsyncServiceImpl : startexecuteAsync
비동기 스레드가 수행해야 하는 작업
can 일괄 삽입 등 시간이 많이 걸리는 작업은 여기서 수행됩니다
2022-07-16 22:23:30.953 INFO 14088 --- [async-service-1] c.u.d.e.executor.impl.AsyncServiceImpl : end executorAsync
2022-07 -16 22:23 :31.351 INFO 14088 --- [nio-8087-exec-3] u.d.e.e.i.VisiableThreadPoolTaskExecutor : async-service-, 2. 제출, taskCount [1],completeTaskCount [1], activeCount [0], queueSize [0]
2022 -07-16 22:23:31.353 INFO 14088 --- [async-service-2] c.u.d.e.executor.impl.AsyncServiceImpl : startexecuteAsync
비동기 스레드가 해야 할 일
시간이 많이 걸리는 작업을 수행할 수 있습니다. 일괄 삽입 같은 건 여기에
2022-07-16 22:23:31.353 INFO 14088 --- [async-service-2] c.u.d.e.executor.impl.AsyncServiceImpl : end excuteAsync
2022-07-16 22:23:31.927 INFO 14088 --- [nio- 8087-exec-5] u.d.e.e.i.VisiableThreadPoolTaskExecutor: async-service-, 2. 제출, taskCount [2],completeTaskCount [2], activeCount [0], queueSize [0]
2022-07- 16 22:23:31.929 INFO 14088 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl : startexecuteAsync
비동기 스레드가 해야 할 일
일괄 삽입 등 시간이 많이 걸리는 작업은 여기서 수행할 수 있습니다
2022 -07-16 22:23:31.930 INFO 14088 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl : end executorAsync
2022-07-16 22:23:32.496 INFO 14088 --- [nio-8087 -exec-7] u.d.e.e.i.VisiableThreadPoolTaskExecutor : async -service-, 2. 제출,taskCount [3], CompleteTaskCount [3], activeCount [0], queueSize [0]
2022-07-16 22:23:32.498 INFO 14088 --- [async-service-4 ] c.u.d.e.executor.impl.AsyncServiceImpl : startexecuteAsync
비동기 스레드가 해야 할 일
여기서 일괄 삽입 등 시간이 많이 걸리는 작업을 수행할 수 있습니다
2022-07-16 22:23: 32.499 INFO 14088 --- [async-service- 4] c.u.d.e.executor.impl.AsyncServiceImpl : end executorAsync

다음 로그 줄에 주의하세요:

2022-07-16 22:23:32.496 INFO 14088 -- - [nio-8087-exec-7] u.d.e.e.i.VisiableTh readPoolTaskExecutor: async -service-, 2. do submit,taskCount [3],completeTaskCount [3], activeCount [0], queueSize [0]

이것은 언제를 보여줍니다. 스레드 풀에 작업을 제출하는 submit(Callable task)을 Method라고 하며, 현재 3개의 작업이 제출되었고, 3개가 완료되었으며, 0개의 스레드가 현재 작업을 처리 중이고, 0개의 작업이 대기열에 남아 있는 기본 상황입니다. 스레드 풀은 완전히 깨끗합니다.

위 내용은 Spring Boot가 스레드 풀을 사용하여 수만 개의 데이터 삽입 기능을 처리하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제