Home  >  Article  >  Java  >  How to achieve data synchronization based on Java

How to achieve data synchronization based on Java

王林
王林forward
2023-04-28 11:31:062091browse

Business Background

Setting up scheduled tasks in the new system requires real-time synchronization of data in the customer system to maintain data consistency.

Implementation logic

1. According to the interface provided by the customer, this system uses the Http Post request method to obtain the interface data.
2. Since the interface provided by the customer must have page number and page capacity, it will involve multiple requests to the interface to get the full amount of data, so the same operation can be performed recursively.
3. Each time you request the interface, you can obtain multiple pieces of data according to the page capacity (pageSize). At this time, you can add databases in batches and use batch SQL to add statements.
4. Since data synchronization needs to maintain the consistency of data between the two systems, it is necessary to use scheduled tasks and specify the synchronization frequency, for example: once a day or twice a day.
5. The use of scheduled tasks will cause the problem of data duplication, so establish a unique index based on a unique field to avoid the problem of repeated addition of data.
6. The establishment of a unique index can avoid the problem of repeated addition of the same record, but it cannot avoid the problem of data changes in other fields of the same record except the unique index field. Therefore, using replace into to add a SQL statement can solve this problem.

Tips: a. If it is found that this row of data already exists in the table (judging based on the primary key or unique index), delete this row of data first, and then insert new data. b. Otherwise, insert new data directly.

Use technology

1. Set up scheduled tasks.
2. Use Http's Post method to obtain interface data.
3. When multiple pages of data are involved, it is called recursively.
4. Batch operation database (replace into).
5. Create a unique index to avoid repeated data insertion.

Code details

1.StudentMapper.java

/**
     * 批量添加数据同步接口学生数据
     * @param studentList
     * @return
     */
    int addBatchStudent(@Param(value = "studentList") List<Student> studentList);

2.SyncStudentServiceImpl.java code is as follows:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import ***.common.utils.HttpUtils;
import ***.common.utils.StringUtils;
import ***.entity.sync.Student;
import ***.mapper.sync.StudentMapper;
import ***.service.sync.SyncStudentService;
import ***.vo.StudentVO;
import lombok.extern.slf4j.Slf4j;

/**
 * 数据同步的学生实现类
 * @author hc
 * @create 2021/03/25 11:20
 * @version 1.0
 * @since 1.0
 */
@Service
@Slf4j
public class SyncStudentServiceImpl implements SyncStudentService {
    @Autowired
    private StudentMapper studentMapper;
    @Autowired
    private HttpUtils httpUtils;

    @Override
    public void bulkAddStudent(StudentVO studentVO) {
        log.info("调取学生接口传的参数对象studentVO:"+studentVO);
        log.info("开始调取学生接口获取第" + studentVO.getPageIndex() + "页数据");
        //如何页面容量小于100,则按100计算
        if(studentVO.getPageSize() < 100) {
            studentVO.setPageSize(100);
        }
        //根据当前页码和页面容量调取获取学生数据接口
        JSONObject jsonObject = this.sendStudentHttpPost(studentVO);
        //判断返回JSONObject是否为null
        if (StringUtils.isNotNull(jsonObject) && jsonObject.containsKey("errcode") && jsonObject.getInteger("errcode") == 0) {
            if(jsonObject.containsKey("data")){
                JSONArray jsonArray = jsonObject.getJSONArray("data");
                //通过判断获取的jsonObject对象中key值为data是否为null和其 jsonArray的长度来判断是否进行递归
                //提示:此判断方法好于通过计算总页码的方式来递归拿数据(对获取的total依赖过大,因此放弃此方式)
                if(jsonObject.getString("data") != null && jsonArray.size() > 0) {
                   log.info("当前数据加载到几页》》》》:{}", studentVO.getPageIndex());
                   //调取批量添加数据
                   this.addStudentCycleData(jsonObject, studentVO);
                   //页码加1,继续调取下一页数据
                      studentVO.setPageIndex(studentVO.getPageIndex() + 1);
                     //采用递归方式直至循环结束
                   this.bulkAddStudent(studentVO);
                }else {
                    log.info("学生数据同步结束》》》");
                }
            }
        }
    }

    /**
     * 批量添加学生数据
     * @param jsonObject
     * @param areaVO
     * @return
     */
    public void addStudentCycleData(JSONObject jsonObject, StudentVO studentVO){
        List<Student> studentList = null;
        //判断jsonArray时候为空
        if (jsonObject != null && StringUtils.isNotBlank(jsonObject.getString("data"))) {
            //把JsonArray转成对应实体类集合
            studentList = JSONObject.parseArray(jsonObject.getString("data"), Student.class);
        }
        try {
            log.info("学生接口第" + studentVO.getPageIndex() + "页数据开始入库...");
            //调取方法批量进行添加学生数据
            studentMapper.addBatchStudent(studentList);
            log.info("学生接口第" + studentVO.getPageIndex() + "页数据入库成功...");
        } catch (Exception e) {
            log.error("学生批量添加数据库异常:{}", e.getMessage());
        }
    }

    /**
     * 根据studentVO(当前页码和页面容量)发送获取学生数据的请求
     * @param studentVO
     * @return
     */
    public JSONObject sendStudentHttpPost(StudentVO studentVO){
        JSONObject jsonObject = null;
        String studentUrl = "http://*****/async-api/jc/student";
        try {
            if (StringUtils.isNotEmpty(studentUrl)) {
                Map<String, Object> param = new HashMap<>();
                param.put("pageIndex", studentVO.getPageIndex());
                param.put("pageSize", studentVO.getPageSize());
                log.info("开始发起http请求...");
                jsonObject = httpUtils.sendHttpPost(param, studentUrl);
            }
        } catch (Exception e) {
            log.error("调取客户学生同步接口出现异常:{},页面容量为:{},页码:{}", e.getMessage(), 
            studentVO.getPageSize(), studentVO.getPageIndex());
        }
        return jsonObject;
    }
}

3.StudentVO.java code is as follows:

import lombok.Data;
/**
 * 数据同步接口获取学生数据传的参数VO类
 * @author hc
 * @create 2021/3/11 10:35
 * @version 1.0
 * @since 1.0
 */
@Data
public class StudentVO{
    //当前页码(初始值为0)
    private Integer pageIndex = 0;
    //页码容量
    private Integer pageSize;
}

4.HttpUtils.java code is as follows:

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestTemplate;
import java.util.Map;

/**
 * Http请求工具类
 * @author hc
 * @create 2021/3/4
 * @version 1.0
 */
@Component
@Slf4j
public class HttpUtils {
    @Autowired
    private RestTemplate restTemplate;
    
    /**
     * 发送http的post请求方法
     * @param param
     * @return
     */
    public JSONObject sendHttpPost(Integer type, Map<String, Object> param, String url){
        log.info("调取同步接口Url:{}", url);
        JSONObject jsonObject = null;
        //发起http的post准备工作
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.add("Content-Type", "application/json");
        HttpEntity<Map<String, Object>> httpEntity = new HttpEntity<>(param, httpHeaders);
        ResponseEntity<String> response = null;
        try {
            log.info("param参数为:{}",param.toString());
            response = restTemplate.postForEntity(url, httpEntity, String.class);
        } catch (HttpClientErrorException e) {
            log.error("发起http请求报错信息:{}",e.getResponseBodyAsString());
        }
        String bodyData = response.getBody();
        if (StringUtils.isNotEmpty(bodyData)) {
            jsonObject = JSONObject.parseObject(bodyData);
        }
        return jsonObject;
    }
}

5.The SQL statement of StudentMapper.xml is as follows:

<!-- 批量添加数据同步接口中获取的学生数据 -->
<insert id="addBatchStudent" parameterType="***.entity.sync.Student">
    replace into xf_clue_sync_student(id, student_code, student_name, status, create_date, update_date)
    <foreach collection="studentList" item="student" open="values" separator="," >
       (#{student.id,jdbcType=BIGINT}, #{student.studentCode,jdbcType=INTEGER}, #{student.studentName,jdbcType=VARCHAR}, 
        #{student.status,jdbcType=INTEGER}, #{student.createDate,jdbcType=VARCHAR}, #{student.updateDate,jdbcType=VARCHAR})
    </foreach>
</insert>

Function summary

1. Scheduled tasks The configuration-related code is no longer shown here. The SpringBoot framework uses annotations to set scheduled tasks and call frequency.
2. The development of data synchronization interface requires different methods according to specific application scenarios, which depends on the situation. For example, you can also use the kettle tool, etc.

The above is the detailed content of How to achieve data synchronization based on Java. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete