Rumah  >  Artikel  >  Java  >  Cara menggunakan Java multi-threading untuk mencapai penyegerakan data pihak ketiga

Cara menggunakan Java multi-threading untuk mencapai penyegerakan data pihak ketiga

WBOY
WBOYke hadapan
2023-04-29 12:25:061488semak imbas

1. Senario

Tugas pembangunan terkini adalah untuk menyegerakkan data pihak ketiga, dan data pihak ketiga secara amnya merangkumi data stok dan data tambahan, dan data stok ialah 1 juta+. Apabila mempelajari tentang keperluan ini, kami menjalankan sejumlah perolehan maklumat dan pembelajaran alat, memperoleh data stok terlebih dahulu ke pangkalan data sasaran, dan kemudian menggunakan cerek untuk menukar data stok diperoleh melalui tugas berjadual mengikut meminta masa yang ditentukan oleh pihak perniagaan Menaikkan data dan melakukan transformasi data. Apabila memperoleh dan menukar data, kami harus merekodkan setiap maklumat permintaan untuk memudahkan kebolehkesanan dan penyelarasan data!!!

2. Kaedah mendapatkan data

2.1 Kaedah rekursif

Apabila menggunakan kaedah rekursif, jumlah data yang diperlukan adalah kecil, jika tidak limpahan timbunan atau limpahan timbunan akan berlaku!!! Dan kaedah rekursif adalah satu benang, jadi kelajuan penyegerakan akan menjadi sangat perlahan!! !

/**
     * 数据同步 - 递归方式
     * 此处存量数据只需要请求到数据并保存数据库即可,后期通过kettle进行转换。
     * Data为自定义实体类,这里仅做示例!!!
*/
    private void fetchAndSaveDB(int pageIndex, int pageSize) throws Exception {
        log.info("【数据同步 - 存量】,第{}次同步,", pageIndex);
        List<Data> datas= getDataByPage(pageIndex,pageSize);
        if (CollectionUtils.isNotEmpty(datas)) {
            dataService.saveOrUpdateBatch(datas);
            log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex);
            if (datas.size() < pageSize) {
                log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex);
                return;
            }
            // 递归操作-直到数据同步完毕
            fetchAndSaveDB(pageIndex + 1, pageSize);
        } else {
            log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex);
            return;
        }
    }
    /** 
     * 获取分页数据,Data为自定义实体类,这里仅做示例!!!
     */
    private List<Data> getDataByPage(int pageIndex, int pageSize) throws Exception {
        //通过feign调用第三方接口获取数据
        String data = dataFeignService.fetchAllData(pageSize, pageIndex);
        JSONObject jsonObject = JSONObject.parseObject(data);
        JSONArray datalist = jsonObject.getJSONArray("datalist");
        List<Data> datas = datalist.toJavaList(Data.class);
        return datas;
    }

2.2 Kaedah berbilang benang

Memandangkan kaedah rekursif adalah satu benang, memandangkan data yang besar dan mudah menyebabkan limpahan memori, menggantikan kaedah rekursif dengan kaedah berbilang benang bukan sahaja mengelakkan Situasi limpahan memori dihapuskan, dan kelajuannya dipertingkatkan dengan sangat baik!!!

public void synAllData() {
         // 定义原子变量 - 页数
        AtomicInteger pageIndex = new AtomicInteger(0);
         // 创建线程池
         ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

        // 100万数据
        int total = 1000000;//数据总量
        int times = total / 1000;
        if (total % 1000!= 0) {
            times = times + 1;
        }
        LocalDateTime beginLocalDateTime = LocalDateTime.now();
        log.info("【数据同步 - 存量】开始同步时间:{}", beginLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        for (int index = 1; index <= times; index++) {
            fixedThreadPool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        multiFetchAndSaveDB(pageIndex.incrementAndGet(), 1000);
                    } catch (Exception e) {
                        log.error("并发获取并保存数据异常:{}", e);
                    }
                }
            });
        }
        LocalDateTime endLocalDateTime = LocalDateTime.now();
        log.info("【数据同步 - 存量】同步结束时间:{},总共耗时:{}分钟",
                endLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),
                Duration.between(beginLocalDateTime, endLocalDateTime).toMinutes());
    }
    /**
     * 数据同步 - 【多线程方式】
     *
     * @throws Exception
     */
    private void multiFetchAndSaveDB(int pageIndex, int pageSize) throws Exception {
        log.info("【数据同步 - 存量】,第{}次同步,", pageIndex);
        List<Data> datas= getDataByPage(pageIndex, pageSize);//getDataByPage()同上2.1
        if (CollectionUtils.isNotEmpty(datas)) {
            log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex);
            if (datas.size() < pageSize) {
                log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex);
                return;
            }
        } else {
            log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex);
            return;
        }

    }

3 Cara menyambungkan data tambahan

Data tambahan perlu menulis tugas berjadual. , anda boleh menggunakan anotasi Berjadual dan perlu Data tambahan disimpan dalam pangkalan data sasaran dan penukaran data dilakukan!

Atas ialah kandungan terperinci Cara menggunakan Java multi-threading untuk mencapai penyegerakan data pihak ketiga. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam