ホームページ  >  記事  >  Java  >  Java マルチスレッドを使用してサードパーティ データの同期を実現する方法

Java マルチスレッドを使用してサードパーティ データの同期を実現する方法

WBOY
WBOY転載
2023-04-29 12:25:061551ブラウズ

1. シナリオ

最近の開発タスクはサードパーティ データを同期することであり、サードパーティ データには一般にストック データと増分データが含まれており、ストック データは 100 万件です。この要件を学習する際には、ある程度の情報検索とツールの学習を実施し、対象データベースに事前にストックデータを取得し、ケトルを使用してストックデータを変換し、要求に応じてスケジュールされたタスクを通じて増分データを取得しました取引先が指定した時刻にデータをインクリメントし、データ変換を実行します。データを取得および変換するときは、追跡可能性とデータの照合を容易にするために、各リクエスト情報を記録する必要があります!!!

2. データの取得方法

2.1 再帰的方法

再帰的メソッドを使用する場合、少量のデータが必要です。そうしないと、スタック オーバーフローまたはヒープ オーバーフローが発生します!!! また、再帰的メソッドはシングルスレッドであるため、同期速度が非常に遅くなります!!!

/**
     * 数据同步 - 递归方式
     * 此处存量数据只需要请求到数据并保存数据库即可,后期通过kettle进行转换。
     * Data为自定义实体类,这里仅做示例!!!
*/
    private void fetchAndSaveDB(int pageIndex, int pageSize) throws Exception {
        log.info("【数据同步 - 存量】,第{}次同步,", pageIndex);
        List<Data> datas= getDataByPage(pageIndex,pageSize);
        if (CollectionUtils.isNotEmpty(datas)) {
            dataService.saveOrUpdateBatch(datas);
            log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex);
            if (datas.size() < pageSize) {
                log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex);
                return;
            }
            // 递归操作-直到数据同步完毕
            fetchAndSaveDB(pageIndex + 1, pageSize);
        } else {
            log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex);
            return;
        }
    }
    /** 
     * 获取分页数据,Data为自定义实体类,这里仅做示例!!!
     */
    private List<Data> getDataByPage(int pageIndex, int pageSize) throws Exception {
        //通过feign调用第三方接口获取数据
        String data = dataFeignService.fetchAllData(pageSize, pageIndex);
        JSONObject jsonObject = JSONObject.parseObject(data);
        JSONArray datalist = jsonObject.getJSONArray("datalist");
        List<Data> datas = datalist.toJavaList(Data.class);
        return datas;
    }

2.2 マルチスレッド方式

再帰的方式はシングルスレッドであるため、データが膨大でメモリオーバーフローを起こしやすいことを考慮し、再帰的方式をマルチスレッド方式に置き換えます。スレッド方式では、メモリ オーバーフローの状況が解消されるだけでなく、速度が大幅に向上します!!!

public void synAllData() {
         // 定义原子变量 - 页数
        AtomicInteger pageIndex = new AtomicInteger(0);
         // 创建线程池
         ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

        // 100万数据
        int total = 1000000;//数据总量
        int times = total / 1000;
        if (total % 1000!= 0) {
            times = times + 1;
        }
        LocalDateTime beginLocalDateTime = LocalDateTime.now();
        log.info("【数据同步 - 存量】开始同步时间:{}", beginLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        for (int index = 1; index <= times; index++) {
            fixedThreadPool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        multiFetchAndSaveDB(pageIndex.incrementAndGet(), 1000);
                    } catch (Exception e) {
                        log.error("并发获取并保存数据异常:{}", e);
                    }
                }
            });
        }
        LocalDateTime endLocalDateTime = LocalDateTime.now();
        log.info("【数据同步 - 存量】同步结束时间:{},总共耗时:{}分钟",
                endLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),
                Duration.between(beginLocalDateTime, endLocalDateTime).toMinutes());
    }
    /**
     * 数据同步 - 【多线程方式】
     *
     * @throws Exception
     */
    private void multiFetchAndSaveDB(int pageIndex, int pageSize) throws Exception {
        log.info("【数据同步 - 存量】,第{}次同步,", pageIndex);
        List<Data> datas= getDataByPage(pageIndex, pageSize);//getDataByPage()同上2.1
        if (CollectionUtils.isNotEmpty(datas)) {
            log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex);
            if (datas.size() < pageSize) {
                log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex);
                return;
            }
        } else {
            log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex);
            return;
        }

    }

3. 増分データの接続方法

増分データには、スケジュールされたタスクを記述する必要があります。スケジュールされたアノテーションを使用でき、必要があります。増分データはターゲット データベースに保存され、データ変換が実行されます!

以上がJava マルチスレッドを使用してサードパーティ データの同期を実現する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。