Heim >Java >javaLernprogramm >So implementieren Sie Multithreading und die Synchronisierung großer Batch-Daten in Java

So implementieren Sie Multithreading und die Synchronisierung großer Batch-Daten in Java

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBnach vorne
2023-04-29 18:16:073015Durchsuche

Hintergrund

Ich bin vor Kurzem auf eine Funktion gestoßen. Es gab mehr als 3 Millionen Daten in zwei Monaten, und diese häufen sich danach immer noch an. Da die Daten zu Beginn alle in der MySQL-Tabelle gespeichert waren, müssen sie jetzt auf der Seite angezeigt werden und muss mit Daten aus einer anderen Tabelle verknüpft werden, und die Abfragebedingungen auf der Produktanforderungsseite enthalten bis zu 20 Bedingungen. Letztendlich ist diese Funktion so festgefahren, dass es im Grunde unmöglich ist, die Daten zu finden.

Abschließend planen wir, die Daten dieser beiden Tabellen gleichzeitig in MongoDB zu speichern, um die Abfrageeffizienz zu verbessern.

Bei der Synchronisierung zu Beginn wurde ein einzelner Thread verwendet, um die Daten der beiden Tabellen im Paging-Modus zu synchronisieren. Das Ergebnis war ... nur 300.000 Daten wurden in einer Nacht synchronisiert, was extrem langsam war! ! !

Nach einigen Modifikationen wurden schließlich mehr als 3 Millionen Daten in 2 Stunden erfolgreich synchronisiert.

Das Folgende ist die Hauptlogik.

Bitte stellen Sie die Anzahl der Threads entsprechend Ihrer eigenen Serverleistung ein.

Idee

Ermitteln Sie zunächst die Gesamtzahl der Ergebnisse in der Ergebnismenge durch Zählen, legen Sie die Anzahl der Paging-Abfragen für jeden Thread fest, ermitteln Sie die Anzahl der Threads durch die Gesamtzahl und die einzelne Zahl und implementieren Sie die Batch-Abfrage, indem Sie ändern Index von limit .

Code-Implementierung

package com.github.admin.controller.loans;

import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.github.admin.model.entity.CaseCheckCallRecord;
import com.github.admin.model.entity.duyan.DuyanCallRecordDetail;
import com.github.admin.model.entity.loans.CaseCallRemarkRecord;
import com.github.admin.service.duyan.DuyanCallRecordDetailService;
import com.github.admin.service.loans.CaseCallRemarkRecordService;
import com.github.common.constant.MongodbConstant;
import com.github.common.util.DingDingMsgSendUtils;
import com.github.common.util.ListUtils;
import com.github.common.util.Response;
import com.github.common.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/**
 * 多线程同步历史数据
 * @author songfayuan
 * @date 2019-09-26 15:38
 */
@Slf4j
@RestController
@RequestMapping("/demo")
public class SynchronizeHistoricalDataController implements DisposableBean {

    private ExecutorService executor = Executors.newFixedThreadPool(10, "SynchronizeHistoricalDataController");  //newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

    @Value("${spring.profiles.active}")
    private String profile;
    @Autowired
    private DuyanCallRecordDetailService duyanCallRecordDetailService;
    @Autowired
    private MongoTemplate mongoTemplate;
    @Autowired
    private CaseCallRemarkRecordService caseCallRemarkRecordService;

    /**
     * 多线程同步通话记录历史数据
     * @param params
     * @return
     * @throws Exception
     */
    @GetMapping("/syncHistoryData")
    public Response syncHistoryData(Map<String, Object> params) throws Exception {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    logicHandler(params);
                } catch (Exception e) {
                    log.warn("多线程同步稽查通话记录历史数据才处理异常,errMsg = {}", e);
                    DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,多线程同步稽查通话记录历史数据才处理异常,errMsg = "+e);
                }
            }
        });
        return Response.success("请求成功");
    }

    /**
     * 处理数据逻辑
     * @param params
     * @throws Exception
     */
    private void logicHandler(Map<String, Object> params) throws Exception {
        /******返回结果:多线程处理完的最终数据******/
        List<DuyanCallRecordDetail> result = new ArrayList<>();

        /******查询数据库总的数据条数******/
        int count = this.duyanCallRecordDetailService.selectCount(new EntityWrapper<DuyanCallRecordDetail>()
                .eq("is_delete", 0)
                .eq("platform_type", 1));
        DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,本次需要同步" + count + "条历史稽查通话记录数据。");

//        int count = 2620266;
        /******限制每次查询的条数******/
        int num = 1000;

        /******计算需要查询的次数******/
        int times = count / num;
        if (count % num != 0) {
            times = times + 1;
        }

        /******每个线程开始查询的行数******/
        int offset = 0;

        /******添加任务******/
        List<Callable<List<DuyanCallRecordDetail>>> tasks = new ArrayList<>();
        for (int i = 0; i < times; i++) {
            Callable<List<DuyanCallRecordDetail>> qfe = new ThredQuery(duyanCallRecordDetailService, params, offset, num);
            tasks.add(qfe);
            offset = offset + num;
        }

        /******为避免太多任务的最终数据全部存在list导致内存溢出,故将任务再次拆分单独处理******/
        List<List<Callable<List<DuyanCallRecordDetail>>>> smallList = ListUtils.partition(tasks, 10);
        for (List<Callable<List<DuyanCallRecordDetail>>> callableList : smallList) {
            if (CollectionUtils.isNotEmpty(callableList)) {
//                executor.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        log.info("任务拆分执行开始:线程{}拆分处理开始...", Thread.currentThread().getName());
//
//                        log.info("任务拆分执行结束:线程{}拆分处理开始...", Thread.currentThread().getName());
//                    }
//                });

                try {
                    List<Future<List<DuyanCallRecordDetail>>> futures = executor.invokeAll(callableList);
                    /******处理线程返回结果******/
                    if (futures != null && futures.size() > 0) {
                        for (Future<List<DuyanCallRecordDetail>> future : futures) {
                            List<DuyanCallRecordDetail> duyanCallRecordDetailList = future.get();
                            if (CollectionUtils.isNotEmpty(duyanCallRecordDetailList)){
                                executor.execute(new Runnable() {
                                    @Override
                                    public void run() {
                                        /******异步存储******/
                                        log.info("异步存储MongoDB开始:线程{}拆分处理开始...", Thread.currentThread().getName());
                                        saveMongoDB(duyanCallRecordDetailList);
                                        log.info("异步存储MongoDB结束:线程{}拆分处理开始...", Thread.currentThread().getName());
                                    }
                                });
                            }
                            //result.addAll(future.get());
                        }
                    }
                } catch (Exception e) {
                    log.warn("任务拆分执行异常,errMsg = {}", e);
                    DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,任务拆分执行异常,errMsg = "+e);
                }
            }
        }
    }

    /**
     * 数据存储MongoDB
     * @param duyanCallRecordDetailList
     */
    private void saveMongoDB(List<DuyanCallRecordDetail> duyanCallRecordDetailList) {
        for (DuyanCallRecordDetail duyanCallRecordDetail : duyanCallRecordDetailList) {
            /******重复数据不同步MongoDB******/
            org.springframework.data.mongodb.core.query.Query query = new org.springframework.data.mongodb.core.query.Query();
            query.addCriteria(Criteria.where("callUuid").is(duyanCallRecordDetail.getCallUuid()));
            List<CaseCheckCallRecord> caseCheckCallRecordList = mongoTemplate.find(query, CaseCheckCallRecord.class, MongodbConstant.CASE_CHECK_CALL_RECORD);
            if (CollectionUtils.isNotEmpty(caseCheckCallRecordList)) {
                log.warn("call_uuid = {}在MongoDB已经存在数据,后面数据将被舍弃...", duyanCallRecordDetail.getCallUuid());
                continue;
            }

            /******关联填写的记录******/
            CaseCallRemarkRecord caseCallRemarkRecord = this.caseCallRemarkRecordService.selectOne(new EntityWrapper<CaseCallRemarkRecord>()
                    .eq("is_delete", 0)
                    .eq("call_uuid", duyanCallRecordDetail.getCallUuid()));

            CaseCheckCallRecord caseCheckCallRecord = new CaseCheckCallRecord();
            BeanUtils.copyProperties(duyanCallRecordDetail, caseCheckCallRecord);
            //补充
            caseCheckCallRecord.setCollectorUserId(duyanCallRecordDetail.getUserId());
            
            if (caseCallRemarkRecord != null) {
                //补充
                caseCheckCallRecord.setCalleeName(caseCallRemarkRecord.getContactName());            
            }
            log.info("正在存储数据到MongoDB:{}", caseCheckCallRecord.toString());
            this.mongoTemplate.save(caseCheckCallRecord, MongodbConstant.CASE_CHECK_CALL_RECORD);
        }
    }

    @Override
    public void destroy() throws Exception {
        executor.shutdown();
    }
}


class ThredQuery implements Callable<List<DuyanCallRecordDetail>> {
    /******需要通过构造方法把对应的业务service传进来 实际用的时候把类型变为对应的类型******/
    private DuyanCallRecordDetailService myService;
    /******查询条件 根据条件来定义该类的属性******/
    private Map<String, Object> params;

    /******分页index******/
    private int offset;
    /******数量******/
    private int num;

    public ThredQuery(DuyanCallRecordDetailService myService, Map<String, Object> params, int offset, int num) {
        this.myService = myService;
        this.params = params;
        this.offset = offset;
        this.num = num;
    }

    @Override
    public List<DuyanCallRecordDetail> call() throws Exception {
        /******通过service查询得到对应结果******/
        List<DuyanCallRecordDetail> duyanCallRecordDetailList = myService.selectList(new EntityWrapper<DuyanCallRecordDetail>()
                .eq("is_delete", 0)
                .eq("platform_type", 1)
                .last("limit "+offset+", "+num));
        return duyanCallRecordDetailList;
    }
}

ListUtils-Tool

package com.github.common.util;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.util.ArrayList;
import java.util.List;

/**
 * 描述:List工具类
 * @author songfayuan
 * 2018年7月22日下午2:23:22
 */
@Slf4j
public class ListUtils {
    
    /**
     * 描述:list集合深拷贝
     * @param src
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @author songfayuan
     * 2018年7月22日下午2:35:23
     */
    public static <T> List<T> deepCopy(List<T> src) {
        try {
            ByteArrayOutputStream byteout = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(byteout);
            out.writeObject(src);
            ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray());
            ObjectInputStream in = new ObjectInputStream(bytein);
            @SuppressWarnings("unchecked")
            List<T> dest = (List<T>) in.readObject();
            return dest;
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            return null;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }
    /**
     * 描述:对象深拷贝
     * @param src
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @author songfayuan
     * 2018年12月14日
     */
    public static <T> T objDeepCopy(T src) {
        try {
            ByteArrayOutputStream byteout = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(byteout);
            out.writeObject(src);
            ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray());
            ObjectInputStream in = new ObjectInputStream(bytein);
            @SuppressWarnings("unchecked")
            T dest = (T) in.readObject();
            return dest;
        } catch (ClassNotFoundException e) {
            log.error("errMsg = {}", e);
            return null;
        } catch (IOException e) {
            log.error("errMsg = {}", e);
            return null;
        }
    }

    /**
     * 将一个list均分成n个list,主要通过偏移量来实现的
     * @author songfayuan
     * 2018年12月14日
     */
    public static <T> List<List<T>> averageAssign(List<T> source, int n) {
        List<List<T>> result = new ArrayList<List<T>>();
        int remaider = source.size() % n;  //(先计算出余数)
        int number = source.size() / n;  //然后是商
        int offset = 0;//偏移量
        for (int i = 0; i < n; i++) {
            List<T> value = null;
            if (remaider > 0) {
                value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
                remaider--;
                offset++;
            } else {
                value = source.subList(i * number + offset, (i + 1) * number + offset);
            }
            result.add(value);
        }
        return result;
    }

    /**
     * List按指定长度分割
     * @param list the list to return consecutive sublists of (需要分隔的list)
     * @param size the desired size of each sublist (the last may be smaller) (分隔的长度)
     * @author songfayuan
     * @date 2019-07-07 21:37
     */
    public static <T> List<List<T>> partition(List<T> list, int size){
        return  Lists.partition(list, size); // 使用guava
    }

    /**
     * 测试
     * @param args
     */
    public static void main(String[] args) {
        List<Integer> bigList = new ArrayList<>();
        for (int i = 0; i < 101; i++){
            bigList.add(i);
        }
        log.info("bigList长度为:{}", bigList.size());
        log.info("bigList为:{}", bigList);
        List<List<Integer>> smallists = partition(bigList, 20);
        log.info("smallists长度为:{}", smallists.size());
        for (List<Integer> smallist : smallists) {
            log.info("拆分结果:{},长度为:{}", smallist, smallist.size());
        }
    }

}

Das obige ist der detaillierte Inhalt vonSo implementieren Sie Multithreading und die Synchronisierung großer Batch-Daten in Java. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen