>  기사  >  Java  >  Java 멀티스레딩을 사용하여 타사 데이터 동기화를 달성하는 방법

Java 멀티스레딩을 사용하여 타사 데이터 동기화를 달성하는 방법

WBOY
WBOY앞으로
2023-04-29 12:25:061487검색

1. 시나리오

최근 개발 과제는 타사 데이터를 동기화하는 것인데, 타사 데이터에는 일반적으로 주식 데이터와 증분 데이터가 포함되며 주식 데이터는 100만 개 이상입니다. 이 요구 사항에 대해 학습할 때 우리는 일정량의 정보 검색 및 도구 학습을 수행하고 사전에 대상 데이터베이스에 대한 주식 데이터를 얻은 다음 Kettle을 사용하여 예정된 작업을 통해 얻은 주식 데이터를 변환했습니다. 비즈니스 당사자가 지정한 시간을 요청하여 데이터를 증가시키고 데이터 변환을 수행합니다. 데이터 획득 및 변환 시 추적성과 데이터 조정을 용이하게 하기 위해 각 요청 정보를 기록해야 합니다!!!

2. 데이터 획득 방법

2.1 재귀적 방법

재귀적 방법을 사용할 경우 데이터가 필요합니다. 작으면 스택 오버플로나 힙 오버플로가 발생합니다!!! 그리고 재귀 방식은 싱글 스레드이므로 동기화 속도가 매우 느립니다!!!

/**
     * 数据同步 - 递归方式
     * 此处存量数据只需要请求到数据并保存数据库即可,后期通过kettle进行转换。
     * Data为自定义实体类,这里仅做示例!!!
*/
    private void fetchAndSaveDB(int pageIndex, int pageSize) throws Exception {
        log.info("【数据同步 - 存量】,第{}次同步,", pageIndex);
        List<Data> datas= getDataByPage(pageIndex,pageSize);
        if (CollectionUtils.isNotEmpty(datas)) {
            dataService.saveOrUpdateBatch(datas);
            log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex);
            if (datas.size() < pageSize) {
                log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex);
                return;
            }
            // 递归操作-直到数据同步完毕
            fetchAndSaveDB(pageIndex + 1, pageSize);
        } else {
            log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex);
            return;
        }
    }
    /** 
     * 获取分页数据,Data为自定义实体类,这里仅做示例!!!
     */
    private List<Data> getDataByPage(int pageIndex, int pageSize) throws Exception {
        //通过feign调用第三方接口获取数据
        String data = dataFeignService.fetchAllData(pageSize, pageIndex);
        JSONObject jsonObject = JSONObject.parseObject(data);
        JSONArray datalist = jsonObject.getJSONArray("datalist");
        List<Data> datas = datalist.toJavaList(Data.class);
        return datas;
    }

2.2 멀티 스레드 방식

단일 스레드, 고려 데이터가 방대하고 쉽게 메모리 오버플로를 일으킬 수 있기 때문에 재귀적 방법을 멀티 스레드로 대체합니다. 이는 메모리 오버플로를 피할 뿐만 아니라 속도도 크게 향상시킵니다

public void synAllData() {
         // 定义原子变量 - 页数
        AtomicInteger pageIndex = new AtomicInteger(0);
         // 创建线程池
         ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

        // 100万数据
        int total = 1000000;//数据总量
        int times = total / 1000;
        if (total % 1000!= 0) {
            times = times + 1;
        }
        LocalDateTime beginLocalDateTime = LocalDateTime.now();
        log.info("【数据同步 - 存量】开始同步时间:{}", beginLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        for (int index = 1; index <= times; index++) {
            fixedThreadPool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        multiFetchAndSaveDB(pageIndex.incrementAndGet(), 1000);
                    } catch (Exception e) {
                        log.error("并发获取并保存数据异常:{}", e);
                    }
                }
            });
        }
        LocalDateTime endLocalDateTime = LocalDateTime.now();
        log.info("【数据同步 - 存量】同步结束时间:{},总共耗时:{}分钟",
                endLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),
                Duration.between(beginLocalDateTime, endLocalDateTime).toMinutes());
    }
    /**
     * 数据同步 - 【多线程方式】
     *
     * @throws Exception
     */
    private void multiFetchAndSaveDB(int pageIndex, int pageSize) throws Exception {
        log.info("【数据同步 - 存量】,第{}次同步,", pageIndex);
        List<Data> datas= getDataByPage(pageIndex, pageSize);//getDataByPage()同上2.1
        if (CollectionUtils.isNotEmpty(datas)) {
            log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex);
            if (datas.size() < pageSize) {
                log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex);
                return;
            }
        } else {
            log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex);
            return;
        }

    }

3. 방법 증분 데이터 연결

증분 데이터는 예약된 작업으로 작성되어야 하며 예약된 주석을 사용할 수 있습니다. 증분 데이터는 대상 데이터베이스에 저장되고 데이터가 변환되어야 합니다!

위 내용은 Java 멀티스레딩을 사용하여 타사 데이터 동기화를 달성하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제