ホームページ >Java >&#&チュートリアル >Spring Boot がスレッド プールを使用して数万のデータ挿入関数を処理する方法
#2 日前にプロジェクトに取り組んでいたとき、テーブル挿入のパフォーマンスの最適化を改善したいと思いました。テーブルが 2 つあるため、古いテーブルを最初に挿入し、次に古いテーブルを挿入する必要があります。新しいテーブル、コストが 10,000 以上 データが少し遅いです。
スレッド プール ThreadPoolExecutor は後で考えましたが、Spring Boot プロジェクトを使用すると、Spring が提供するスレッド プール ThreadPoolTaskExecutor を使用できます。 ThreadPoolExecutor をカプセル化し、アノテーションを直接使用して有効にします
まずスレッド プール構成を作成し、Spring Boot にそれをロードさせて ThreadPoolTaskExecutor の作成方法を定義します2 つの注釈 @Configuration と @EnableAsync を使用して、これが構成クラスであり、スレッド プールの構成クラスであることを示します
@Configuration @EnableAsync public class ExecutorConfig { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Value("${async.executor.thread.core_pool_size}") private int corePoolSize; @Value("${async.executor.thread.max_pool_size}") private int maxPoolSize; @Value("${async.executor.thread.queue_capacity}") private int queueCapacity; @Value("${async.executor.thread.name.prefix}") private String namePrefix; @Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { logger.info("start asyncServiceExecutor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePoolSize); //配置最大线程数 executor.setMaxPoolSize(maxPoolSize); //配置队列大小 executor.setQueueCapacity(queueCapacity); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix(namePrefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); return executor; } }
@Value は application.properties で私によって構成されています。
# 异步线程配置 # 配置核心线程数 async.executor.thread.core_pool_size = 5 # 配置最大线程数 async.executor.thread.max_pool_size = 5 # 配置队列大小 async.executor.thread.queue_capacity = 99999 # 配置线程池中的线程的名称前缀 async.executor.thread.name.prefix = async-service-
非同期スレッドであるServiceインターフェースを作成します。インターフェース
public interface AsyncService { /** * 执行异步任务 * 可以根据需求,自己加参数拟定,我这里就做个测试演示 */ void executeAsync(); }
実装クラス
@Service public class AsyncServiceImpl implements AsyncService { private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class); @Override @Async("asyncServiceExecutor") public void executeAsync() { logger.info("start executeAsync"); System.out.println("异步线程要做的事情"); System.out.println("可以在这里执行批量插入等耗时的事情"); logger.info("end executeAsync"); } }
アノテーション @Async("asyncServiceExecutor") を追加します。 asyncServiceExecutor メソッドは、前の ExecutorConfig.java のメソッド名であり、executeAsync メソッドが開始されることを示しています スレッド プールは asyncServiceExecutor メソッドによって作成されます
次のステップは、次のステップでサービスを注入することです。コントローラーまたはどこかの注釈 @Autowired
@Autowiredprivate AsyncService asyncService; @GetMapping("/async") public void async(){ asyncService.executeAsync(); }
ログ印刷
2022- 07-16 22:15:47.655 INFO 10516 --- [async-service-5] c.u.d.e. executor.impl.AsyncServiceImpl : startexecuteAsync
非同期スレッドによって実行されること
ここでバッチ挿入やその他の時間のかかるタスクを実行できます Things
2022-07-16 22:15:47.655 INFO 10516 - -- [async-service-5] c.u.d.e.executor.impl.AsyncServiceImpl : 終了executeAsync
2022-07-16 22:15:47.770 INFO 10516 --- [async-service-1] c.u.d.e.executor.impl.AsyncServiceImpl : startexecuteAsync
非同期スレッドで実行すること
バッチ挿入などの時間のかかる処理はここで実行できます
2022-07-16 22:15:47.770 INFO 10516 --- [async-service- 1] c.u.d.e.executor.impl.AsyncServiceImpl : 終了executeAsync
2022-07-16 22:15:47.816 INFO 10516 --- [async-service- 2] c.u.d.e.executor.impl.AsyncServiceImpl : 開始executeAsync
非同期スレッドが行う必要がある
バッチ挿入などの時間のかかる作業はここで実行できます
2022-07-16 22:15:47.816 INFO 10516 - -- [async-service-2] c.u.d.e.executor.impl。 AsyncServiceImpl : endexecuteAsync
2022-07-16 22:15:48.833 INFO 10516 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl : startexecuteAsync
非同期スレッドによって実行されること
バッチ挿入などの時間のかかるタスクはここで実行できます
2022-07-16 22:15:48.834 INFO 10516 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl : endexecuteAsync
2022-07-16 22:15:48.986 INFO 10516 --- [async-service-4] c.u.d.e.executor.impl.AsyncServiceImpl : startexecuteAsync
物事を実行する非同期スレッド
次のような時間のかかることを実行できます。ここにバッチ挿入として
2022-07-16 22:15:48.987 INFO 10516 --- [async-service-4] c.u.d.e.executor.impl.AsyncServiceImpl : endexecuteAsync
次のようにすることができます上記のログから、[async-service-] には複数のスレッドがあり、明らかに構成したスレッド プールで実行され、各リクエストでコントローラーの開始ログと終了ログが連続的に出力され、各リクエストが迅速に応答されたことがわかります。 、時間のかかる操作は非同期実行のためにスレッド プール内のスレッドに任せられます;
スレッド プールを使用しましたが、当時のスレッド プールの状況はまだ不明です。多くのスレッドが実行されており、キュー内で待機しているスレッドはいくつありますか?ここでは、ThreadPoolTaskExecutor のサブクラスを作成しました。これは、スレッドが送信されるたびに、現在のスレッド プールの実行ステータスを出力します。
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.concurrent.ListenableFuture; import java.util.concurrent.Callable;import java.util.concurrent.Future;import java.util.concurrent.ThreadPoolExecutor; /** * @Author: 腾腾 * @Date: 2022/7/16/0016 22:19 */ public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class); private void showThreadPoolInfo(String prefix) { ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); if (null == threadPoolExecutor) { return; } logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]", this.getThreadNamePrefix(), prefix, threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size()); } @Override public void execute(Runnable task) { showThreadPoolInfo("1. do execute"); super.execute(task); } @Override public void execute(Runnable task, long startTimeout) { showThreadPoolInfo("2. do execute"); super.execute(task, startTimeout); } @Override public Future<?> submit(Runnable task) { showThreadPoolInfo("1. do submit"); return super.submit(task); } @Override public <T> Future<T> submit(Callable<T> task) { showThreadPoolInfo("2. do submit"); return super.submit(task); } @Override public ListenableFuture<?> submitListenable(Runnable task) { showThreadPoolInfo("1. do submitListenable"); return super.submitListenable(task); } @Override public <T> ListenableFuture<T> submitListenable(Callable<T> task) { showThreadPoolInfo("2. do submitListenable"); return super.submitListenable(task); } }
上で示したように、showThreadPoolInfo メソッドは、タスクの合計数を出力します。完了したタスクの数、スレッドの数とキューのサイズが出力され、親クラスの実行メソッド、送信メソッド、およびその他のメソッドがオーバーライドされ、内部で showThreadPoolInfo メソッドが呼び出され、タスクが送信されるたびに showThreadPoolInfo メソッドが呼び出されます。スレッド プール、現在のスレッド プールの基本的な状況がログに出力されます。
ExecutorConfig.java の asyncServiceExecutor メソッドを変更し、ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() を ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor()に変更します
@Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { logger.info("start asyncServiceExecutor"); //在这里修改 ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePoolSize); //配置最大线程数 executor.setMaxPoolSize(maxPoolSize); //配置队列大小 executor.setQueueCapacity(queueCapacity); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix(namePrefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); return executor; }
プロジェクトのテストを再度開始します
2022-07-16 22:23:30.951 INFO 14088 --- [nio-8087-exec-2] u.d.e.e.i.VisiableThreadPoolTaskExecutor: async-service-, 2. do submit,taskCount [0], completedTaskCount [ 0]、activeCount [0]、queueSize [0]
2022-07-16 22:23:30.952 INFO 14088 --- [async-service-1] c.u.d.e.executor.impl.AsyncServiceImpl : 開始executeAsync
非同期スレッドが実行する必要があること
バッチ挿入などの時間のかかるタスクはここで実行できます
2022-07-16 22:23:30.953 INFO 14088 --- [async-service-1] c.u.d.e.executor.impl .AsyncServiceImpl : endexecuteAsync
2022-07-16 22:23:31.351 INFO 14088 --- [nio-8087-exec-3] u.d.e.e.i.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [1], completedTaskCount [1]、activeCount [0]、queueSize [0]
2022-07-16 22:23:31.353 INFO 14088 --- [async-service-2] c.u.d.e.executor.impl.AsyncServiceImpl : startexecuteAsync
asynchronous スレッドが実行する必要があること
バッチ挿入などの時間のかかるタスクをここで実行できます
2022-07-16 22:23:31.353 INFO 14088 --- [async-service-2] c.u.d.e.executor .impl.AsyncServiceImpl : 終了executeAsync
2022-07-16 22:23:31.927 INFO 14088 --- [nio-8087-exec-5] u.d.e.e.i.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [2 ]、 completedTaskCount [ 2]、 activeCount [0]、 queueSize [0]
2022-07-16 22:23:31.929 INFO 14088 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl :executeAsync を開始します
非同期 スレッドが実行する必要があること
バッチ挿入などの時間のかかるタスクをここで実行できます
2022-07-16 22:23:31.930 INFO 14088 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl : endexecuteAsync
2022-07-16 22:23:32.496 INFO 14088 --- [nio-8087-exec-7] u.d.e.e.i.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [3]、completedTaskCount [3]、activeCount [0]、queueSize [0]
2022-07-16 22:23:32.498 INFO 14088 --- [async-service-4] c.u.d.e.executor.impl.AsyncServiceImpl : startexecuteAsync
asynchronous スレッドが実行する必要があること
バッチ挿入などの時間のかかるタスクはここで実行できます
2022-07-16 22:23:32.499 INFO 14088 --- [async-service- 4] c.u.d.e.executor.impl.AsyncServiceImpl : endexecuteAsync
ログの次の行に注目してください:
2022-07-16 22:23:32.496 INFO 14088 --- [nio-8087-exec-7] u.d.e.e.i.VisiableThreadPoolTaskExecutor: async-service-、2. do submit,taskCount [3]、completedTaskCount [3]、activeCount [0]、queueSize [0]
これは、スレッド プールにタスクを送信するときに、 submit(Callable task) メソッドが呼び出されることを示しています。現在、3 つのタスクが送信され、3 つは完了しています。現在、タスクを処理しているスレッドは 0 つ、キュー内で待機しているタスクは 0 つあります。 . スレッド プールの基本的な状況は明らかです。
以上がSpring Boot がスレッド プールを使用して数万のデータ挿入関数を処理する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。