How to use Java multi-threading to achieve third-party data synchronization

WBOY
2023-04-29 12:25:06

1. Scenario

A recent development task was to synchronize third-party data. Third-party data generally consists of stock (existing) data and incremental data, and here the stock data amounts to 1 million records. After learning of this requirement, we did some research and tool study and settled on the following approach: load the stock data into the target database in advance and then convert it with kettle; fetch the incremental data through scheduled tasks, using the request time specified by the business party, and then convert it as well. When fetching and converting data, record the details of every request to make tracing and data reconciliation easier.

2. Method of obtaining data

2.1 Recursive method

The recursive method is only suitable for small amounts of data; otherwise a stack overflow or heap overflow can occur, because every page adds another stack frame and the recursion only unwinds after the last page. The recursive method is also single-threaded, so synchronization is slow.

    /**
     * Data synchronization - recursive approach.
     * The stock data only needs to be fetched and saved to the database here;
     * it is converted later with kettle.
     * Data is a custom entity class, used here only as an example.
     */
    private void fetchAndSaveDB(int pageIndex, int pageSize) throws Exception {
        log.info("[Data sync - stock] sync #{} started", pageIndex);
        List<Data> datas = getDataByPage(pageIndex, pageSize);
        if (CollectionUtils.isNotEmpty(datas)) {
            dataService.saveOrUpdateBatch(datas);
            log.info("[Data sync - stock] sync #{} succeeded", pageIndex);
            if (datas.size() < pageSize) {
                log.info("[Data sync - stock] sync #{} returned fewer records than the page size, so all data has been synchronized", pageIndex);
                return;
            }
            // Recurse until all data has been synchronized
            fetchAndSaveDB(pageIndex + 1, pageSize);
        } else {
            log.info("[Data sync - stock] sync #{} returned no data, so all data has been synchronized", pageIndex);
        }
    }

    /**
     * Fetches one page of data. Data is a custom entity class, used here only as an example.
     */
    private List<Data> getDataByPage(int pageIndex, int pageSize) throws Exception {
        // Call the third-party interface through feign to fetch the data
        String data = dataFeignService.fetchAllData(pageSize, pageIndex);
        JSONObject jsonObject = JSONObject.parseObject(data);
        JSONArray datalist = jsonObject.getJSONArray("datalist");
        return datalist.toJavaList(Data.class);
    }
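
The stack-depth concern of the recursive method can also be avoided without threads by walking the pages in a plain loop. The following is a minimal sketch of that idea, not the article's implementation: `IterativePageSync`, `PageFetcher` and `inMemory` are hypothetical names, `PageFetcher` stands in for `getDataByPage`, the `saver` callback stands in for `dataService.saveOrUpdateBatch`, and records are simplified to strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Iterative page walk: same stop conditions as the recursive version, constant stack depth. */
final class IterativePageSync {

    /** Hypothetical stand-in for getDataByPage(pageIndex, pageSize). */
    interface PageFetcher {
        List<String> fetch(int pageIndex, int pageSize);
    }

    /** Fetches and saves page after page; returns the number of records synchronized. */
    static int syncAll(int pageSize, PageFetcher fetcher, Consumer<List<String>> saver) {
        int synced = 0;
        for (int pageIndex = 1; ; pageIndex++) {
            List<String> page = fetcher.fetch(pageIndex, pageSize);
            if (page == null || page.isEmpty()) {
                break; // empty page: everything has been synchronized
            }
            saver.accept(page);
            synced += page.size();
            if (page.size() < pageSize) {
                break; // short page: this was the last one
            }
        }
        return synced;
    }

    /** In-memory fetcher used only to exercise the loop (assumption for the demo). */
    static PageFetcher inMemory(List<String> source) {
        return (pageIndex, pageSize) -> {
            int from = Math.min((pageIndex - 1) * pageSize, source.size());
            int to = Math.min(from + pageSize, source.size());
            return new ArrayList<>(source.subList(from, to));
        };
    }

    public static void main(String[] args) {
        List<String> source = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            source.add("record-" + i);
        }
        List<String> target = new ArrayList<>();
        int synced = syncAll(1000, inMemory(source), target::addAll);
        System.out.println("synced " + synced + " records");
    }
}
```

Each page goes out of scope before the next one is fetched, so memory use stays at roughly one page. The loop is still single-threaded, so it does not address the speed problem.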

2.2 Multi-threaded method

Because the recursive method is single-threaded and, with this much data, prone to memory overflow, we replaced it with a multi-threaded method. Each page becomes an independent task submitted to a thread pool, which avoids the deep call stack and speeds up synchronization considerably.

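    public void synAllData() {
        // Atomic counter for the page number
        AtomicInteger pageIndex = new AtomicInteger(0);
        // Create the thread pool
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

        // 1 million records, 1000 per page
        int total = 1000000; // total number of records
        int pageSize = 1000;
        int times = total / pageSize;
        if (total % pageSize != 0) {
            times = times + 1;
        }
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        LocalDateTime beginLocalDateTime = LocalDateTime.now();
        log.info("[Data sync - stock] sync start time: {}", beginLocalDateTime.format(formatter));
        for (int index = 1; index <= times; index++) {
            fixedThreadPool.submit(() -> {
                try {
                    multiFetchAndSaveDB(pageIndex.incrementAndGet(), pageSize);
                } catch (Exception e) {
                    // Pass the exception as the last argument so the stack trace is logged
                    log.error("Failed to fetch and save data concurrently", e);
                }
            });
        }
        // Stop accepting tasks and wait for the submitted ones, so that the end time
        // measures the synchronization itself rather than just task submission
        fixedThreadPool.shutdown();
        try {
            // The one-hour limit is an assumed upper bound; adjust it to the real workload
            if (!fixedThreadPool.awaitTermination(1, TimeUnit.HOURS)) {
                log.warn("[Data sync - stock] sync did not finish within the time limit");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        LocalDateTime endLocalDateTime = LocalDateTime.now();
        log.info("[Data sync - stock] sync end time: {}, total time: {} minutes",
                endLocalDateTime.format(formatter),
                Duration.between(beginLocalDateTime, endLocalDateTime).toMinutes());
    }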
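    /**
     * Data synchronization - multi-threaded approach.
     *
     * @throws Exception if fetching or saving the page fails
     */
    private void multiFetchAndSaveDB(int pageIndex, int pageSize) throws Exception {
        log.info("[Data sync - stock] sync #{} started", pageIndex);
        List<Data> datas = getDataByPage(pageIndex, pageSize); // getDataByPage() is the same as in 2.1
        if (CollectionUtils.isNotEmpty(datas)) {
            // Save the page to the database, as in 2.1
            dataService.saveOrUpdateBatch(datas);
            log.info("[Data sync - stock] sync #{} succeeded", pageIndex);
            if (datas.size() < pageSize) {
                log.info("[Data sync - stock] sync #{} returned fewer records than the page size, so this is the last page", pageIndex);
            }
        } else {
            log.info("[Data sync - stock] sync #{} returned no data, so there is nothing left on this page", pageIndex);
        }
    }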
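
When the caller needs to know that every page has finished, or how many records were synchronized, one option is to build the page tasks up front and let `ExecutorService.invokeAll` block until all of them complete. The sketch below illustrates this under stated assumptions: `ConcurrentPageSync` and `PageFetcher` are hypothetical names, `PageFetcher` stands in for `getDataByPage`, the `saver` callback stands in for `dataService.saveOrUpdateBatch` and must be thread-safe, and records are simplified to strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/** Fetches all pages on a fixed thread pool and waits for every page to finish. */
final class ConcurrentPageSync {

    /** Hypothetical stand-in for getDataByPage(pageIndex, pageSize). */
    interface PageFetcher {
        List<String> fetch(int pageIndex, int pageSize);
    }

    /** Number of pages needed for total records, rounding up. */
    static int pageCount(int total, int pageSize) {
        return (total + pageSize - 1) / pageSize;
    }

    /** Returns the number of records synchronized; saver must be thread-safe. */
    static int syncAll(int total, int pageSize, int threads,
                       PageFetcher fetcher, Consumer<List<String>> saver) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            int pages = pageCount(total, pageSize);
            for (int page = 1; page <= pages; page++) {
                final int pageIndex = page; // each task owns its page number
                tasks.add(() -> {
                    List<String> rows = fetcher.fetch(pageIndex, pageSize);
                    saver.accept(rows);
                    return rows.size();
                });
            }
            int synced = 0;
            // invokeAll returns only after every task has completed
            for (Future<Integer> result : pool.invokeAll(tasks)) {
                synced += result.get(); // rethrows a failed page as ExecutionException
            }
            return synced;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("sync interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a page failed to sync", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Assigning the page number when the task is created, instead of drawing it from a shared counter at run time, makes it possible to tell from a failure which page has to be retried. In this sketch a single failed page aborts the summary; a real job would more likely record the failed page in the request log and continue.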

3. How to handle incremental data

Incremental data is fetched by scheduled tasks, which can be written with Spring's @Scheduled annotation. The incremental data also needs to be stored in the target database and then converted.
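
One common way to organize such a scheduled task is to keep a watermark (the end of the last synchronized time window) and to request only the records changed since then. The sketch below is an illustration under assumptions, not the article's code: `IncrementalSync`, `ChangeFetcher` and `runOnce` are hypothetical names, the third-party interface is assumed to accept a time range, records are simplified to strings, and a plain `ScheduledExecutorService` stands in for the @Scheduled annotation so that the example runs without Spring.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Incremental sync driven by a watermark that advances after each successful run. */
final class IncrementalSync {

    /** Hypothetical third-party call: records changed in the window [from, to). */
    interface ChangeFetcher {
        List<String> fetchChanged(Instant from, Instant to);
    }

    private final ChangeFetcher fetcher;
    private final List<String> target = new ArrayList<>(); // stands in for the target database
    private Instant watermark;

    IncrementalSync(ChangeFetcher fetcher, Instant start) {
        this.fetcher = fetcher;
        this.watermark = start;
    }

    /** One scheduled run: fetch the window since the last run, save it, advance the watermark. */
    synchronized int runOnce(Instant now) {
        List<String> changed = fetcher.fetchChanged(watermark, now);
        target.addAll(changed);
        // Advance only after the save; if the fetch or save throws, the same window is retried
        watermark = now;
        return changed.size();
    }

    synchronized Instant watermark() {
        return watermark;
    }

    synchronized List<String> target() {
        return new ArrayList<>(target);
    }

    public static void main(String[] args) throws InterruptedException {
        // Fake fetcher that reports one changed record per window
        IncrementalSync sync = new IncrementalSync(
                (from, to) -> Collections.singletonList("changed in [" + from + ", " + to + ")"),
                Instant.now());
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> sync.runOnce(Instant.now()), 0, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(350);
        scheduler.shutdown();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("records synchronized: " + sync.target().size());
    }
}
```

In a Spring application, the body of `runOnce` would sit in a method annotated with @Scheduled, and the watermark would be persisted (for example in the request log table) so that it survives restarts.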

Statement:
This article is reproduced from yisu.com. If there is any infringement, please contact admin@php.cn to have it removed.