Dieser Artikel bietet Ihnen eine Einführung in die Methode des Java-Multithread-Batch-Datenimports. Ich hoffe, dass er für Freunde hilfreich ist.
Vorwort: Wenn Sie auf eine große Menge an Datenimporten stoßen, können Sie zur Erhöhung der Verarbeitungsgeschwindigkeit Multithreading verwenden, um diese Prozesse stapelweise zu verarbeiten. Zu den häufigsten Szenarien gehören:
CSV
importierbare Datei oder erfordert eine bestimmte Verarbeitung im Speicher) Die oben genannten Szenarien haben eines gemeinsam: Das Szenario dieser Art des Datenimports besteht einfach darin, die Daten zu konvertieren Daten aus einer Datenquelle werden in eine andere Datenquelle verschoben , was in zwei Schritte unterteilt werden muss
und die Geschwindigkeit des Datenlesens ist im Allgemeinen schneller als die Geschwindigkeit beim Schreiben von Daten, d. h. ist schnell beim Lesen und langsam beim Schreiben .
Da die Merkmale der Szene schnelles Lesen und langsames Schreiben sind, wird bei Verwendung von Multithreading empfohlen, Daten zu schreibenTeilweise auf Multithreading umgestellt. Und das Datenlesen kann in ein Stapellesen von Daten umgewandelt werden. Vereinfacht ausgedrückt gibt es zwei Hauptpunkte:
Mehrere Threads Die einfachste Lösung für die Stapelverarbeitung besteht darin, einen Thread-Pool für die Verarbeitung zu verwenden. Im Folgenden wird ein Dienst zum Simulieren von Stapellesen und -schreiben sowie ein Multithread-Schreibaufruf für diesen Dienst als Beispiel verwendet, um zu zeigen, wie mehrere Threads importiert werden -Threaded-Batch-Daten.
import java.util.concurrent.atomic.AtomicLong; /** * 数据批量写入用的模拟服务 * * @author RJH * create at 2019-04-01 */ public class MockService { /** * 可读取总数 */ private long canReadTotal; /** * 写入总数 */ private AtomicLong writeTotal=new AtomicLong(0); /** * 写入休眠时间(单位:毫秒) */ private final long sleepTime; /** * 构造方法 * * @param canReadTotal * @param sleepTime */ public MockService(long canReadTotal, long sleepTime) { this.canReadTotal = canReadTotal; this.sleepTime = sleepTime; } /** * 批量读取数据接口 * * @param num * @return */ public synchronized long readData(int num) { long readNum; if (canReadTotal >= num) { canReadTotal -= num; readNum = num; } else { readNum = canReadTotal; canReadTotal = 0; } //System.out.println("read data size:" + readNum); return readNum; } /** * 写入数据接口 */ public void writeData() { try { // 休眠一定时间模拟写入速度慢 Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } // 写入总数自增 System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet()); } /** * 获取写入的总数 * * @return */ public long getWriteTotal() { return writeTotal.get(); } }
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 基于线程池的多线程批量写入处理器 * @author RJH * create at 2019-04-01 */ public class SimpleBatchHandler { private ExecutorService executorService; private MockService service; /** * 每次批量读取的数据量 */ private int batch; /** * 线程个数 */ private int threadNum; public SimpleBatchHandler(MockService service, int batch,int threadNum) { this.service = service; this.batch = batch; //使用固定数目的线程池 this.executorService = Executors.newFixedThreadPool(threadNum); } /** * 开始处理 */ public void startHandle() { // 开始处理的时间 long startTime = System.currentTimeMillis(); System.out.println("start handle time:" + startTime); long readData; while ((readData = service.readData(batch)) != 0) {// 批量读取数据,知道读取不到数据才停止 for (long i = 0; i < readData; i++) { executorService.execute(() -> service.writeData()); } } // 关闭线程池 executorService.shutdown(); while (!executorService.isTerminated()) {//等待线程池中的线程执行完 } // 结束时间 long endTime = System.currentTimeMillis(); System.out.println("end handle time:" + endTime); // 总耗时 System.out.println("total handle time:" + (endTime - startTime) + "ms"); // 写入总数 System.out.println("total write num:" + service.getWriteTotal()); } }
/** * SimpleBatchHandler的测试类 * @author RJH * create at 2019-04-01 */ public class SimpleBatchHandlerTest { public static void main(String[] args) { // 总数 long total=100000; // 休眠时间 long sleepTime=100; // 每次拉取的数量 int batch=100; // 线程个数 int threadNum=16; MockService mockService=new MockService(total,sleepTime); SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum); handler.startHandle(); } }
start handle time:1554298681755 thread:Thread[pool-1-thread-2,5,main] write data:1 thread:Thread[pool-1-thread-1,5,main] write data:2 ...省略部分输出 thread:Thread[pool-1-thread-4,5,main] write data:100000 end handle time:1554299330202 total handle time:648447ms total write num:100000
Die Ausführungszeit in einer Single-Threaded-Situation sollte total*sleepTime
sein, also 10000000ms
, und die Ausführungszeit nach der Umwandlung in Multithreading beträgt 648447ms
.
[Verwandte Empfehlungen: Java-Video-Tutorial]
Das obige ist der detaillierte Inhalt vonEinführung in die Java-Multithread-Batch-Datenimportmethode. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!