>  기사  >  Java  >  Java 다중 스레드 일괄 데이터 가져오기 방법 소개

Java 다중 스레드 일괄 데이터 가져오기 방법 소개

不言
不言앞으로
2019-04-04 09:45:404220검색

이 기사에서는 Java 다중 스레드 일괄 데이터 가져오기 방법을 소개합니다. 이는 특정 참고 가치가 있으므로 도움이 필요한 친구에게 도움이 되기를 바랍니다.

서문: 대량의 데이터 가져오기가 발생하는 경우 처리 속도를 높이기 위해 멀티스레딩을 사용하여 이러한 프로세스를 일괄 처리하도록 선택할 수 있습니다. 일반적인 시나리오는 다음과 같습니다.

  1. 대용량 파일을 데이터베이스로 가져오기(이 파일은 반드시 가져올 수 있는 표준 CSV 파일이 아니거나 메모리에서 특정 처리가 필요하지 않음)
  2. CSV可导入文件或者需要在内存中经过一定的处理)
  3. 数据同步(从第三方接口拉取数据处理后写入自己的数据库)

以上的场景有一个共性,这类数据导入的场景简单来说就是将数据从一个数据源移动到另外一个数据源,而其中必定可以分为两步

  1. 数据读取:从数据源读取数据到内存
  2. 数据写入:将内存中的数据写入到另外一个数据源,可能存在数据处理

而且数据读取的速度一般会比数据写入的速度快很多,即读取快,写入慢

设计思路

由于场景的特点是读取快,写入慢,如果是使用多线程处理,建议是数据写入部分改造为多线程。而数据读取可以改造成批量读取数据。简单来说就是两个要点:

  1. 批量读取数据
  2. 多线程写入数据

示例

多线程批量处理最简单的方案是使用线程池来进行处理,下面会通过一个模拟批量读取和写入的服务,以及对这个服务的多线程写入调用作为示例,展示如何多线程批量数据导入。

模拟服务

import java.util.concurrent.atomic.AtomicLong;

/**
 * 数据批量写入用的模拟服务
 *
 * @author RJH
 * create at 2019-04-01
 */
public class MockService {
    /**
     * 可读取总数
     */
    private long canReadTotal;

    /**
     * 写入总数
     */
    private AtomicLong writeTotal=new AtomicLong(0);

    /**
     * 写入休眠时间(单位:毫秒)
     */
    private final long sleepTime;

    /**
     * 构造方法
     *
     * @param canReadTotal
     * @param sleepTime
     */
    public MockService(long canReadTotal, long sleepTime) {
        this.canReadTotal = canReadTotal;
        this.sleepTime = sleepTime;
    }

    /**
     * 批量读取数据接口
     *
     * @param num
     * @return
     */
    public synchronized long readData(int num) {
        long readNum;
        if (canReadTotal >= num) {
            canReadTotal -= num;
            readNum = num;
        } else {
            readNum = canReadTotal;
            canReadTotal = 0;
        }
        //System.out.println("read data size:" + readNum);
        return readNum;
    }

    /**
     * 写入数据接口
     */
    public void writeData() {
        try {
            // 休眠一定时间模拟写入速度慢
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 写入总数自增
        System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet());
    }

    /**
     * 获取写入的总数
     *
     * @return
     */
    public long getWriteTotal() {
        return writeTotal.get();
    }

}

批量数据处理器

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 基于线程池的多线程批量写入处理器
 * @author RJH
 * create at 2019-04-01
 */
public class SimpleBatchHandler {

    private ExecutorService executorService;

    private MockService service;
    /**
     * 每次批量读取的数据量
     */
    private int batch;
    /**
     * 线程个数
     */
    private int threadNum;

    public SimpleBatchHandler(MockService service, int batch,int threadNum) {
        this.service = service;
        this.batch = batch;
        //使用固定数目的线程池
        this.executorService = Executors.newFixedThreadPool(threadNum);
    }

    /**
     * 开始处理
     */
    public void startHandle() {
        // 开始处理的时间
        long startTime = System.currentTimeMillis();
        System.out.println("start handle time:" + startTime);
        long readData;
        while ((readData = service.readData(batch)) != 0) {// 批量读取数据,知道读取不到数据才停止
            for (long i = 0; i < readData; i++) {
                executorService.execute(() -> service.writeData());
            }
        }
        // 关闭线程池
        executorService.shutdown();
        while (!executorService.isTerminated()) {//等待线程池中的线程执行完

        }
        // 结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("end handle time:" + endTime);
        // 总耗时
        System.out.println("total handle time:" + (endTime - startTime) + "ms");
        // 写入总数
        System.out.println("total write num:" + service.getWriteTotal());
    }

}

测试类

/**
 * SimpleBatchHandler的测试类
 * @author RJH
 * create at 2019-04-01
 */
public class SimpleBatchHandlerTest {

    public static void main(String[] args) {
        // 总数
        long total=100000;
        // 休眠时间
        long sleepTime=100;
        // 每次拉取的数量
        int batch=100;
        // 线程个数
        int threadNum=16;
        MockService mockService=new MockService(total,sleepTime);
        SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum);
        handler.startHandle();
    }
}

运行结果

start handle time:1554298681755
thread:Thread[pool-1-thread-2,5,main] write data:1
thread:Thread[pool-1-thread-1,5,main] write data:2
...省略部分输出
thread:Thread[pool-1-thread-4,5,main] write data:100000
end handle time:1554299330202
total handle time:648447ms
total write num:100000

分析

在单线程情况下的执行时间应该为total*sleepTime,即10000000ms,而改造为多线程后执行时间为648447ms데이터 동기화(세 번째- 파티 인터페이스는 처리할 데이터를 가져온 다음 자체 데이터베이스에 기록합니다.

위 시나리오에는 한 가지 공통점이 있습니다. 이러한 유형의 데이터 가져오기 시나리오는 단순히 데이터 소스에서 데이터를 이동하는 것입니다. . 다른 데이터 소스로 이는 두 단계로 나누어져야 합니다

데이터 읽기: 데이터 소스에서 메모리로 데이터 읽기

데이터 쓰기: 메모리의 데이터를 다른 데이터 소스에 씁니다. 데이터 처리가 있을 수 있습니다.

그리고 데이터 읽기 속도는 일반적으로 데이터 쓰기 속도보다 훨씬 빠릅니다. >읽기는 빠르고 쓰기는 느립니다

.

디자인 아이디어

🎜장면의 특성은 빠른 읽기, 느린 쓰기이므로 멀티스레딩을 사용하는 경우 데이터 쓰기를 권장합니다. > 멀티스레딩으로 부분적으로 수정되었습니다. 그리고 데이터 읽기는 데이터 일괄 읽기로 변환될 수 있습니다. 간단히 말해서 두 가지 핵심 사항이 있습니다. 🎜🎜🎜일괄 데이터 읽기🎜다중 스레드로 데이터 쓰기

🎜다중을 위한 가장 간단한 솔루션 -스레드 일괄 처리 처리에는 스레드 풀이 사용됩니다. 다음에서는 일괄 읽기 및 쓰기를 시뮬레이션하는 서비스와 이 서비스에 대한 멀티 스레드 쓰기 호출을 예로 들어 멀티 스레드 일괄 데이터를 가져오는 방법을 보여줍니다. 🎜

시뮬레이션 서비스

rrreee

배치 데이터 프로세서

rrreee

테스트 클래스

rrreee

실행 결과

rrreee

분석

🎜싱글 스레드의 경우 실행 시간은 total*sleepTime, 즉 10000000ms여야 하며, 멀티 스레드로 변환한 후의 실행 시간은 648447ms입니다. . 🎜🎜【관련 추천: 🎜Java 비디오 튜토리얼🎜】🎜🎜🎜

위 내용은 Java 다중 스레드 일괄 데이터 가져오기 방법 소개의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 segmentfault.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제