ホームページ  >  記事  >  Java  >  Spring Boot がスレッド プールを使用して数万のデータ挿入関数を処理する方法

Spring Boot がスレッド プールを使用して数万のデータ挿入関数を処理する方法

WBOY
WBOY転載
2023-05-12 22:22:041207ブラウズ

# はじめに

#2 日前にプロジェクトに取り組んでいたとき、テーブル挿入のパフォーマンスの最適化を改善したいと思いました。テーブルが 2 つあるため、古いテーブルを最初に挿入し、次に古いテーブルを挿入する必要があります。新しいテーブル、コストが 10,000 以上 データが少し遅いです。

スレッド プール ThreadPoolExecutor は後で考えましたが、Spring Boot プロジェクトを使用すると、Spring が提供するスレッド プール ThreadPoolTask​​Executor を使用できます。 ThreadPoolExecutor をカプセル化し、アノテーションを直接使用して有効にします

# 使用手順

まずスレッド プール構成を作成し、Spring Boot にそれをロードさせて ThreadPoolTask​​Executor の作成方法を定義します2 つの注釈 @Configuration と @EnableAsync を使用して、これが構成クラスであり、スレッド プールの構成クラスであることを示します

@Configuration
@EnableAsync
public class ExecutorConfig {
    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
    @Value("${async.executor.thread.core_pool_size}")    
    private int corePoolSize;   
    
    @Value("${async.executor.thread.max_pool_size}")    
    private int maxPoolSize;   
    
    @Value("${async.executor.thread.queue_capacity}")  
    private int queueCapacity;   
    
    @Value("${async.executor.thread.name.prefix}")  
    private String namePrefix;
    @Bean(name = "asyncServiceExecutor")    
    public Executor asyncServiceExecutor() {   
        logger.info("start asyncServiceExecutor");    
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
    
        //配置核心线程数       
        executor.setCorePoolSize(corePoolSize);   
    
        //配置最大线程数      
        executor.setMaxPoolSize(maxPoolSize);   
    
        //配置队列大小     
        executor.setQueueCapacity(queueCapacity);    
    
        //配置线程池中的线程的名称前缀        
        executor.setThreadNamePrefix(namePrefix);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务        
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行  
         
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());     
        //执行初始化      
        executor.initialize();   
        return executor;  
     }
}

@Value は application.properties で私によって構成されています。

# 异步线程配置
# 配置核心线程数
async.executor.thread.core_pool_size = 5
# 配置最大线程数
async.executor.thread.max_pool_size = 5
# 配置队列大小
async.executor.thread.queue_capacity = 99999
# 配置线程池中的线程的名称前缀
async.executor.thread.name.prefix = async-service-

非同期スレッドであるServiceインターフェースを作成します。インターフェース

public interface AsyncService {   
    /**     
      * 执行异步任务     
      * 可以根据需求,自己加参数拟定,我这里就做个测试演示    
      */   
    void executeAsync();
}

実装クラス

@Service
public class AsyncServiceImpl implements AsyncService {  
    private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);
    @Override 
    @Async("asyncServiceExecutor")    
    public void executeAsync() {    
        logger.info("start executeAsync");
        System.out.println("异步线程要做的事情");        
        System.out.println("可以在这里执行批量插入等耗时的事情");
        logger.info("end executeAsync");   
    }
}

アノテーション @Async("asyncServiceExecutor") を追加します。 asyncServiceExecutor メソッドは、前の ExecutorConfig.java のメソッド名であり、executeAsync メソッドが開始されることを示しています スレッド プールは asyncServiceExecutor メソッドによって作成されます

次のステップは、次のステップでサービスを注入することです。コントローラーまたはどこかの注釈 @Autowired

@Autowiredprivate 
AsyncService asyncService;

@GetMapping("/async")
public void async(){  
    asyncService.executeAsync();
}

ログ印刷

2022- 07-16 22:15:47.655 INFO 10516 --- [async-service-5] c.u.d.e. executor.impl.AsyncServiceImpl : startexecuteAsync
非同期スレッドによって実行されること
ここでバッチ挿入やその他の時間のかかるタスクを実行できます Things
2022-07-16 22:15:47.655 INFO 10516 - -- [async-service-5] c.u.d.e.executor.impl.AsyncServiceImpl : 終了executeAsync
2022-07-16 22:15:47.770 INFO 10516 --- [async-service-1] c.u.d.e.executor.impl.AsyncServiceImpl : startexecuteAsync
非同期スレッドで実行すること
バッチ挿入などの時間のかかる処理はここで実行できます
2022-07-16 22:15:47.770 INFO 10516 --- [async-service- 1] c.u.d.e.executor.impl.AsyncServiceImpl : 終了executeAsync
2022-07-16 22:15:47.816 INFO 10516 --- [async-service- 2] c.u.d.e.executor.impl.AsyncServiceImpl : 開始executeAsync
非同期スレッドが行う必要がある
バッチ挿入などの時間のかかる作業はここで実行できます
2022-07-16 22:15:47.816 INFO 10516 - -- [async-service-2] c.u.d.e.executor.impl。 AsyncServiceImpl : endexecuteAsync
2022-07-16 22:15:48.833 INFO 10516 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl : startexecuteAsync
非同期スレッドによって実行されること
バッチ挿入などの時間のかかるタスクはここで実行できます
2022-07-16 22:15:48.834 INFO 10516 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl : endexecuteAsync
2022-07-16 22:15:48.986 INFO 10516 --- [async-service-4] c.u.d.e.executor.impl.AsyncServiceImpl : startexecuteAsync
物事を実行する非同期スレッド
次のような時間のかかることを実行できます。ここにバッチ挿入として
2022-07-16 22:15:48.987 INFO 10516 --- [async-service-4] c.u.d.e.executor.impl.AsyncServiceImpl : endexecuteAsync

次のようにすることができます上記のログから、[async-service-] には複数のスレッドがあり、明らかに構成したスレッド プールで実行され、各リクエストでコントローラーの開始ログと終了ログが連続的に出力され、各リクエストが迅速に応答されたことがわかります。 、時間のかかる操作は非同期実行のためにスレッド プール内のスレッドに任せられます;

スレッド プールを使用しましたが、当時のスレッド プールの状況はまだ不明です。多くのスレッドが実行されており、キュー内で待機しているスレッドはいくつありますか?ここでは、ThreadPoolTask​​Executor のサブクラスを作成しました。これは、スレッドが送信されるたびに、現在のスレッド プールの実行ステータスを出力します。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;import java.util.concurrent.Future;import java.util.concurrent.ThreadPoolExecutor;
/** 
* @Author: 腾腾 
* @Date: 2022/7/16/0016 22:19 
*/
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);
    private void showThreadPoolInfo(String prefix) {        
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
        if (null == threadPoolExecutor) {    
            return;  
        }
        logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",                
        this.getThreadNamePrefix(),         
        prefix,           
        threadPoolExecutor.getTaskCount(),     
        threadPoolExecutor.getCompletedTaskCount(),  
        threadPoolExecutor.getActiveCount(),    
        threadPoolExecutor.getQueue().size());
     }
    @Override    
    public void execute(Runnable task) {    
        showThreadPoolInfo("1. do execute");       
        super.execute(task);   
    }
    @Override    
    public void execute(Runnable task, long startTimeout) {      
        showThreadPoolInfo("2. do execute");   
        super.execute(task, startTimeout);  
    }
    @Override  
    public Future<?> submit(Runnable task) {   
        showThreadPoolInfo("1. do submit");    
        return super.submit(task);   
    }
    @Override  
    public <T> Future<T> submit(Callable<T> task) {   
        showThreadPoolInfo("2. do submit");      
        return super.submit(task); 
    }
    @Override    
    public ListenableFuture<?> submitListenable(Runnable task) {    
        showThreadPoolInfo("1. do submitListenable");   
        return super.submitListenable(task);   
    }
    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {     
        showThreadPoolInfo("2. do submitListenable");     
        return super.submitListenable(task);  
    }
}

上で示したように、showThreadPoolInfo メソッドは、タスクの合計数を出力します。完了したタスクの数、スレッドの数とキューのサイズが出力され、親クラスの実行メソッド、送信メソッド、およびその他のメソッドがオーバーライドされ、内部で showThreadPoolInfo メソッドが呼び出され、タスクが送信されるたびに showThreadPoolInfo メソッドが呼び出されます。スレッド プール、現在のスレッド プールの基本的な状況がログに出力されます。

ExecutorConfig.java の asyncServiceExecutor メソッドを変更し、ThreadPoolTask​​Executor executor = new ThreadPoolTask​​Executor() を ThreadPoolTask​​Executor executor = new VisiableThreadPoolTask​​Executor()に変更します

@Bean(name = "asyncServiceExecutor")    
public Executor asyncServiceExecutor() {  
    logger.info("start asyncServiceExecutor");  
    //在这里修改       
    ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();    
    //配置核心线程数     
    executor.setCorePoolSize(corePoolSize);  
    //配置最大线程数      
    executor.setMaxPoolSize(maxPoolSize);  
    //配置队列大小      
    executor.setQueueCapacity(queueCapacity); 
    //配置线程池中的线程的名称前缀      
    executor.setThreadNamePrefix(namePrefix);
    
    // rejection-policy:当pool已经达到max size的时候,如何处理新任务      
    // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行     
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); 
    //执行初始化       
    executor.initialize();       
    return executor; 
}

プロジェクトのテストを再度開始します

2022-07-16 22:23:30.951 INFO 14088 --- [nio-8087-exec-2] u.d.e.e.i.VisiableThreadPoolTask​​Executor: async-service-, 2. do submit,taskCount [0], completedTaskCount [ 0]、activeCount [0]、queueSize [0]
2022-07-16 22:23:30.952 INFO 14088 --- [async-service-1] c.u.d.e.executor.impl.AsyncServiceImpl : 開始executeAsync
非同期スレッドが実行する必要があること
バッチ挿入などの時間のかかるタスクはここで実行できます
2022-07-16 22:23:30.953 INFO 14088 --- [async-service-1] c.u.d.e.executor.impl .AsyncServiceImpl : endexecuteAsync
2022-07-16 22:23:31.351 INFO 14088 --- [nio-8087-exec-3] u.d.e.e.i.VisiableThreadPoolTask​​Executor : async-service-, 2. do submit,taskCount [1], completedTaskCount [1]、activeCount [0]、queueSize [0]
2022-07-16 22:23:31.353 INFO 14088 --- [async-service-2] c.u.d.e.executor.impl.AsyncServiceImpl : startexecuteAsync
asynchronous スレッドが実行する必要があること
バッチ挿入などの時間のかかるタスクをここで実行できます
2022-07-16 22:23:31.353 INFO 14088 --- [async-service-2] c.u.d.e.executor .impl.AsyncServiceImpl : 終了executeAsync
2022-07-16 22:23:31.927 INFO 14088 --- [nio-8087-exec-5] u.d.e.e.i.VisiableThreadPoolTask​​Executor : async-service-, 2. do submit,taskCount [2 ]、 completedTaskCount [ 2]、 activeCount [0]、 queueSize [0]
2022-07-16 22:23:31.929 INFO 14088 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl :executeAsync を開始します
非同期 スレッドが実行する必要があること
バッチ挿入などの時間のかかるタスクをここで実行できます
2022-07-16 22:23:31.930 INFO 14088 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl : endexecuteAsync
2022-07-16 22:23:32.496 INFO 14088 --- [nio-8087-exec-7] u.d.e.e.i.VisiableThreadPoolTask​​Executor : async-service-, 2. do submit,taskCount [3]、completedTaskCount [3]、activeCount [0]、queueSize [0]
2022-07-16 22:23:32.498 INFO 14088 --- [async-service-4] c.u.d.e.executor.impl.AsyncServiceImpl : startexecuteAsync
asynchronous スレッドが実行する必要があること
バッチ挿入などの時間のかかるタスクはここで実行できます
2022-07-16 22:23:32.499 INFO 14088 --- [async-service- 4] c.u.d.e.executor.impl.AsyncServiceImpl : endexecuteAsync

ログの次の行に注目してください:

2022-07-16 22:23:32.496 INFO 14088 --- [nio-8087-exec-7] u.d.e.e.i.VisiableThreadPoolTask​​Executor: async-service-、2. do submit,taskCount [3]、completedTaskCount [3]、activeCount [0]、queueSize [0]

これは、スレッド プールにタスクを送信するときに、 submit(Callable task) メソッドが呼び出されることを示しています。現在、3 つのタスクが送信され、3 つは完了しています。現在、タスクを処理しているスレッドは 0 つ、キュー内で待機しているタスクは 0 つあります。 . スレッド プールの基本的な状況は明らかです。

以上がSpring Boot がスレッド プールを使用して数万のデータ挿入関数を処理する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。