Maison  >  Article  >  Java  >  Comment utiliser le multithreading Java pour réaliser la synchronisation des données tierces

Comment utiliser le multithreading Java pour réaliser la synchronisation des données tierces

WBOY
WBOYavant
2023-04-29 12:25:061553parcourir

1. Scénario

Une tâche de développement récente consiste à synchroniser les données tierces, et les données tierces incluent généralement des données boursières et des données incrémentielles, et les données boursières sont de plus d'un million. Après avoir pris connaissance de cette exigence, nous avons effectué une certaine quantité de recherche d'informations et d'apprentissage d'outils, obtenu les données boursières à l'avance dans la base de données cible, puis utilisé Kettle pour convertir les données boursières obtenues via des tâches planifiées ; demander l’heure spécifiée par l’entreprise. Incrémenter les données et effectuer des transformations de données. Lors de l'obtention et de la conversion des données, nous devons enregistrer les informations de chaque demande pour faciliter la traçabilité et le rapprochement des données !!!

2. Méthodes d'obtention des données

2.1 Méthode récursive

Lors de l'utilisation de la méthode récursive, les données sont requises Le montant est petit, sinon un débordement de pile ou un débordement de tas se produira !!! Et la méthode récursive est monothread, donc la vitesse de synchronisation sera très lente !!!

/**
     * 数据同步 - 递归方式
     * 此处存量数据只需要请求到数据并保存数据库即可,后期通过kettle进行转换。
     * Data为自定义实体类,这里仅做示例!!!
*/
    private void fetchAndSaveDB(int pageIndex, int pageSize) throws Exception {
        log.info("【数据同步 - 存量】,第{}次同步,", pageIndex);
        List<Data> datas= getDataByPage(pageIndex,pageSize);
        if (CollectionUtils.isNotEmpty(datas)) {
            dataService.saveOrUpdateBatch(datas);
            log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex);
            if (datas.size() < pageSize) {
                log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex);
                return;
            }
            // 递归操作-直到数据同步完毕
            fetchAndSaveDB(pageIndex + 1, pageSize);
        } else {
            log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex);
            return;
        }
    }
    /** 
     * 获取分页数据,Data为自定义实体类,这里仅做示例!!!
     */
    private List<Data> getDataByPage(int pageIndex, int pageSize) throws Exception {
        //通过feign调用第三方接口获取数据
        String data = dataFeignService.fetchAllData(pageSize, pageIndex);
        JSONObject jsonObject = JSONObject.parseObject(data);
        JSONArray datalist = jsonObject.getJSONArray("datalist");
        List<Data> datas = datalist.toJavaList(Data.class);
        return datas;
    }

2.2 Méthode multithread

Puisque la méthode récursive est monothread, pensez à Comme les données sont énormes et peuvent facilement provoquer un débordement de mémoire, la méthode récursive est remplacée par le multi-threading, ce qui non seulement évite le débordement de mémoire, mais améliore également considérablement la vitesse

public void synAllData() {
         // 定义原子变量 - 页数
        AtomicInteger pageIndex = new AtomicInteger(0);
         // 创建线程池
         ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

        // 100万数据
        int total = 1000000;//数据总量
        int times = total / 1000;
        if (total % 1000!= 0) {
            times = times + 1;
        }
        LocalDateTime beginLocalDateTime = LocalDateTime.now();
        log.info("【数据同步 - 存量】开始同步时间:{}", beginLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        for (int index = 1; index <= times; index++) {
            fixedThreadPool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        multiFetchAndSaveDB(pageIndex.incrementAndGet(), 1000);
                    } catch (Exception e) {
                        log.error("并发获取并保存数据异常:{}", e);
                    }
                }
            });
        }
        LocalDateTime endLocalDateTime = LocalDateTime.now();
        log.info("【数据同步 - 存量】同步结束时间:{},总共耗时:{}分钟",
                endLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),
                Duration.between(beginLocalDateTime, endLocalDateTime).toMinutes());
    }
    /**
     * 数据同步 - 【多线程方式】
     *
     * @throws Exception
     */
    private void multiFetchAndSaveDB(int pageIndex, int pageSize) throws Exception {
        log.info("【数据同步 - 存量】,第{}次同步,", pageIndex);
        List<Data> datas= getDataByPage(pageIndex, pageSize);//getDataByPage()同上2.1
        if (CollectionUtils.isNotEmpty(datas)) {
            log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex);
            if (datas.size() < pageSize) {
                log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex);
                return;
            }
        } else {
            log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex);
            return;
        }

    }

3. Comment faire !!! connectez les données incrémentielles

Incrément Les données doivent être écrites en tant que tâche planifiée, et l'annotation planifiée peut être utilisée, et les données incrémentielles doivent être stockées dans la bibliothèque cible et les données converties

!

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer