首頁  >  文章  >  Java  >  Java如何實作多執行緒大批量同步數據

Java如何實作多執行緒大批量同步數據

WBOY
WBOY轉載
2023-04-29 18:16:072931瀏覽

背景

最近遇到個功能,兩個月有300w 的數據,之後還在累加,因一開始該數據就全部存儲在mysql表,現需要展示在頁面,還需要關聯另一張表的數據,而且產品要求頁面的查詢條件多達20個條件,最終,這個功能卡的要死,基本上查不出來數據。

最後是打算把這兩張表的資料同時儲存到MongoDB中去,以提高查詢效率。

一開始同步的時候,採用單線程,循環以分頁的模式去同步這兩張表數據,結果是…一晚上,只同步了30w數據,特慢! ! !

最後,改造了一番,2小時,就成功同步了300w 資料。

以下是主要邏輯。

線程的個數請依照你自己的伺服器效能酌情設定。

想法

先透過count查出結果集的總條數,設定每個執行緒分頁查詢的條數,透過總條數和單次條數得到執行緒數量,透過改變limit的下標實作分批查詢。

程式碼實作

package com.github.admin.controller.loans;

import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.github.admin.model.entity.CaseCheckCallRecord;
import com.github.admin.model.entity.duyan.DuyanCallRecordDetail;
import com.github.admin.model.entity.loans.CaseCallRemarkRecord;
import com.github.admin.service.duyan.DuyanCallRecordDetailService;
import com.github.admin.service.loans.CaseCallRemarkRecordService;
import com.github.common.constant.MongodbConstant;
import com.github.common.util.DingDingMsgSendUtils;
import com.github.common.util.ListUtils;
import com.github.common.util.Response;
import com.github.common.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/**
 * 多线程同步历史数据
 * @author songfayuan
 * @date 2019-09-26 15:38
 */
@Slf4j
@RestController
@RequestMapping("/demo")
public class SynchronizeHistoricalDataController implements DisposableBean {

    private ExecutorService executor = Executors.newFixedThreadPool(10, "SynchronizeHistoricalDataController");  //newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

    @Value("${spring.profiles.active}")
    private String profile;
    @Autowired
    private DuyanCallRecordDetailService duyanCallRecordDetailService;
    @Autowired
    private MongoTemplate mongoTemplate;
    @Autowired
    private CaseCallRemarkRecordService caseCallRemarkRecordService;

    /**
     * 多线程同步通话记录历史数据
     * @param params
     * @return
     * @throws Exception
     */
    @GetMapping("/syncHistoryData")
    public Response syncHistoryData(Map<String, Object> params) throws Exception {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    logicHandler(params);
                } catch (Exception e) {
                    log.warn("多线程同步稽查通话记录历史数据才处理异常,errMsg = {}", e);
                    DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,多线程同步稽查通话记录历史数据才处理异常,errMsg = "+e);
                }
            }
        });
        return Response.success("请求成功");
    }

    /**
     * 处理数据逻辑
     * @param params
     * @throws Exception
     */
    private void logicHandler(Map<String, Object> params) throws Exception {
        /******返回结果:多线程处理完的最终数据******/
        List<DuyanCallRecordDetail> result = new ArrayList<>();

        /******查询数据库总的数据条数******/
        int count = this.duyanCallRecordDetailService.selectCount(new EntityWrapper<DuyanCallRecordDetail>()
                .eq("is_delete", 0)
                .eq("platform_type", 1));
        DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,本次需要同步" + count + "条历史稽查通话记录数据。");

//        int count = 2620266;
        /******限制每次查询的条数******/
        int num = 1000;

        /******计算需要查询的次数******/
        int times = count / num;
        if (count % num != 0) {
            times = times + 1;
        }

        /******每个线程开始查询的行数******/
        int offset = 0;

        /******添加任务******/
        List<Callable<List<DuyanCallRecordDetail>>> tasks = new ArrayList<>();
        for (int i = 0; i < times; i++) {
            Callable<List<DuyanCallRecordDetail>> qfe = new ThredQuery(duyanCallRecordDetailService, params, offset, num);
            tasks.add(qfe);
            offset = offset + num;
        }

        /******为避免太多任务的最终数据全部存在list导致内存溢出,故将任务再次拆分单独处理******/
        List<List<Callable<List<DuyanCallRecordDetail>>>> smallList = ListUtils.partition(tasks, 10);
        for (List<Callable<List<DuyanCallRecordDetail>>> callableList : smallList) {
            if (CollectionUtils.isNotEmpty(callableList)) {
//                executor.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        log.info("任务拆分执行开始:线程{}拆分处理开始...", Thread.currentThread().getName());
//
//                        log.info("任务拆分执行结束:线程{}拆分处理开始...", Thread.currentThread().getName());
//                    }
//                });

                try {
                    List<Future<List<DuyanCallRecordDetail>>> futures = executor.invokeAll(callableList);
                    /******处理线程返回结果******/
                    if (futures != null && futures.size() > 0) {
                        for (Future<List<DuyanCallRecordDetail>> future : futures) {
                            List<DuyanCallRecordDetail> duyanCallRecordDetailList = future.get();
                            if (CollectionUtils.isNotEmpty(duyanCallRecordDetailList)){
                                executor.execute(new Runnable() {
                                    @Override
                                    public void run() {
                                        /******异步存储******/
                                        log.info("异步存储MongoDB开始:线程{}拆分处理开始...", Thread.currentThread().getName());
                                        saveMongoDB(duyanCallRecordDetailList);
                                        log.info("异步存储MongoDB结束:线程{}拆分处理开始...", Thread.currentThread().getName());
                                    }
                                });
                            }
                            //result.addAll(future.get());
                        }
                    }
                } catch (Exception e) {
                    log.warn("任务拆分执行异常,errMsg = {}", e);
                    DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,任务拆分执行异常,errMsg = "+e);
                }
            }
        }
    }

    /**
     * 数据存储MongoDB
     * @param duyanCallRecordDetailList
     */
    private void saveMongoDB(List<DuyanCallRecordDetail> duyanCallRecordDetailList) {
        for (DuyanCallRecordDetail duyanCallRecordDetail : duyanCallRecordDetailList) {
            /******重复数据不同步MongoDB******/
            org.springframework.data.mongodb.core.query.Query query = new org.springframework.data.mongodb.core.query.Query();
            query.addCriteria(Criteria.where("callUuid").is(duyanCallRecordDetail.getCallUuid()));
            List<CaseCheckCallRecord> caseCheckCallRecordList = mongoTemplate.find(query, CaseCheckCallRecord.class, MongodbConstant.CASE_CHECK_CALL_RECORD);
            if (CollectionUtils.isNotEmpty(caseCheckCallRecordList)) {
                log.warn("call_uuid = {}在MongoDB已经存在数据,后面数据将被舍弃...", duyanCallRecordDetail.getCallUuid());
                continue;
            }

            /******关联填写的记录******/
            CaseCallRemarkRecord caseCallRemarkRecord = this.caseCallRemarkRecordService.selectOne(new EntityWrapper<CaseCallRemarkRecord>()
                    .eq("is_delete", 0)
                    .eq("call_uuid", duyanCallRecordDetail.getCallUuid()));

            CaseCheckCallRecord caseCheckCallRecord = new CaseCheckCallRecord();
            BeanUtils.copyProperties(duyanCallRecordDetail, caseCheckCallRecord);
            //补充
            caseCheckCallRecord.setCollectorUserId(duyanCallRecordDetail.getUserId());
            
            if (caseCallRemarkRecord != null) {
                //补充
                caseCheckCallRecord.setCalleeName(caseCallRemarkRecord.getContactName());            
            }
            log.info("正在存储数据到MongoDB:{}", caseCheckCallRecord.toString());
            this.mongoTemplate.save(caseCheckCallRecord, MongodbConstant.CASE_CHECK_CALL_RECORD);
        }
    }

    @Override
    public void destroy() throws Exception {
        executor.shutdown();
    }
}


class ThredQuery implements Callable<List<DuyanCallRecordDetail>> {
    /******需要通过构造方法把对应的业务service传进来 实际用的时候把类型变为对应的类型******/
    private DuyanCallRecordDetailService myService;
    /******查询条件 根据条件来定义该类的属性******/
    private Map<String, Object> params;

    /******分页index******/
    private int offset;
    /******数量******/
    private int num;

    public ThredQuery(DuyanCallRecordDetailService myService, Map<String, Object> params, int offset, int num) {
        this.myService = myService;
        this.params = params;
        this.offset = offset;
        this.num = num;
    }

    @Override
    public List<DuyanCallRecordDetail> call() throws Exception {
        /******通过service查询得到对应结果******/
        List<DuyanCallRecordDetail> duyanCallRecordDetailList = myService.selectList(new EntityWrapper<DuyanCallRecordDetail>()
                .eq("is_delete", 0)
                .eq("platform_type", 1)
                .last("limit "+offset+", "+num));
        return duyanCallRecordDetailList;
    }
}

ListUtils工具

package com.github.common.util;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.util.ArrayList;
import java.util.List;

/**
 * 描述:List工具类
 * @author songfayuan
 * 2018年7月22日下午2:23:22
 */
@Slf4j
public class ListUtils {
    
    /**
     * 描述:list集合深拷贝
     * @param src
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @author songfayuan
     * 2018年7月22日下午2:35:23
     */
    public static <T> List<T> deepCopy(List<T> src) {
        try {
            ByteArrayOutputStream byteout = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(byteout);
            out.writeObject(src);
            ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray());
            ObjectInputStream in = new ObjectInputStream(bytein);
            @SuppressWarnings("unchecked")
            List<T> dest = (List<T>) in.readObject();
            return dest;
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            return null;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }
    /**
     * 描述:对象深拷贝
     * @param src
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @author songfayuan
     * 2018年12月14日
     */
    public static <T> T objDeepCopy(T src) {
        try {
            ByteArrayOutputStream byteout = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(byteout);
            out.writeObject(src);
            ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray());
            ObjectInputStream in = new ObjectInputStream(bytein);
            @SuppressWarnings("unchecked")
            T dest = (T) in.readObject();
            return dest;
        } catch (ClassNotFoundException e) {
            log.error("errMsg = {}", e);
            return null;
        } catch (IOException e) {
            log.error("errMsg = {}", e);
            return null;
        }
    }

    /**
     * 将一个list均分成n个list,主要通过偏移量来实现的
     * @author songfayuan
     * 2018年12月14日
     */
    public static <T> List<List<T>> averageAssign(List<T> source, int n) {
        List<List<T>> result = new ArrayList<List<T>>();
        int remaider = source.size() % n;  //(先计算出余数)
        int number = source.size() / n;  //然后是商
        int offset = 0;//偏移量
        for (int i = 0; i < n; i++) {
            List<T> value = null;
            if (remaider > 0) {
                value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
                remaider--;
                offset++;
            } else {
                value = source.subList(i * number + offset, (i + 1) * number + offset);
            }
            result.add(value);
        }
        return result;
    }

    /**
     * List按指定长度分割
     * @param list the list to return consecutive sublists of (需要分隔的list)
     * @param size the desired size of each sublist (the last may be smaller) (分隔的长度)
     * @author songfayuan
     * @date 2019-07-07 21:37
     */
    public static <T> List<List<T>> partition(List<T> list, int size){
        return  Lists.partition(list, size); // 使用guava
    }

    /**
     * 测试
     * @param args
     */
    public static void main(String[] args) {
        List<Integer> bigList = new ArrayList<>();
        for (int i = 0; i < 101; i++){
            bigList.add(i);
        }
        log.info("bigList长度为:{}", bigList.size());
        log.info("bigList为:{}", bigList);
        List<List<Integer>> smallists = partition(bigList, 20);
        log.info("smallists长度为:{}", smallists.size());
        for (List<Integer> smallist : smallists) {
            log.info("拆分结果:{},长度为:{}", smallist, smallist.size());
        }
    }

}

以上是Java如何實作多執行緒大批量同步數據的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除