Heim  >  Artikel  >  Java  >  Einführung in die Java-Multithread-Batch-Datenimportmethode

Einführung in die Java-Multithread-Batch-Datenimportmethode

不言
不言nach vorne
2019-04-04 09:45:404227Durchsuche

Dieser Artikel bietet Ihnen eine Einführung in die Methode des Java-Multithread-Batch-Datenimports. Ich hoffe, dass er für Freunde hilfreich ist.

Vorwort: Wenn Sie auf eine große Menge an Datenimporten stoßen, können Sie zur Erhöhung der Verarbeitungsgeschwindigkeit Multithreading verwenden, um diese Prozesse stapelweise zu verarbeiten. Zu den häufigsten Szenarien gehören:

  1. Importieren großer Dateien in die Datenbank (diese Datei ist nicht unbedingt eine standardmäßig CSV importierbare Datei oder erfordert eine bestimmte Verarbeitung im Speicher)
  2. Datensynchronisierung (Ziehen Sie die

Die oben genannten Szenarien haben eines gemeinsam: Das Szenario dieser Art des Datenimports besteht einfach darin, die Daten zu konvertieren Daten aus einer Datenquelle werden in eine andere Datenquelle verschoben , was in zwei Schritte unterteilt werden muss

  1. Datenlesen : Daten aus der Datenquelle lesen in den Speicher
  2. Daten schreiben: Schreiben Sie die Daten im Speicher in eine andere Datenquelle. Möglicherweise erfolgt eine Datenverarbeitung

und die Geschwindigkeit des Datenlesens ist im Allgemeinen schneller als die Geschwindigkeit beim Schreiben von Daten, d. h. ist schnell beim Lesen und langsam beim Schreiben .

Designideen

Da die Merkmale der Szene schnelles Lesen und langsames Schreiben sind, wird bei Verwendung von Multithreading empfohlen, Daten zu schreibenTeilweise auf Multithreading umgestellt. Und das Datenlesen kann in ein Stapellesen von Daten umgewandelt werden. Vereinfacht ausgedrückt gibt es zwei Hauptpunkte:

  1. Batch-Lesen von Daten
  2. Mehrere Threads zum Schreiben von Daten

Beispiel

Mehrere Threads Die einfachste Lösung für die Stapelverarbeitung besteht darin, einen Thread-Pool für die Verarbeitung zu verwenden. Im Folgenden wird ein Dienst zum Simulieren von Stapellesen und -schreiben sowie ein Multithread-Schreibaufruf für diesen Dienst als Beispiel verwendet, um zu zeigen, wie mehrere Threads importiert werden -Threaded-Batch-Daten.

Simulationsdienst

import java.util.concurrent.atomic.AtomicLong;

/**
 * 数据批量写入用的模拟服务
 *
 * @author RJH
 * create at 2019-04-01
 */
public class MockService {
    /**
     * 可读取总数
     */
    private long canReadTotal;

    /**
     * 写入总数
     */
    private AtomicLong writeTotal=new AtomicLong(0);

    /**
     * 写入休眠时间(单位:毫秒)
     */
    private final long sleepTime;

    /**
     * 构造方法
     *
     * @param canReadTotal
     * @param sleepTime
     */
    public MockService(long canReadTotal, long sleepTime) {
        this.canReadTotal = canReadTotal;
        this.sleepTime = sleepTime;
    }

    /**
     * 批量读取数据接口
     *
     * @param num
     * @return
     */
    public synchronized long readData(int num) {
        long readNum;
        if (canReadTotal >= num) {
            canReadTotal -= num;
            readNum = num;
        } else {
            readNum = canReadTotal;
            canReadTotal = 0;
        }
        //System.out.println("read data size:" + readNum);
        return readNum;
    }

    /**
     * 写入数据接口
     */
    public void writeData() {
        try {
            // 休眠一定时间模拟写入速度慢
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 写入总数自增
        System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet());
    }

    /**
     * 获取写入的总数
     *
     * @return
     */
    public long getWriteTotal() {
        return writeTotal.get();
    }

}

Batch-Datenprozessor

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 基于线程池的多线程批量写入处理器
 * @author RJH
 * create at 2019-04-01
 */
public class SimpleBatchHandler {

    private ExecutorService executorService;

    private MockService service;
    /**
     * 每次批量读取的数据量
     */
    private int batch;
    /**
     * 线程个数
     */
    private int threadNum;

    public SimpleBatchHandler(MockService service, int batch,int threadNum) {
        this.service = service;
        this.batch = batch;
        //使用固定数目的线程池
        this.executorService = Executors.newFixedThreadPool(threadNum);
    }

    /**
     * 开始处理
     */
    public void startHandle() {
        // 开始处理的时间
        long startTime = System.currentTimeMillis();
        System.out.println("start handle time:" + startTime);
        long readData;
        while ((readData = service.readData(batch)) != 0) {// 批量读取数据,知道读取不到数据才停止
            for (long i = 0; i < readData; i++) {
                executorService.execute(() -> service.writeData());
            }
        }
        // 关闭线程池
        executorService.shutdown();
        while (!executorService.isTerminated()) {//等待线程池中的线程执行完

        }
        // 结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("end handle time:" + endTime);
        // 总耗时
        System.out.println("total handle time:" + (endTime - startTime) + "ms");
        // 写入总数
        System.out.println("total write num:" + service.getWriteTotal());
    }

}

Testklasse

/**
 * SimpleBatchHandler的测试类
 * @author RJH
 * create at 2019-04-01
 */
public class SimpleBatchHandlerTest {

    public static void main(String[] args) {
        // 总数
        long total=100000;
        // 休眠时间
        long sleepTime=100;
        // 每次拉取的数量
        int batch=100;
        // 线程个数
        int threadNum=16;
        MockService mockService=new MockService(total,sleepTime);
        SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum);
        handler.startHandle();
    }
}

Laufende Ergebnisse

start handle time:1554298681755
thread:Thread[pool-1-thread-2,5,main] write data:1
thread:Thread[pool-1-thread-1,5,main] write data:2
...省略部分输出
thread:Thread[pool-1-thread-4,5,main] write data:100000
end handle time:1554299330202
total handle time:648447ms
total write num:100000

Analyse

Die Ausführungszeit in einer Single-Threaded-Situation sollte total*sleepTime sein, also 10000000ms, und die Ausführungszeit nach der Umwandlung in Multithreading beträgt 648447ms.

[Verwandte Empfehlungen: Java-Video-Tutorial]

Das obige ist der detaillierte Inhalt vonEinführung in die Java-Multithread-Batch-Datenimportmethode. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:segmentfault.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen