This article introduces a method for importing data in batches with multiple threads in Java.
Foreword: When importing a large amount of data, you can speed up processing by handling the data in batches with multiple threads. Common scenarios include:
- Importing large files into the database (the file is not necessarily a standard, directly importable CSV file, or it requires some processing in memory)
- Data synchronization (pulling data from a third-party interface, processing it, and then writing it into your own database)
These scenarios have one thing in common: importing the data simply means moving it from one data source to another, which necessarily takes two steps:
- Data reading: read data from the data source into memory
- Data writing: write the data in memory to another data source, possibly with some data processing along the way
Data reading is generally much faster than data writing. In other words, reads are fast and writes are slow.
Design ideas
Since these scenarios are characterized by fast reads and slow writes, it is the data writing part that should be converted to multi-threading, while data reading can be converted to reading in batches. To put it simply, there are two key points:
- Batch reading of data
- Multiple threads to write data
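The two key points above can be sketched in a few lines. This is only a minimal illustration, not the implementation used later in the article: the class `BatchWriteSketch`, the methods `splitIntoBatches` and `writeAll`, and the in-memory queue standing in for the target data source are all hypothetical names chosen for this example, and the one-minute wait is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: none of these names come from the article.
class BatchWriteSketch {

    /** Key point 1: split the source into consecutive batches of at most batchSize items. */
    static <T> List<List<T>> splitIntoBatches(List<T> source, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < source.size(); from += batchSize) {
            int to = Math.min(from + batchSize, source.size());
            batches.add(new ArrayList<>(source.subList(from, to)));
        }
        return batches;
    }

    /** Key point 2: hand every item of every batch to a fixed pool of writer threads. */
    static List<Integer> writeAll(List<Integer> source, int batchSize, int threadNum) {
        // Thread-safe queue standing in for the target data source (assumption).
        Queue<Integer> target = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        for (List<Integer> batch : splitIntoBatches(source, batchSize)) {
            for (Integer item : batch) {
                pool.execute(() -> target.add(item));
            }
        }
        pool.shutdown(); // no new tasks; queued writes still run
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return new ArrayList<>(target);
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            rows.add(i);
        }
        System.out.println("batches: " + splitIntoBatches(rows, 4));
        System.out.println("written: " + writeAll(rows, 4, 3).size());
    }
}
```

The order in which items reach the target is not guaranteed, since the writes run concurrently. If ordering matters, it has to be handled separately.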
Example
The simplest way to do multi-threaded batch processing is to use a thread pool. The example below uses a service that simulates batch reading and slow writing, together with a handler that calls the write method of this service from multiple threads, to show how multi-threaded batch data import works.
Simulation Service
    import java.util.concurrent.atomic.AtomicLong;

    /**
     * Mock service used for batch data reading and writing
     *
     * @author RJH
     * create at 2019-04-01
     */
    public class MockService {
        /**
         * Total number of records that can still be read
         */
        private long canReadTotal;

        /**
         * Total number of records written
         */
        private AtomicLong writeTotal = new AtomicLong(0);

        /**
         * Sleep time per write (unit: milliseconds)
         */
        private final long sleepTime;

        /**
         * Constructor
         *
         * @param canReadTotal total number of records that can be read
         * @param sleepTime    sleep time per write, in milliseconds
         */
        public MockService(long canReadTotal, long sleepTime) {
            this.canReadTotal = canReadTotal;
            this.sleepTime = sleepTime;
        }

        /**
         * Batch read interface
         *
         * @param num number of records requested
         * @return number of records actually read (0 when nothing is left)
         */
        public synchronized long readData(int num) {
            long readNum;
            if (canReadTotal >= num) {
                canReadTotal -= num;
                readNum = num;
            } else {
                readNum = canReadTotal;
                canReadTotal = 0;
            }
            //System.out.println("read data size:" + readNum);
            return readNum;
        }

        /**
         * Write interface
         */
        public void writeData() {
            try {
                // Sleep for a while to simulate a slow write
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Increment the total number of writes
            System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet());
        }

        /**
         * Get the total number of records written
         *
         * @return total number of writes
         */
        public long getWriteTotal() {
            return writeTotal.get();
        }
    }
Batch Data Processor
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    /**
     * Multi-threaded batch write handler based on a thread pool
     *
     * @author RJH
     * create at 2019-04-01
     */
    public class SimpleBatchHandler {

        private ExecutorService executorService;

        private MockService service;

        /**
         * Number of records read per batch
         */
        private int batch;

        /**
         * Number of threads
         */
        private int threadNum;

        public SimpleBatchHandler(MockService service, int batch, int threadNum) {
            this.service = service;
            this.batch = batch;
            this.threadNum = threadNum;
            // Use a thread pool with a fixed number of threads
            this.executorService = Executors.newFixedThreadPool(threadNum);
        }

        /**
         * Start processing
         */
        public void startHandle() {
            // Time at which processing starts
            long startTime = System.currentTimeMillis();
            System.out.println("start handle time:" + startTime);
            long readData;
            // Read data in batches until no more data can be read
            while ((readData = service.readData(batch)) != 0) {
                for (long i = 0; i < readData; i++) {
                    // Submit one write task per record that was read
                    executorService.execute(() -> service.writeData());
                }
            }
            // Shut down the thread pool
            executorService.shutdown();
            while (!executorService.isTerminated()) {
                // Wait for the tasks in the thread pool to finish
            }
            // End time
            long endTime = System.currentTimeMillis();
            System.out.println("end handle time:" + endTime);
            // Total time taken
            System.out.println("total handle time:" + (endTime - startTime) + "ms");
            // Total number of writes
            System.out.println("total write num:" + service.getWriteTotal());
        }
    }
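The empty `while (!executorService.isTerminated())` loop in `startHandle` keeps a CPU core busy while it waits. One possible alternative is `ExecutorService.awaitTermination`, which blocks the calling thread until the submitted tasks finish or a timeout elapses. The sketch below is a standalone illustration rather than a change to the article's handler: `AwaitTerminationSketch`, `shutdownAndWait`, and `runTasks` are hypothetical names, the tasks are no-op counters instead of real writes, and the one-minute timeout is an arbitrary assumption that would need to be much larger for a run as long as the one shown in this article.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: none of these names come from the article.
class AwaitTerminationSketch {

    /**
     * Shuts the pool down and blocks until all submitted tasks have finished
     * or the timeout elapses. Returns true if the pool terminated in time.
     */
    static boolean shutdownAndWait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks; already-submitted tasks still run
        try {
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow(); // give up on the remaining tasks
            Thread.currentThread().interrupt(); // restore the interrupt flag for callers
            return false;
        }
    }

    /** Runs taskCount counter tasks on threadNum threads and returns how many completed. */
    static long runTasks(int taskCount, int threadNum) {
        AtomicLong done = new AtomicLong();
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(done::incrementAndGet);
        }
        shutdownAndWait(pool, 1, TimeUnit.MINUTES);
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runTasks(1000, 4));
    }
}
```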
Test Class
    /**
     * Test class for SimpleBatchHandler
     *
     * @author RJH
     * create at 2019-04-01
     */
    public class SimpleBatchHandlerTest {

        public static void main(String[] args) {
            // Total number of records
            long total = 100000;
            // Sleep time per write, in milliseconds
            long sleepTime = 100;
            // Number of records pulled per batch
            int batch = 100;
            // Number of threads
            int threadNum = 16;
            MockService mockService = new MockService(total, sleepTime);
            SimpleBatchHandler handler = new SimpleBatchHandler(mockService, batch, threadNum);
            handler.startHandle();
        }
    }
Running Results
    start handle time:1554298681755
    thread:Thread[pool-1-thread-2,5,main] write data:1
    thread:Thread[pool-1-thread-1,5,main] write data:2
    ... (some output omitted)
    thread:Thread[pool-1-thread-4,5,main] write data:100000
    end handle time:1554299330202
    total handle time:648447ms
    total write num:100000
Analysis
In the single-threaded case the execution time should be about total*sleepTime, that is, 100000 * 100ms = 10000000ms, while the execution time after switching to multi-threading is 648447ms, roughly 15 times faster with 16 threads.
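The arithmetic behind this comparison can be written out as a small calculation. The sketch below only restates the numbers from the test class and the running results. `SpeedupEstimate` and its methods are hypothetical names, and the "ideal" figure assumes the work divides perfectly across the threads with no overhead, which a real run will not quite reach.

```java
import java.util.Locale;

// Hypothetical sketch: none of these names come from the article.
class SpeedupEstimate {

    /** Expected single-threaded time: every write sleeps for sleepTime milliseconds. */
    static long singleThreadMillis(long total, long sleepTime) {
        return total * sleepTime;
    }

    /** Lower bound with threadNum threads, assuming perfect division of work. */
    static long idealMillis(long total, long sleepTime, int threadNum) {
        return singleThreadMillis(total, sleepTime) / threadNum;
    }

    /** How many times faster the measured run is than the single-threaded estimate. */
    static double speedup(long singleMillis, long measuredMillis) {
        return (double) singleMillis / measuredMillis;
    }

    public static void main(String[] args) {
        long total = 100000;      // from the test class
        long sleepTime = 100;     // from the test class, in milliseconds
        int threadNum = 16;       // from the test class
        long measured = 648447;   // "total handle time" from the running results

        long single = singleThreadMillis(total, sleepTime);
        System.out.println("single-threaded estimate: " + single + "ms");   // 10000000ms
        System.out.println("ideal with 16 threads: "
                + idealMillis(total, sleepTime, threadNum) + "ms");          // 625000ms
        System.out.println("measured speedup: "
                + String.format(Locale.ROOT, "%.2f", speedup(single, measured))); // 15.42
    }
}
```

The measured 648447ms is close to the 625000ms lower bound, which suggests that in this simulation almost all of the time is spent in the simulated writes.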