
Introduction to Java multi-threaded batch data import method

Reposted by 不言, 2019-04-04 09:45:40

This article introduces a method for importing data in batches with multiple threads in Java.

Foreword: When importing a large amount of data, you can speed up processing by handling the data in batches with multiple threads. Common scenarios include:

  1. Importing a large file into a database (the file is not necessarily a standard CSV that can be imported directly, or it needs some processing in memory first)
  2. Data synchronization (pulling data from a third-party interface, processing it, and then writing it into your own database)

These scenarios have one thing in common: the import simply moves data from one data source to another, which necessarily takes two steps:

  1. Data reading: read data from the source data source into memory
  2. Data writing: write the data in memory to the other data source, possibly with some processing along the way

Reading is generally much faster than writing. In other words, reads are fast and writes are slow.

Design ideas

Because these scenarios read fast and write slowly, the data-writing part is the recommended place to apply multi-threading, while data reading can be converted to batch reads. Put simply, there are two key points:

  1. Batch reading of data
  2. Multiple threads to write data
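
The two key points can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the article's implementation: the class name `BatchWriteSketch`, the helpers `toBatches` and `writeAll`, and the in-memory list standing in for the data source are all hypothetical. It also submits one task per batch, whereas the example later in the article submits one task per record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: all names and sizes here are illustrative assumptions.
class BatchWriteSketch {

    /** Key point 1: split the source into consecutive batches of at most batchSize items. */
    static <T> List<List<T>> toBatches(List<T> source, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < source.size(); from += batchSize) {
            int to = Math.min(from + batchSize, source.size());
            batches.add(new ArrayList<>(source.subList(from, to)));
        }
        return batches;
    }

    /** Key point 2: write the batches on a fixed-size pool; returns the number of records written. */
    static long writeAll(List<Integer> records, int batchSize, int threadNum) {
        AtomicLong written = new AtomicLong();
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        for (List<Integer> batch : toBatches(records, batchSize)) {
            // A real writer would insert the batch into the target data source here.
            pool.execute(() -> written.addAndGet(batch.size()));
        }
        pool.shutdown();
        try {
            // Block until the queued write tasks have finished.
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("writes did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for writes", e);
        }
        return written.get();
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 1050; i++) {
            records.add(i);
        }
        System.out.println("batches: " + toBatches(records, 100).size());
        System.out.println("written: " + writeAll(records, 100, 4));
    }
}
```

Submitting whole batches keeps the number of queued tasks small, which may matter when the source is much larger than the pool can drain quickly.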

Example

The simplest way to do multi-threaded batch processing is to use a thread pool. The example below uses a service that simulates batch reading and slow writing, together with a handler that calls the service's write method from multiple threads, to show how to import data in batches with multiple threads.

Simulation Service

import java.util.concurrent.atomic.AtomicLong;

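/**
 * Mock service used to simulate batch reads and slow writes
 *
 * @author RJH
 * create at 2019-04-01
 */
public class MockService {
    /**
     * Total number of records still available to read
     */
    private long canReadTotal;

    /**
     * Total number of records written
     */
    private final AtomicLong writeTotal = new AtomicLong(0);

    /**
     * Sleep time per write (unit: milliseconds)
     */
    private final long sleepTime;

    /**
     * Constructor
     *
     * @param canReadTotal total number of records that can be read
     * @param sleepTime    time each write sleeps, in milliseconds
     */
    public MockService(long canReadTotal, long sleepTime) {
        this.canReadTotal = canReadTotal;
        this.sleepTime = sleepTime;
    }

    /**
     * Batch read interface
     *
     * @param num maximum number of records to read in this batch
     * @return number of records actually read (0 when nothing is left)
     */
    public synchronized long readData(int num) {
        long readNum;
        if (canReadTotal >= num) {
            canReadTotal -= num;
            readNum = num;
        } else {
            readNum = canReadTotal;
            canReadTotal = 0;
        }
        //System.out.println("read data size:" + readNum);
        return readNum;
    }

    /**
     * Write interface
     */
    public void writeData() {
        try {
            // Sleep for a while to simulate a slow write
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can see the interruption
            Thread.currentThread().interrupt();
        }
        // Increment the write counter
        System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet());
    }

    /**
     * Get the total number of records written
     *
     * @return total number of records written so far
     */
    public long getWriteTotal() {
        return writeTotal.get();
    }

}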

Batch Data Processor

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Multi-threaded batch write handler based on a thread pool
 * @author RJH
 * create at 2019-04-01
 */
public class SimpleBatchHandler {

    private final ExecutorService executorService;

    private final MockService service;
    /**
     * Number of records read per batch
     */
    private final int batch;
    /**
     * Number of threads
     */
    private final int threadNum;

    public SimpleBatchHandler(MockService service, int batch, int threadNum) {
        this.service = service;
        this.batch = batch;
        this.threadNum = threadNum;
        // Use a thread pool with a fixed number of threads
        this.executorService = Executors.newFixedThreadPool(threadNum);
    }

    /**
     * Start processing
     */
    public void startHandle() {
        // Start time
        long startTime = System.currentTimeMillis();
        System.out.println("start handle time:" + startTime);
        long readData;
        // Read data in batches until nothing more can be read
        while ((readData = service.readData(batch)) != 0) {
            for (long i = 0; i < readData; i++) {
                executorService.execute(() -> service.writeData());
            }
        }
        // Shut down the thread pool; tasks already submitted still run
        executorService.shutdown();
        try {
            // Block until all tasks finish instead of busy-waiting on isTerminated()
            while (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
                // Keep waiting
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // End time
        long endTime = System.currentTimeMillis();
        System.out.println("end handle time:" + endTime);
        // Total elapsed time
        System.out.println("total handle time:" + (endTime - startTime) + "ms");
        // Total number of records written
        System.out.println("total write num:" + service.getWriteTotal());
    }

}

Test Class

/**
 * Test class for SimpleBatchHandler
 * @author RJH
 * create at 2019-04-01
 */
public class SimpleBatchHandlerTest {

    public static void main(String[] args) {
        // Total number of records
        long total = 100000;
        // Sleep time per write, in milliseconds
        long sleepTime = 100;
        // Number of records pulled per batch
        int batch = 100;
        // Number of threads
        int threadNum = 16;
        MockService mockService = new MockService(total, sleepTime);
        SimpleBatchHandler handler = new SimpleBatchHandler(mockService, batch, threadNum);
        handler.startHandle();
    }
}

Running Results

start handle time:1554298681755
thread:Thread[pool-1-thread-2,5,main] write data:1
thread:Thread[pool-1-thread-1,5,main] write data:2
... (part of the output omitted)
thread:Thread[pool-1-thread-4,5,main] write data:100000
end handle time:1554299330202
total handle time:648447ms
total write num:100000

Analysis

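In the single-threaded case the execution time should be total*sleepTime, that is, 100000 × 100ms = 10000000ms. After switching to multi-threading with 16 threads, the measured execution time is 648447ms, a speedup of roughly 15.4 times, which is close to the number of threads.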
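
The arithmetic behind this comparison can be checked with a short calculation. The sketch below is only an estimate under simplifying assumptions: every write takes exactly sleepTime, reading costs nothing, and scheduling overhead is ignored. The class and method names are hypothetical; the input values are those used in the test class and the measured time comes from the run above.

```java
// Hypothetical helper for estimating run times; not part of the article's code.
class SpeedupEstimate {

    /** Time for a single thread that writes every record one after another. */
    static long singleThreadMillis(long total, long sleepTime) {
        return total * sleepTime;
    }

    /** Idealized lower bound with threadNum writers: ceil(total / threadNum) rounds of sleepTime. */
    static long idealParallelMillis(long total, long sleepTime, int threadNum) {
        long rounds = (total + threadNum - 1) / threadNum;
        return rounds * sleepTime;
    }

    public static void main(String[] args) {
        long total = 100000;     // records, as in the test class
        long sleepTime = 100;    // milliseconds per write
        int threadNum = 16;      // pool size
        long measured = 648447;  // milliseconds, from the run output

        long single = singleThreadMillis(total, sleepTime);
        long ideal = idealParallelMillis(total, sleepTime, threadNum);
        System.out.println("single-threaded estimate: " + single + "ms");
        System.out.println("ideal with " + threadNum + " threads: " + ideal + "ms");
        System.out.println("measured speedup: " + ((double) single / measured));
        System.out.println("overhead above ideal: " + (measured - ideal) + "ms");
    }
}
```

Under these assumptions the ideal time with 16 threads is 625000ms, so the measured 648447ms is within a few percent of the best case for this pool size.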


Statement:
This article is reproduced from segmentfault.com.