首頁  >  文章  >  Java  >  Java多執行緒批次資料匯入的方法介紹

Java多執行緒批次資料匯入的方法介紹

不言
不言轉載
2019-04-04 09:45:404146瀏覽

這篇文章帶給大家的內容是關於Java多執行緒批次資料導入的方法介紹,有一定的參考價值,有需要的朋友可以參考一下,希望對你有幫助。

前言:當遇到大量資料匯入時,為了提高處理的速度,可以選擇使用多執行緒來批次處理這些處理。常見的場景有:

  1. 大檔案匯入資料庫(這個檔案不一定是標準的CSV可匯入檔案或需要在記憶體中經過一定的處理)
  2. 資料同步(從第三方介面拉取資料處理後寫入自己的資料庫)

以上的場景有一個共通性,這類資料匯入的場景簡單來說就是將資料從一個資料來源移動到另一個資料來源,而其中必定可以分成兩步驟

  1. 資料讀取:從資料來源讀取資料到內存
  2. 資料寫入:將記憶體中的資料寫入到另外一個資料來源,可能存在資料處理

而且資料讀取的速度一般會比資料寫入的速度快很多,也就是讀取快,寫入慢

設計想法

由於場景的特徵是讀取快,寫入慢,如果是使用多執行緒處理,建議是資料寫入部分改造為多執行緒。而資料讀取可以改造成批次讀取資料。簡單來說就是兩個重點:

  1. 批次讀取資料
  2. 多執行緒寫入資料

範例

##多執行緒批次處理最簡單的方案是使用執行緒池來進行處理,下面會透過一個模擬批次讀取和寫入的服務,以及對這個服務的多執行緒寫入呼叫作為範例,展示如何多執行緒批次資料導入。

模擬服務

import java.util.concurrent.atomic.AtomicLong;

/**
 * 数据批量写入用的模拟服务
 *
 * @author RJH
 * create at 2019-04-01
 */
public class MockService {
    /**
     * 可读取总数
     */
    private long canReadTotal;

    /**
     * 写入总数
     */
    private AtomicLong writeTotal=new AtomicLong(0);

    /**
     * 写入休眠时间(单位:毫秒)
     */
    private final long sleepTime;

    /**
     * 构造方法
     *
     * @param canReadTotal
     * @param sleepTime
     */
    public MockService(long canReadTotal, long sleepTime) {
        this.canReadTotal = canReadTotal;
        this.sleepTime = sleepTime;
    }

    /**
     * 批量读取数据接口
     *
     * @param num
     * @return
     */
    public synchronized long readData(int num) {
        long readNum;
        if (canReadTotal >= num) {
            canReadTotal -= num;
            readNum = num;
        } else {
            readNum = canReadTotal;
            canReadTotal = 0;
        }
        //System.out.println("read data size:" + readNum);
        return readNum;
    }

    /**
     * 写入数据接口
     */
    public void writeData() {
        try {
            // 休眠一定时间模拟写入速度慢
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 写入总数自增
        System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet());
    }

    /**
     * 获取写入的总数
     *
     * @return
     */
    public long getWriteTotal() {
        return writeTotal.get();
    }

}

批次資料處理器

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 基于线程池的多线程批量写入处理器
 * @author RJH
 * create at 2019-04-01
 */
public class SimpleBatchHandler {

    private ExecutorService executorService;

    private MockService service;
    /**
     * 每次批量读取的数据量
     */
    private int batch;
    /**
     * 线程个数
     */
    private int threadNum;

    public SimpleBatchHandler(MockService service, int batch,int threadNum) {
        this.service = service;
        this.batch = batch;
        //使用固定数目的线程池
        this.executorService = Executors.newFixedThreadPool(threadNum);
    }

    /**
     * 开始处理
     */
    public void startHandle() {
        // 开始处理的时间
        long startTime = System.currentTimeMillis();
        System.out.println("start handle time:" + startTime);
        long readData;
        while ((readData = service.readData(batch)) != 0) {// 批量读取数据,知道读取不到数据才停止
            for (long i = 0; i < readData; i++) {
                executorService.execute(() -> service.writeData());
            }
        }
        // 关闭线程池
        executorService.shutdown();
        while (!executorService.isTerminated()) {//等待线程池中的线程执行完

        }
        // 结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("end handle time:" + endTime);
        // 总耗时
        System.out.println("total handle time:" + (endTime - startTime) + "ms");
        // 写入总数
        System.out.println("total write num:" + service.getWriteTotal());
    }

}

測試類別

/**
 * SimpleBatchHandler的测试类
 * @author RJH
 * create at 2019-04-01
 */
public class SimpleBatchHandlerTest {

    public static void main(String[] args) {
        // 总数
        long total=100000;
        // 休眠时间
        long sleepTime=100;
        // 每次拉取的数量
        int batch=100;
        // 线程个数
        int threadNum=16;
        MockService mockService=new MockService(total,sleepTime);
        SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum);
        handler.startHandle();
    }
}

執行結果

start handle time:1554298681755
thread:Thread[pool-1-thread-2,5,main] write data:1
thread:Thread[pool-1-thread-1,5,main] write data:2
...省略部分输出
thread:Thread[pool-1-thread-4,5,main] write data:100000
end handle time:1554299330202
total handle time:648447ms
total write num:100000

分析

#在單執行緒情況下的執行時間應該是

total*sleepTime,即10000000ms,而改造為多執行緒後執行時間為648447ms

【相關推薦:

Java影片教學

以上是Java多執行緒批次資料匯入的方法介紹的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:segmentfault.com。如有侵權,請聯絡admin@php.cn刪除