ホームページ  >  記事  >  Java  >  Javaマルチスレッドバッチデータインポート方法の紹介

Javaマルチスレッドバッチデータインポート方法の紹介

不言
不言転載
2019-04-04 09:45:404233ブラウズ

この記事では、Java マルチスレッドでバッチ データをインポートする方法を紹介します。一定の参考価値があります。必要な友人は参考にしてください。お役に立てれば幸いです。

前書き: 大量のデータのインポートが発生した場合、処理速度を向上させるために、マルチスレッドを使用してこれらのプロセスをバッチ処理することを選択できます。一般的なシナリオは次のとおりです。

  1. 大きなファイルをデータベースにインポートする (このファイルは必ずしも標準の CSV インポート可能ファイルであるとは限らず、メモリ内で特定の処理が必要です)
  2. Data同期 (サードパーティのインターフェイスからデータを取得して処理し、独自のデータベースに書き込む)

上記のシナリオには 1 つの共通点があります。このタイプのデータをインポートするシナリオは、次のとおりです。 to データはあるデータ ソースから別のデータ ソースに移動されます。これは 2 つのステップに分割する必要があります

  1. データ読み取り: データ ソースからデータを読み取ります。メモリへの
  2. データ書き込み : メモリ内のデータを別のデータ ソースに書き込みます。データ処理がある場合があります。

データの読み取り速度は一般に、データの書き込み速度よりもはるかに高速です。つまり、 は読み取りが速く、 は書き込みが遅くなります。

設計思想

シナリオの特徴として、読み込みが速く、書き込みが遅いがあるため、マルチスレッド処理を使用する場合は、データ書き込みを推奨します。 部分的にマルチスレッドに変換されます。また、データ読み取りは、データの一括読み取りに変換できます。簡単に言うと、重要なポイントは 2 つあります。

  1. データのバッチ読み取り
  2. データを書き込むための複数のスレッド

Multiple thread バッチ処理の最も簡単な解決策は、処理にスレッド プールを使用することです。以下では、バッチの読み取りと書き込みをシミュレートするサービスと、このサービスへのマルチスレッド書き込み呼び出しを例として使用して、複数のインポート方法を示します。 -スレッド化されたバッチデータ。

シミュレーションサービス

import java.util.concurrent.atomic.AtomicLong;

/**
 * 数据批量写入用的模拟服务
 *
 * @author RJH
 * create at 2019-04-01
 */
public class MockService {
    /**
     * 可读取总数
     */
    private long canReadTotal;

    /**
     * 写入总数
     */
    private AtomicLong writeTotal=new AtomicLong(0);

    /**
     * 写入休眠时间(单位:毫秒)
     */
    private final long sleepTime;

    /**
     * 构造方法
     *
     * @param canReadTotal
     * @param sleepTime
     */
    public MockService(long canReadTotal, long sleepTime) {
        this.canReadTotal = canReadTotal;
        this.sleepTime = sleepTime;
    }

    /**
     * 批量读取数据接口
     *
     * @param num
     * @return
     */
    public synchronized long readData(int num) {
        long readNum;
        if (canReadTotal >= num) {
            canReadTotal -= num;
            readNum = num;
        } else {
            readNum = canReadTotal;
            canReadTotal = 0;
        }
        //System.out.println("read data size:" + readNum);
        return readNum;
    }

    /**
     * 写入数据接口
     */
    public void writeData() {
        try {
            // 休眠一定时间模拟写入速度慢
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 写入总数自增
        System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet());
    }

    /**
     * 获取写入的总数
     *
     * @return
     */
    public long getWriteTotal() {
        return writeTotal.get();
    }

}

バッチデータプロセッサ

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 基于线程池的多线程批量写入处理器
 * @author RJH
 * create at 2019-04-01
 */
public class SimpleBatchHandler {

    private ExecutorService executorService;

    private MockService service;
    /**
     * 每次批量读取的数据量
     */
    private int batch;
    /**
     * 线程个数
     */
    private int threadNum;

    public SimpleBatchHandler(MockService service, int batch,int threadNum) {
        this.service = service;
        this.batch = batch;
        //使用固定数目的线程池
        this.executorService = Executors.newFixedThreadPool(threadNum);
    }

    /**
     * 开始处理
     */
    public void startHandle() {
        // 开始处理的时间
        long startTime = System.currentTimeMillis();
        System.out.println("start handle time:" + startTime);
        long readData;
        while ((readData = service.readData(batch)) != 0) {// 批量读取数据,知道读取不到数据才停止
            for (long i = 0; i < readData; i++) {
                executorService.execute(() -> service.writeData());
            }
        }
        // 关闭线程池
        executorService.shutdown();
        while (!executorService.isTerminated()) {//等待线程池中的线程执行完

        }
        // 结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("end handle time:" + endTime);
        // 总耗时
        System.out.println("total handle time:" + (endTime - startTime) + "ms");
        // 写入总数
        System.out.println("total write num:" + service.getWriteTotal());
    }

}

テストクラス

/**
 * SimpleBatchHandler的测试类
 * @author RJH
 * create at 2019-04-01
 */
public class SimpleBatchHandlerTest {

    public static void main(String[] args) {
        // 总数
        long total=100000;
        // 休眠时间
        long sleepTime=100;
        // 每次拉取的数量
        int batch=100;
        // 线程个数
        int threadNum=16;
        MockService mockService=new MockService(total,sleepTime);
        SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum);
        handler.startHandle();
    }
}

実行結果

start handle time:1554298681755
thread:Thread[pool-1-thread-2,5,main] write data:1
thread:Thread[pool-1-thread-1,5,main] write data:2
...省略部分输出
thread:Thread[pool-1-thread-4,5,main] write data:100000
end handle time:1554299330202
total handle time:648447ms
total write num:100000

分析

シングルスレッドの場合の実行時間は total*sleepTime、つまり 10000000ms になるはずですが、マルチスレッドへの変換後の実行時間は 648447ms になります。

【関連する推奨事項: Java ビデオ チュートリアル ]

以上がJavaマルチスレッドバッチデータインポート方法の紹介の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はsegmentfault.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。