ホームページ >Java >&#&チュートリアル >Javaマルチスレッドバッチデータインポート方法の紹介
この記事では、Java マルチスレッドでバッチ データをインポートする方法を紹介します。一定の参考価値があります。必要な友人は参考にしてください。お役に立てれば幸いです。
前書き: 大量のデータのインポートが発生した場合、処理速度を向上させるために、マルチスレッドを使用してこれらのプロセスをバッチ処理することを選択できます。一般的なシナリオは次のとおりです。
CSV
インポート可能ファイルであるとは限らず、メモリ内で特定の処理が必要です) 上記のシナリオには 1 つの共通点があります。このタイプのデータをインポートするシナリオは、次のとおりです。 to データはあるデータ ソースから別のデータ ソースに移動されます。これは 2 つのステップに分割する必要があります
データの読み取り速度は一般に、データの書き込み速度よりもはるかに高速です。つまり、 は読み取りが速く、 は書き込みが遅くなります。
シナリオの特徴として、読み込みが速く、書き込みが遅いがあるため、マルチスレッド処理を使用する場合は、データ書き込みを推奨します。 部分的にマルチスレッドに変換されます。また、データ読み取りは、データの一括読み取りに変換できます。簡単に言うと、重要なポイントは 2 つあります。
Multiple thread バッチ処理の最も簡単な解決策は、処理にスレッド プールを使用することです。以下では、バッチの読み取りと書き込みをシミュレートするサービスと、このサービスへのマルチスレッド書き込み呼び出しを例として使用して、複数のインポート方法を示します。 -スレッド化されたバッチデータ。
import java.util.concurrent.atomic.AtomicLong; /** * 数据批量写入用的模拟服务 * * @author RJH * create at 2019-04-01 */ public class MockService { /** * 可读取总数 */ private long canReadTotal; /** * 写入总数 */ private AtomicLong writeTotal=new AtomicLong(0); /** * 写入休眠时间(单位:毫秒) */ private final long sleepTime; /** * 构造方法 * * @param canReadTotal * @param sleepTime */ public MockService(long canReadTotal, long sleepTime) { this.canReadTotal = canReadTotal; this.sleepTime = sleepTime; } /** * 批量读取数据接口 * * @param num * @return */ public synchronized long readData(int num) { long readNum; if (canReadTotal >= num) { canReadTotal -= num; readNum = num; } else { readNum = canReadTotal; canReadTotal = 0; } //System.out.println("read data size:" + readNum); return readNum; } /** * 写入数据接口 */ public void writeData() { try { // 休眠一定时间模拟写入速度慢 Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } // 写入总数自增 System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet()); } /** * 获取写入的总数 * * @return */ public long getWriteTotal() { return writeTotal.get(); } }
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 基于线程池的多线程批量写入处理器 * @author RJH * create at 2019-04-01 */ public class SimpleBatchHandler { private ExecutorService executorService; private MockService service; /** * 每次批量读取的数据量 */ private int batch; /** * 线程个数 */ private int threadNum; public SimpleBatchHandler(MockService service, int batch,int threadNum) { this.service = service; this.batch = batch; //使用固定数目的线程池 this.executorService = Executors.newFixedThreadPool(threadNum); } /** * 开始处理 */ public void startHandle() { // 开始处理的时间 long startTime = System.currentTimeMillis(); System.out.println("start handle time:" + startTime); long readData; while ((readData = service.readData(batch)) != 0) {// 批量读取数据,知道读取不到数据才停止 for (long i = 0; i < readData; i++) { executorService.execute(() -> service.writeData()); } } // 关闭线程池 executorService.shutdown(); while (!executorService.isTerminated()) {//等待线程池中的线程执行完 } // 结束时间 long endTime = System.currentTimeMillis(); System.out.println("end handle time:" + endTime); // 总耗时 System.out.println("total handle time:" + (endTime - startTime) + "ms"); // 写入总数 System.out.println("total write num:" + service.getWriteTotal()); } }
/** * SimpleBatchHandler的测试类 * @author RJH * create at 2019-04-01 */ public class SimpleBatchHandlerTest { public static void main(String[] args) { // 总数 long total=100000; // 休眠时间 long sleepTime=100; // 每次拉取的数量 int batch=100; // 线程个数 int threadNum=16; MockService mockService=new MockService(total,sleepTime); SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum); handler.startHandle(); } }
start handle time:1554298681755 thread:Thread[pool-1-thread-2,5,main] write data:1 thread:Thread[pool-1-thread-1,5,main] write data:2 ...省略部分输出 thread:Thread[pool-1-thread-4,5,main] write data:100000 end handle time:1554299330202 total handle time:648447ms total write num:100000
シングルスレッドの場合の実行時間は total*sleepTime
、つまり 10000000ms
になるはずですが、マルチスレッドへの変換後の実行時間は 648447ms
になります。
【関連する推奨事項: Java ビデオ チュートリアル ]
以上がJavaマルチスレッドバッチデータインポート方法の紹介の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。