Eine aktuelle Entwicklungsaufgabe besteht darin, Daten von Drittanbietern zu synchronisieren. Zu den Daten von Drittanbietern gehören im Allgemeinen Bestandsdaten und inkrementelle Daten, und die Bestandsdaten betragen mehr als 1 Million. Als wir von dieser Anforderung erfuhren, führten wir eine gewisse Menge an Informationsabrufen und Werkzeuglernen durch, bezogen die Bestandsdaten vorab in die Zieldatenbank und konvertierten die Bestandsdaten dann mithilfe von Kettle entsprechend der Anforderung durch geplante Aufgaben Zeit, die von der Geschäftspartei angegeben wird. Inkrementieren Sie Daten und führen Sie Datentransformationen durch. Beim Abrufen und Konvertieren von Daten sollten wir jede Anforderungsinformation aufzeichnen, um die Rückverfolgbarkeit und den Datenabgleich zu erleichtern!!! klein, sonst kommt es zu einem Stapelüberlauf oder Heap-Überlauf!!! Und die rekursive Methode ist Single-Threaded, daher ist die Synchronisationsgeschwindigkeit sehr langsam!!!
/** * 数据同步 - 递归方式 * 此处存量数据只需要请求到数据并保存数据库即可,后期通过kettle进行转换。 * Data为自定义实体类,这里仅做示例!!! */ private void fetchAndSaveDB(int pageIndex, int pageSize) throws Exception { log.info("【数据同步 - 存量】,第{}次同步,", pageIndex); List<Data> datas= getDataByPage(pageIndex,pageSize); if (CollectionUtils.isNotEmpty(datas)) { dataService.saveOrUpdateBatch(datas); log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex); if (datas.size() < pageSize) { log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex); return; } // 递归操作-直到数据同步完毕 fetchAndSaveDB(pageIndex + 1, pageSize); } else { log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex); return; } } /** * 获取分页数据,Data为自定义实体类,这里仅做示例!!! */ private List<Data> getDataByPage(int pageIndex, int pageSize) throws Exception { //通过feign调用第三方接口获取数据 String data = dataFeignService.fetchAllData(pageSize, pageIndex); JSONObject jsonObject = JSONObject.parseObject(data); JSONArray datalist = jsonObject.getJSONArray("datalist"); List<Data> datas = datalist.toJavaList(Data.class); return datas; }2.2 Multithread-Methode
Da die rekursive Methode ist Bedenken Sie, dass die Daten sehr groß sind und leicht zu einem Speicherüberlauf führen können. Die rekursive Methode wird durch Multithreading ersetzt, was nicht nur einen Speicherüberlauf vermeidet, sondern auch die Geschwindigkeit erheblich verbessert!!!
public void synAllData() { // 定义原子变量 - 页数 AtomicInteger pageIndex = new AtomicInteger(0); // 创建线程池 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10); // 100万数据 int total = 1000000;//数据总量 int times = total / 1000; if (total % 1000!= 0) { times = times + 1; } LocalDateTime beginLocalDateTime = LocalDateTime.now(); log.info("【数据同步 - 存量】开始同步时间:{}", beginLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); for (int index = 1; index <= times; index++) { fixedThreadPool.submit(new Runnable() { @Override public void run() { try { multiFetchAndSaveDB(pageIndex.incrementAndGet(), 1000); } catch (Exception e) { log.error("并发获取并保存数据异常:{}", e); } } }); } LocalDateTime endLocalDateTime = LocalDateTime.now(); log.info("【数据同步 - 存量】同步结束时间:{},总共耗时:{}分钟", endLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), Duration.between(beginLocalDateTime, endLocalDateTime).toMinutes()); } /** * 数据同步 - 【多线程方式】 * * @throws Exception */ private void multiFetchAndSaveDB(int pageIndex, int pageSize) throws Exception { log.info("【数据同步 - 存量】,第{}次同步,", pageIndex); List<Data> datas= getDataByPage(pageIndex, pageSize);//getDataByPage()同上2.1 if (CollectionUtils.isNotEmpty(datas)) { log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex); if (datas.size() < pageSize) { log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex); return; } } else { log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex); return; } }
3. Anleitung Inkrementelle Daten verbinden
Inkrementieren Die Daten müssen als geplante Aufgabe geschrieben werden und die Annotation „Geplant“ kann verwendet werden. Die inkrementellen Daten müssen in der Zielbibliothek gespeichert und konvertiert werden!
Das obige ist der detaillierte Inhalt vonSo verwenden Sie Java-Multithreading, um die Datensynchronisierung von Drittanbietern zu erreichen. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!