Maison  >  Article  >  Java  >  Introduction à la méthode d'importation de données par lots multithread Java

Introduction à la méthode d'importation de données par lots multithread Java

不言
不言avant
2019-04-04 09:45:404220parcourir

Cet article vous présente la méthode d'importation de données par lots multithread Java. Il a une certaine valeur de référence. Les amis dans le besoin peuvent s'y référer.

Avant-propos : Lorsque vous rencontrez une grande quantité d'importation de données, afin d'augmenter la vitesse de traitement, vous pouvez choisir d'utiliser le multithread pour traiter par lots ces processus. Les scénarios courants incluent :

  1. Importation de fichiers volumineux dans la base de données (ce fichier n'est pas nécessairement un fichier CSV importable standard ou nécessite certains traitements en mémoire)
  2. Synchronisation des données (extraire le données de l'interface tierce et traitez-les puis écrivez-les dans votre propre base de données)

Les scénarios ci-dessus ont une chose en commun Le scénario de ce type d'importation de données consiste simplement à convertir les données. données d'une donnée La source est déplacée vers une autre source de données , qui doit être divisée en deux étapes

  1. Lecture des données  : lire les données de la source de données en mémoire
  2. Écriture des données : Écrivez les données dans la mémoire dans une autre source de données. Il peut y avoir un traitement des données

et la vitesse de lecture des données est généralement. plus rapide que celle de l'écriture des données. La vitesse est beaucoup plus rapide, c'est-à-dire est rapide pour la lecture et lente pour l'écriture .

Idées de conception

Étant donné que les caractéristiques de la scène sont lecture rapide et écriture lente, si le multithreading est utilisé, il est recommandé de écriture de donnéesPartiellement modifié en multi-thread. Et la lecture de données peut être transformée en lecture par lots de données. Pour faire simple, il y a deux points principaux :

  1. Lire les données par lots
  2. Écrire les données avec plusieurs threads

Exemple

Multi-thread La solution la plus simple pour le traitement par lots consiste à utiliser un pool de threads pour le traitement. Ce qui suit utilisera un service qui simule la lecture et l'écriture par lots, et un appel d'écriture multi-thread à ce service comme exemple pour montrer comment importer. données par lots multithread.

Service de simulation

import java.util.concurrent.atomic.AtomicLong;

/**
 * 数据批量写入用的模拟服务
 *
 * @author RJH
 * create at 2019-04-01
 */
public class MockService {
    /**
     * 可读取总数
     */
    private long canReadTotal;

    /**
     * 写入总数
     */
    private AtomicLong writeTotal=new AtomicLong(0);

    /**
     * 写入休眠时间(单位:毫秒)
     */
    private final long sleepTime;

    /**
     * 构造方法
     *
     * @param canReadTotal
     * @param sleepTime
     */
    public MockService(long canReadTotal, long sleepTime) {
        this.canReadTotal = canReadTotal;
        this.sleepTime = sleepTime;
    }

    /**
     * 批量读取数据接口
     *
     * @param num
     * @return
     */
    public synchronized long readData(int num) {
        long readNum;
        if (canReadTotal >= num) {
            canReadTotal -= num;
            readNum = num;
        } else {
            readNum = canReadTotal;
            canReadTotal = 0;
        }
        //System.out.println("read data size:" + readNum);
        return readNum;
    }

    /**
     * 写入数据接口
     */
    public void writeData() {
        try {
            // 休眠一定时间模拟写入速度慢
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 写入总数自增
        System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet());
    }

    /**
     * 获取写入的总数
     *
     * @return
     */
    public long getWriteTotal() {
        return writeTotal.get();
    }

}

Processeur de données par lots

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 基于线程池的多线程批量写入处理器
 * @author RJH
 * create at 2019-04-01
 */
public class SimpleBatchHandler {

    private ExecutorService executorService;

    private MockService service;
    /**
     * 每次批量读取的数据量
     */
    private int batch;
    /**
     * 线程个数
     */
    private int threadNum;

    public SimpleBatchHandler(MockService service, int batch,int threadNum) {
        this.service = service;
        this.batch = batch;
        //使用固定数目的线程池
        this.executorService = Executors.newFixedThreadPool(threadNum);
    }

    /**
     * 开始处理
     */
    public void startHandle() {
        // 开始处理的时间
        long startTime = System.currentTimeMillis();
        System.out.println("start handle time:" + startTime);
        long readData;
        while ((readData = service.readData(batch)) != 0) {// 批量读取数据,知道读取不到数据才停止
            for (long i = 0; i < readData; i++) {
                executorService.execute(() -> service.writeData());
            }
        }
        // 关闭线程池
        executorService.shutdown();
        while (!executorService.isTerminated()) {//等待线程池中的线程执行完

        }
        // 结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("end handle time:" + endTime);
        // 总耗时
        System.out.println("total handle time:" + (endTime - startTime) + "ms");
        // 写入总数
        System.out.println("total write num:" + service.getWriteTotal());
    }

}

Classe de test

/**
 * SimpleBatchHandler的测试类
 * @author RJH
 * create at 2019-04-01
 */
public class SimpleBatchHandlerTest {

    public static void main(String[] args) {
        // 总数
        long total=100000;
        // 休眠时间
        long sleepTime=100;
        // 每次拉取的数量
        int batch=100;
        // 线程个数
        int threadNum=16;
        MockService mockService=new MockService(total,sleepTime);
        SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum);
        handler.startHandle();
    }
}

Résultats en cours d'exécution

start handle time:1554298681755
thread:Thread[pool-1-thread-2,5,main] write data:1
thread:Thread[pool-1-thread-1,5,main] write data:2
...省略部分输出
thread:Thread[pool-1-thread-4,5,main] write data:100000
end handle time:1554299330202
total handle time:648447ms
total write num:100000

Analyse Le temps d'exécution de

en monothread doit être total*sleepTime, c'est-à-dire 10000000ms, et le temps d'exécution après transformation en multi-threading est 648447ms.

[Recommandations associées : Tutoriel vidéo Java]

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer