Maison  >  Article  >  Java  >  Comment Spring Boot utilise le pool de threads pour gérer des dizaines de milliers de fonctions d'insertion de données

Comment Spring Boot utilise le pool de threads pour gérer des dizaines de milliers de fonctions d'insertion de données

WBOY
WBOYavant
2023-05-12 22:22:041207parcourir

# Préface

Lorsque je travaillais sur un projet il y a deux jours, je souhaitais améliorer l'optimisation des performances d'insertion de tables puisqu'il y a deux tables, insérer d'abord l'ancienne table puis la nouvelle table serait un peu lent pour en savoir plus. plus de 10 000 éléments de données.

J'ai pensé au pool de threads ThreadPoolExecutor plus tard, et en utilisant le projet Spring Boot, vous pouvez utiliser le pool de threads ThreadPoolTaskExecutor fourni par Spring pour encapsuler le ThreadPoolExecutor et utiliser directement des annotations pour l'activer

# Étapes d'utilisation

Créez d'abord une configuration de pool de threads, laissez Spring Boot le charger pour définir comment créer un ThreadPoolTaskExecutor. Utilisez les deux annotations @Configuration et @EnableAsync pour indiquer qu'il s'agit d'une classe de configuration et d'une classe de configuration pour le pool de threads.

@Configuration
@EnableAsync
public class ExecutorConfig {
    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
    @Value("${async.executor.thread.core_pool_size}")    
    private int corePoolSize;   
    
    @Value("${async.executor.thread.max_pool_size}")    
    private int maxPoolSize;   
    
    @Value("${async.executor.thread.queue_capacity}")  
    private int queueCapacity;   
    
    @Value("${async.executor.thread.name.prefix}")  
    private String namePrefix;
    @Bean(name = "asyncServiceExecutor")    
    public Executor asyncServiceExecutor() {   
        logger.info("start asyncServiceExecutor");    
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
    
        //配置核心线程数       
        executor.setCorePoolSize(corePoolSize);   
    
        //配置最大线程数      
        executor.setMaxPoolSize(maxPoolSize);   
    
        //配置队列大小     
        executor.setQueueCapacity(queueCapacity);    
    
        //配置线程池中的线程的名称前缀        
        executor.setThreadNamePrefix(namePrefix);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务        
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行  
         
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());     
        //执行初始化      
        executor.initialize();   
        return executor;  
     }
}

@Value est ce que j'ai configuré dans l'application .properties, vous pouvez vous référer à la configuration et définir librement

# 异步线程配置
# 配置核心线程数
async.executor.thread.core_pool_size = 5
# 配置最大线程数
async.executor.thread.max_pool_size = 5
# 配置队列大小
async.executor.thread.queue_capacity = 99999
# 配置线程池中的线程的名称前缀
async.executor.thread.name.prefix = async-service-

Créer une interface de service, qui est l'interface des threads asynchrones

public interface AsyncService {   
    /**     
      * 执行异步任务     
      * 可以根据需求,自己加参数拟定,我这里就做个测试演示    
      */   
    void executeAsync();
}

classe d'implémentation

@Service
public class AsyncServiceImpl implements AsyncService {  
    private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);
    @Override 
    @Async("asyncServiceExecutor")    
    public void executeAsync() {    
        logger.info("start executeAsync");
        System.out.println("异步线程要做的事情");        
        System.out.println("可以在这里执行批量插入等耗时的事情");
        logger.info("end executeAsync");   
    }
}

Ajouter l'annotation @Async("asyncServiceExecutor") à la méthode executeAsync(). La méthode asyncServiceExecutor est au premier plan. Le nom de la méthode dans ExecutorConfig.java indique que le pool de threads entré par la méthode executeAsync est créé par la méthode asyncServiceExecutor

L'étape suivante consiste à injectez le Service via l'annotation @Autowired dans le Contrôleur ou quelque part

@Autowiredprivate 
AsyncService asyncService;

@GetMapping("/async")
public void async(){  
    asyncService.executeAsync();
}

Impression du journal

2022-07- 16 22:15:47.655 INFO 10516 --- [async-service-5] c.u.d.e.executor.impl.AsyncServiceImpl : start executeAsync
Ce que le thread asynchrone doit faire
Vous pouvez effectuer des tâches fastidieuses telles que l'insertion par lots ici
2022-07 -16 22:15:47.655 INFO 10516 --- [async-service-5] c.u.d.e.executor.impl .AsyncServiceImpl : fin d'exécuterAsync
2022-07-16 22:15:47.770 INFO 10516 --- [async-service-1 ] c.u.d.e.executor.impl.AsyncServiceImpl : démarrer exécuterAsync
Ce que le thread asynchrone doit faire
Vous pouvez effectuer du temps -consommer des choses telles que l'insertion par lots ici
2022-07-16 22:15:47.770 INFO 10516 --- [async-service- 1] c.u.d.e.executor.impl.AsyncServiceImpl : end executeAsync
2022-07-16 22:15 : 47.816 INFO 10516 --- [async-service-2] c.u.d.e.executor.impl.AsyncServiceImpl : start executeAsync
Ce que le thread asynchrone doit faire
Vous pouvez effectuer des tâches fastidieuses telles que l'insertion par lots ici
2022-07-16 22 :15:47.816 INFO 10516 --- [async-service-2] c.u.d.e.executor.impl.AsyncServiceImpl : fin executeAsync
2022-07-16 22:15:48.833 INFO 10516 --- [async-service-3] c.u.d.e.executor .impl.AsyncServiceImpl : start executeAsync
Ce que le thread asynchrone doit faire
Vous pouvez effectuer des tâches fastidieuses telles que l'insertion par lots ici
2022-07- 16 22:15:48.834 INFO 10516 --- [async-service-3 ] c.u.d.e.executor.impl.AsyncServiceImpl : fin d'exécuterAsync
2022-07-16 22:15:48.986 INFO 10516 --- [async-service-4] c.u.d.e.executor.impl.AsyncServiceImpl : démarrer exécuterAsync
Ce que le thread asynchrone doit faire
Vous pouvez effectuer des tâches fastidieuses telles que l'insertion par lots ici
2022-07-16 22:15:48.987 INFO 10516 --- [async-service-4 ] c.u.d.e.executor.impl.AsyncServiceImpl : end executeAsync

Cela peut On trouve dans le journal ci-dessus que [async-service-] a plusieurs threads, et il a évidemment été exécuté dans le pool de threads que nous avons configuré, et dans chaque requête, les journaux de début et de fin du contrôleur sont imprimés en continu, indiquant que chaque la demande reçoit une réponse rapide et les opérations fastidieuses sont laissées aux threads du pool de threads pour une exécution asynchrone ;

Bien que nous ayons utilisé le pool de threads, la situation du pool de threads à ce moment-là n'est toujours pas claire ; Combien de threads étaient en cours d’exécution et combien attendaient dans la file d’attente ? Ici, j'ai créé une sous-classe de ThreadPoolTaskExecutor, qui imprimera l'état d'exécution du pool de threads actuel chaque fois qu'un thread est soumis

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;import java.util.concurrent.Future;import java.util.concurrent.ThreadPoolExecutor;
/** 
* @Author: 腾腾 
* @Date: 2022/7/16/0016 22:19 
*/
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);
    private void showThreadPoolInfo(String prefix) {        
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
        if (null == threadPoolExecutor) {    
            return;  
        }
        logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",                
        this.getThreadNamePrefix(),         
        prefix,           
        threadPoolExecutor.getTaskCount(),     
        threadPoolExecutor.getCompletedTaskCount(),  
        threadPoolExecutor.getActiveCount(),    
        threadPoolExecutor.getQueue().size());
     }
    @Override    
    public void execute(Runnable task) {    
        showThreadPoolInfo("1. do execute");       
        super.execute(task);   
    }
    @Override    
    public void execute(Runnable task, long startTimeout) {      
        showThreadPoolInfo("2. do execute");   
        super.execute(task, startTimeout);  
    }
    @Override  
    public Future<?> submit(Runnable task) {   
        showThreadPoolInfo("1. do submit");    
        return super.submit(task);   
    }
    @Override  
    public <T> Future<T> submit(Callable<T> task) {   
        showThreadPoolInfo("2. do submit");      
        return super.submit(task); 
    }
    @Override    
    public ListenableFuture<?> submitListenable(Runnable task) {    
        showThreadPoolInfo("1. do submitListenable");   
        return super.submitListenable(task);   
    }
    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {     
        showThreadPoolInfo("2. do submitListenable");     
        return super.submitListenable(task);  
    }
}

Comme indiqué ci-dessus, dans la méthode showThreadPoolInfo, le nombre total de tâches, le nombre de tâches terminées, le nombre de threads actifs et la file d'attente. La taille est imprimée, puis les méthodes d'exécution, de soumission et autres de la classe parent sont remplacées, et la méthode showThreadPoolInfo est appelée à l'intérieur, de sorte qu'à chaque fois qu'une tâche est soumise au thread pool, la situation de base du pool de threads actuel sera imprimée dans le journal.

Modifiez la méthode asyncServiceExecutor d'ExecutorConfig.java et remplacez ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() par ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor()

@Bean(name = "asyncServiceExecutor")    
public Executor asyncServiceExecutor() {  
    logger.info("start asyncServiceExecutor");  
    //在这里修改       
    ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();    
    //配置核心线程数     
    executor.setCorePoolSize(corePoolSize);  
    //配置最大线程数      
    executor.setMaxPoolSize(maxPoolSize);  
    //配置队列大小      
    executor.setQueueCapacity(queueCapacity); 
    //配置线程池中的线程的名称前缀      
    executor.setThreadNamePrefix(namePrefix);
    
    // rejection-policy:当pool已经达到max size的时候,如何处理新任务      
    // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行     
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); 
    //执行初始化       
    executor.initialize();       
    return executor; 
}

Recommencez le test du projet

2022-07-16 22:23:30.951 INFO 14088 --- [nio-8087-exec-2] u.d.e.e.i.VisiableThreadPoolTaskExecutor : async-service-, 2. soumettre,taskCount [0], completeTaskCount [0], activeCount [0], queueSize [0]
2022-07-16 22:23:30.952 INFO 14088 --- [async-service-1] c.u.d.e.executor.impl.AsyncServiceImpl : démarrer exécuterAsync# 🎜🎜#Choses à faire par les threads asynchrones
Vous pouvez effectuer des tâches fastidieuses telles que l'insertion par lots ici
2022-07-16 22:23:30.953 INFO 14088 --- [async-service- 1] c.u.d.e.executor.impl.AsyncServiceImpl : end executeAsync
2022-07-16 22:23:31.351 INFO 14088 --- [nio-8087-exec-3] u.d.e.e.i.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit, taskCount [1],completeTaskCount [1], activeCount [0], queueSize [0]
2022-07-16 22:23:31.353 INFO 14088 --- [async-service-2] c.u.d.e.executor. impl. AsyncServiceImpl : start executeAsync
Ce que le thread asynchrone doit faire
Vous pouvez effectuer des tâches fastidieuses telles que l'insertion par lots ici
2022-07-16 22:23:31.353 INFO 14088 - -- [async -Service-2] C.U.D.E.EXECUTOR.IMPL.ASYNCSERVICEIMPL : END Executeasync
2022-07-16 22 : 23 : 31.927 Info 14088 --- [NiO-8087-EXEC-5] .siablethreadPooltaskexecutor : Async -Service-, 2. soumettre, taskCount [2],completeTaskCount [2], activeCount [0], queueSize [0]
2022-07-16 22:23:31.929 INFO 14088 --- [async- service-3] c.u.d.e.executor.impl.AsyncServiceImpl : start executeAsync
Ce que le thread asynchrone doit faire
Vous pouvez effectuer des tâches fastidieuses telles que l'insertion par lots ici
2022-07-16 22 :23:31.930 INFO 14088 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl : fin executeAsync
2022-07-16 22:23:32.496 INFO 14088 --- [nio-8087-exec -7] u.d.e.e.i.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [3],completeTaskCount [3], activeCount [0], queueSize [0]
2022-07-16 22:23:32.498 INFO 14088 --- [async -service-4] c.u.d.e.executor.impl.AsyncServiceImpl : start executeAsync
Ce que le thread asynchrone doit faire
Vous pouvez effectuer une insertion par lots et d'autres choses fastidieuses ici#🎜🎜 #2022-07-16 22 : 23:32.499 INFO 14088 --- [async-service-4] c.u.d.e.executor.impl.AsyncServiceImpl : end executeAsync

Faites attention à cette ligne de log :

# 🎜🎜#2022-07-16 22:23:32.496 INFO 14088 --- [nio-8087-exec-7] u.d.e.e.i.VisiableThreadPoolTaskExecutor: async-service-, 2. soumettre ,taskCount [3],completeTaskCount [3], activeCount [0], queueSize [0]

Cela montre que lors de la soumission d'une tâche au pool de threads, la soumission (tâche appelable) La méthode est appelée. Actuellement, 3 tâches ont été soumises. 3 sont terminées, il y a actuellement 0 tâche de traitement de thread et 0 tâche reste en attente dans la file d'attente. La situation de base du pool de threads est claire.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer