搜索
首页Javajava教程Java多线程批量数据导入的方法介绍

Java多线程批量数据导入的方法介绍

Apr 04, 2019 am 09:45 AM
java线程池

本篇文章给大家带来的内容是关于Java多线程批量数据导入的方法介绍,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

前言:当遇到大量数据导入时,为了提高处理的速度,可以选择使用多线程来批量处理这些处理。常见的场景有:

  1. 大文件导入数据库(这个文件不一定是标准的CSV可导入文件或者需要在内存中经过一定的处理)
  2. 数据同步(从第三方接口拉取数据处理后写入自己的数据库)

以上的场景有一个共性,这类数据导入的场景简单来说就是将数据从一个数据源移动到另外一个数据源,而其中必定可以分为两步

  1. 数据读取:从数据源读取数据到内存
  2. 数据写入:将内存中的数据写入到另外一个数据源,可能存在数据处理

而且数据读取的速度一般会比数据写入的速度快很多,即读取快,写入慢

设计思路

由于场景的特点是读取快,写入慢,如果是使用多线程处理,建议是数据写入部分改造为多线程。而数据读取可以改造成批量读取数据。简单来说就是两个要点:

  1. 批量读取数据
  2. 多线程写入数据

示例

多线程批量处理最简单的方案是使用线程池来进行处理,下面会通过一个模拟批量读取和写入的服务,以及对这个服务的多线程写入调用作为示例,展示如何多线程批量数据导入。

模拟服务

import java.util.concurrent.atomic.AtomicLong;

/**
 * 数据批量写入用的模拟服务
 *
 * @author RJH
 * create at 2019-04-01
 */
public class MockService {
    /**
     * 可读取总数
     */
    private long canReadTotal;

    /**
     * 写入总数
     */
    private AtomicLong writeTotal=new AtomicLong(0);

    /**
     * 写入休眠时间(单位:毫秒)
     */
    private final long sleepTime;

    /**
     * 构造方法
     *
     * @param canReadTotal
     * @param sleepTime
     */
    public MockService(long canReadTotal, long sleepTime) {
        this.canReadTotal = canReadTotal;
        this.sleepTime = sleepTime;
    }

    /**
     * 批量读取数据接口
     *
     * @param num
     * @return
     */
    public synchronized long readData(int num) {
        long readNum;
        if (canReadTotal >= num) {
            canReadTotal -= num;
            readNum = num;
        } else {
            readNum = canReadTotal;
            canReadTotal = 0;
        }
        //System.out.println("read data size:" + readNum);
        return readNum;
    }

    /**
     * 写入数据接口
     */
    public void writeData() {
        try {
            // 休眠一定时间模拟写入速度慢
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 写入总数自增
        System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet());
    }

    /**
     * 获取写入的总数
     *
     * @return
     */
    public long getWriteTotal() {
        return writeTotal.get();
    }

}

批量数据处理器

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 基于线程池的多线程批量写入处理器
 * @author RJH
 * create at 2019-04-01
 */
public class SimpleBatchHandler {

    private ExecutorService executorService;

    private MockService service;
    /**
     * 每次批量读取的数据量
     */
    private int batch;
    /**
     * 线程个数
     */
    private int threadNum;

    public SimpleBatchHandler(MockService service, int batch,int threadNum) {
        this.service = service;
        this.batch = batch;
        //使用固定数目的线程池
        this.executorService = Executors.newFixedThreadPool(threadNum);
    }

    /**
     * 开始处理
     */
    public void startHandle() {
        // 开始处理的时间
        long startTime = System.currentTimeMillis();
        System.out.println("start handle time:" + startTime);
        long readData;
        while ((readData = service.readData(batch)) != 0) {// 批量读取数据,知道读取不到数据才停止
            for (long i = 0; i < readData; i++) {
                executorService.execute(() -> service.writeData());
            }
        }
        // 关闭线程池
        executorService.shutdown();
        while (!executorService.isTerminated()) {//等待线程池中的线程执行完

        }
        // 结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("end handle time:" + endTime);
        // 总耗时
        System.out.println("total handle time:" + (endTime - startTime) + "ms");
        // 写入总数
        System.out.println("total write num:" + service.getWriteTotal());
    }

}

测试类

/**
 * SimpleBatchHandler的测试类
 * @author RJH
 * create at 2019-04-01
 */
public class SimpleBatchHandlerTest {

    public static void main(String[] args) {
        // 总数
        long total=100000;
        // 休眠时间
        long sleepTime=100;
        // 每次拉取的数量
        int batch=100;
        // 线程个数
        int threadNum=16;
        MockService mockService=new MockService(total,sleepTime);
        SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum);
        handler.startHandle();
    }
}

运行结果

start handle time:1554298681755
thread:Thread[pool-1-thread-2,5,main] write data:1
thread:Thread[pool-1-thread-1,5,main] write data:2
...省略部分输出
thread:Thread[pool-1-thread-4,5,main] write data:100000
end handle time:1554299330202
total handle time:648447ms
total write num:100000

分析

在单线程情况下的执行时间应该为total*sleepTime,即10000000ms,而改造为多线程后执行时间为648447ms

【相关推荐:Java视频教程

以上是Java多线程批量数据导入的方法介绍的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文转载于:segmentfault。如有侵权,请联系admin@php.cn删除
如何将Maven或Gradle用于高级Java项目管理,构建自动化和依赖性解决方案?如何将Maven或Gradle用于高级Java项目管理,构建自动化和依赖性解决方案?Mar 17, 2025 pm 05:46 PM

本文讨论了使用Maven和Gradle进行Java项目管理,构建自动化和依赖性解决方案,以比较其方法和优化策略。

如何使用适当的版本控制和依赖项管理创建和使用自定义Java库(JAR文件)?如何使用适当的版本控制和依赖项管理创建和使用自定义Java库(JAR文件)?Mar 17, 2025 pm 05:45 PM

本文使用Maven和Gradle之类的工具讨论了具有适当的版本控制和依赖关系管理的自定义Java库(JAR文件)的创建和使用。

如何使用咖啡因或Guava Cache等库在Java应用程序中实现多层缓存?如何使用咖啡因或Guava Cache等库在Java应用程序中实现多层缓存?Mar 17, 2025 pm 05:44 PM

本文讨论了使用咖啡因和Guava缓存在Java中实施多层缓存以提高应用程序性能。它涵盖设置,集成和绩效优势,以及配置和驱逐政策管理最佳PRA

如何将JPA(Java持久性API)用于具有高级功能(例如缓存和懒惰加载)的对象相关映射?如何将JPA(Java持久性API)用于具有高级功能(例如缓存和懒惰加载)的对象相关映射?Mar 17, 2025 pm 05:43 PM

本文讨论了使用JPA进行对象相关映射,并具有高级功能,例如缓存和懒惰加载。它涵盖了设置,实体映射和优化性能的最佳实践,同时突出潜在的陷阱。[159个字符]

Java的类负载机制如何起作用,包括不同的类载荷及其委托模型?Java的类负载机制如何起作用,包括不同的类载荷及其委托模型?Mar 17, 2025 pm 05:35 PM

Java的类上载涉及使用带有引导,扩展程序和应用程序类负载器的分层系统加载,链接和初始化类。父代授权模型确保首先加载核心类别,从而影响自定义类LOA

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
3 周前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
3 周前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
3 周前By尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解锁Myrise中的所有内容
4 周前By尊渡假赌尊渡假赌尊渡假赌

热工具

MinGW - 适用于 Windows 的极简 GNU

MinGW - 适用于 Windows 的极简 GNU

这个项目正在迁移到osdn.net/projects/mingw的过程中,你可以继续在那里关注我们。MinGW:GNU编译器集合(GCC)的本地Windows移植版本,可自由分发的导入库和用于构建本地Windows应用程序的头文件;包括对MSVC运行时的扩展,以支持C99功能。MinGW的所有软件都可以在64位Windows平台上运行。

螳螂BT

螳螂BT

Mantis是一个易于部署的基于Web的缺陷跟踪工具,用于帮助产品缺陷跟踪。它需要PHP、MySQL和一个Web服务器。请查看我们的演示和托管服务。

安全考试浏览器

安全考试浏览器

Safe Exam Browser是一个安全的浏览器环境,用于安全地进行在线考试。该软件将任何计算机变成一个安全的工作站。它控制对任何实用工具的访问,并防止学生使用未经授权的资源。

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

Dreamweaver Mac版

Dreamweaver Mac版

视觉化网页开发工具