最近の開発タスクはサードパーティ データを同期することであり、サードパーティ データには一般にストック データと増分データが含まれており、ストック データは 100 万件です。この要件を学習する際には、ある程度の情報検索とツールの学習を実施し、対象データベースに事前にストックデータを取得し、ケトルを使用してストックデータを変換し、要求に応じてスケジュールされたタスクを通じて増分データを取得しました取引先が指定した時刻にデータをインクリメントし、データ変換を実行します。データを取得および変換するときは、追跡可能性とデータの照合を容易にするために、各リクエスト情報を記録する必要があります!!!
2.1 再帰的方法
再帰的メソッドを使用する場合、少量のデータが必要です。そうしないと、スタック オーバーフローまたはヒープ オーバーフローが発生します!!! また、再帰的メソッドはシングルスレッドであるため、同期速度が非常に遅くなります!!!
/** * 数据同步 - 递归方式 * 此处存量数据只需要请求到数据并保存数据库即可,后期通过kettle进行转换。 * Data为自定义实体类,这里仅做示例!!! */ private void fetchAndSaveDB(int pageIndex, int pageSize) throws Exception { log.info("【数据同步 - 存量】,第{}次同步,", pageIndex); List<Data> datas= getDataByPage(pageIndex,pageSize); if (CollectionUtils.isNotEmpty(datas)) { dataService.saveOrUpdateBatch(datas); log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex); if (datas.size() < pageSize) { log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex); return; } // 递归操作-直到数据同步完毕 fetchAndSaveDB(pageIndex + 1, pageSize); } else { log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex); return; } } /** * 获取分页数据,Data为自定义实体类,这里仅做示例!!! */ private List<Data> getDataByPage(int pageIndex, int pageSize) throws Exception { //通过feign调用第三方接口获取数据 String data = dataFeignService.fetchAllData(pageSize, pageIndex); JSONObject jsonObject = JSONObject.parseObject(data); JSONArray datalist = jsonObject.getJSONArray("datalist"); List<Data> datas = datalist.toJavaList(Data.class); return datas; }
2.2 マルチスレッド方式
再帰的方式はシングルスレッドであるため、データが膨大でメモリオーバーフローを起こしやすいことを考慮し、再帰的方式をマルチスレッド方式に置き換えます。スレッド方式では、メモリ オーバーフローの状況が解消されるだけでなく、速度が大幅に向上します!!!
public void synAllData() { // 定义原子变量 - 页数 AtomicInteger pageIndex = new AtomicInteger(0); // 创建线程池 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10); // 100万数据 int total = 1000000;//数据总量 int times = total / 1000; if (total % 1000!= 0) { times = times + 1; } LocalDateTime beginLocalDateTime = LocalDateTime.now(); log.info("【数据同步 - 存量】开始同步时间:{}", beginLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); for (int index = 1; index <= times; index++) { fixedThreadPool.submit(new Runnable() { @Override public void run() { try { multiFetchAndSaveDB(pageIndex.incrementAndGet(), 1000); } catch (Exception e) { log.error("并发获取并保存数据异常:{}", e); } } }); } LocalDateTime endLocalDateTime = LocalDateTime.now(); log.info("【数据同步 - 存量】同步结束时间:{},总共耗时:{}分钟", endLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), Duration.between(beginLocalDateTime, endLocalDateTime).toMinutes()); } /** * 数据同步 - 【多线程方式】 * * @throws Exception */ private void multiFetchAndSaveDB(int pageIndex, int pageSize) throws Exception { log.info("【数据同步 - 存量】,第{}次同步,", pageIndex); List<Data> datas= getDataByPage(pageIndex, pageSize);//getDataByPage()同上2.1 if (CollectionUtils.isNotEmpty(datas)) { log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex); if (datas.size() < pageSize) { log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex); return; } } else { log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex); return; } }
増分データには、スケジュールされたタスクを記述する必要があります。スケジュールされたアノテーションを使用でき、必要があります。増分データはターゲット データベースに保存され、データ変換が実行されます!
以上がJava マルチスレッドを使用してサードパーティ データの同期を実現する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。