ホームページ >Java >&#&チュートリアル >Java に基づいてデータ同期を実現する方法

Java に基づいてデータ同期を実現する方法

王林
王林転載
2023-04-28 11:31:062196ブラウズ

ビジネスの背景

新しいシステムでスケジュールされたタスクを設定するには、データの一貫性を維持するために顧客システム内のデータをリアルタイムで同期する必要があります。

実装ロジック

1. お客様が提供するインターフェースに応じて、このシステムは Http Post リクエストメソッドを使用してインターフェースデータを取得します。
2. 顧客が提供するインターフェースにはページ番号とページ容量が必要であるため、全量のデータを取得するにはインターフェースへの複数のリクエストが必要となり、同じ操作を再帰的に実行できます。
3. インターフェースをリクエストするたびに、ページ容量 (pageSize) に応じて複数のデータを取得できますが、その際にデータベースを一括で追加したり、バッチ SQL を使用してステートメントを追加したりすることができます。
4. データ同期では 2 つのシステム間のデータの整合性を維持する必要があるため、スケジュールされたタスクを使用し、同期の頻度を指定する必要があります (たとえば、1 日に 1 回、または 1 日に 2 回)。
5. スケジュールされたタスクを使用すると、データの重複の問題が発生するため、データが繰り返し追加される問題を回避するために、一意のフィールドに基づいて一意のインデックスを確立します。
6. 一意のインデックスを作成すると、同じレコードが繰り返し追加される問題は回避できますが、同じレコード内の一意のインデックス フィールドを除く他のフィールドのデータが変更される問題は回避できません。 SQL ステートメントを追加すると、この問題を解決できます。

ヒント: a. このデータ行が既にテーブルに存在することが判明した場合 (主キーまたは一意のインデックスに基づいて判断)、まずこのデータ行を削除してから、新しいデータを挿入します。 b. それ以外の場合は、新しいデータを直接挿入します。

テクノロジーを使用する

1. スケジュールされたタスクを設定します。
2. Http の Post メソッドを使用してインターフェイス データを取得します。
3. 複数ページのデータが関係する場合、再帰的に呼び出されます。
4. バッチ操作データベース (に置き換えます)。
5. データの繰り返し挿入を避けるために、一意のインデックスを作成します。

コードの詳細

1.StudentMapper.java

/**
     * 批量添加数据同步接口学生数据
     * @param studentList
     * @return
     */
    int addBatchStudent(@Param(value = "studentList") List<Student> studentList);

2.SyncStudentServiceImpl.java コードは次のとおりです:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import ***.common.utils.HttpUtils;
import ***.common.utils.StringUtils;
import ***.entity.sync.Student;
import ***.mapper.sync.StudentMapper;
import ***.service.sync.SyncStudentService;
import ***.vo.StudentVO;
import lombok.extern.slf4j.Slf4j;

/**
 * 数据同步的学生实现类
 * @author hc
 * @create 2021/03/25 11:20
 * @version 1.0
 * @since 1.0
 */
@Service
@Slf4j
public class SyncStudentServiceImpl implements SyncStudentService {
    @Autowired
    private StudentMapper studentMapper;
    @Autowired
    private HttpUtils httpUtils;

    @Override
    public void bulkAddStudent(StudentVO studentVO) {
        log.info("调取学生接口传的参数对象studentVO:"+studentVO);
        log.info("开始调取学生接口获取第" + studentVO.getPageIndex() + "页数据");
        //如何页面容量小于100,则按100计算
        if(studentVO.getPageSize() < 100) {
            studentVO.setPageSize(100);
        }
        //根据当前页码和页面容量调取获取学生数据接口
        JSONObject jsonObject = this.sendStudentHttpPost(studentVO);
        //判断返回JSONObject是否为null
        if (StringUtils.isNotNull(jsonObject) && jsonObject.containsKey("errcode") && jsonObject.getInteger("errcode") == 0) {
            if(jsonObject.containsKey("data")){
                JSONArray jsonArray = jsonObject.getJSONArray("data");
                //通过判断获取的jsonObject对象中key值为data是否为null和其 jsonArray的长度来判断是否进行递归
                //提示:此判断方法好于通过计算总页码的方式来递归拿数据(对获取的total依赖过大,因此放弃此方式)
                if(jsonObject.getString("data") != null && jsonArray.size() > 0) {
                   log.info("当前数据加载到几页》》》》:{}", studentVO.getPageIndex());
                   //调取批量添加数据
                   this.addStudentCycleData(jsonObject, studentVO);
                   //页码加1,继续调取下一页数据
                      studentVO.setPageIndex(studentVO.getPageIndex() + 1);
                     //采用递归方式直至循环结束
                   this.bulkAddStudent(studentVO);
                }else {
                    log.info("学生数据同步结束》》》");
                }
            }
        }
    }

    /**
     * 批量添加学生数据
     * @param jsonObject
     * @param areaVO
     * @return
     */
    public void addStudentCycleData(JSONObject jsonObject, StudentVO studentVO){
        List<Student> studentList = null;
        //判断jsonArray时候为空
        if (jsonObject != null && StringUtils.isNotBlank(jsonObject.getString("data"))) {
            //把JsonArray转成对应实体类集合
            studentList = JSONObject.parseArray(jsonObject.getString("data"), Student.class);
        }
        try {
            log.info("学生接口第" + studentVO.getPageIndex() + "页数据开始入库...");
            //调取方法批量进行添加学生数据
            studentMapper.addBatchStudent(studentList);
            log.info("学生接口第" + studentVO.getPageIndex() + "页数据入库成功...");
        } catch (Exception e) {
            log.error("学生批量添加数据库异常:{}", e.getMessage());
        }
    }

    /**
     * 根据studentVO(当前页码和页面容量)发送获取学生数据的请求
     * @param studentVO
     * @return
     */
    public JSONObject sendStudentHttpPost(StudentVO studentVO){
        JSONObject jsonObject = null;
        String studentUrl = "http://*****/async-api/jc/student";
        try {
            if (StringUtils.isNotEmpty(studentUrl)) {
                Map<String, Object> param = new HashMap<>();
                param.put("pageIndex", studentVO.getPageIndex());
                param.put("pageSize", studentVO.getPageSize());
                log.info("开始发起http请求...");
                jsonObject = httpUtils.sendHttpPost(param, studentUrl);
            }
        } catch (Exception e) {
            log.error("调取客户学生同步接口出现异常:{},页面容量为:{},页码:{}", e.getMessage(), 
            studentVO.getPageSize(), studentVO.getPageIndex());
        }
        return jsonObject;
    }
}

3.StudentVO.java コードは次のとおりです。次のように:

import lombok.Data;
/**
 * 数据同步接口获取学生数据传的参数VO类
 * @author hc
 * @create 2021/3/11 10:35
 * @version 1.0
 * @since 1.0
 */
@Data
public class StudentVO{
    //当前页码(初始值为0)
    private Integer pageIndex = 0;
    //页码容量
    private Integer pageSize;
}

4.HttpUtils.java コードは次のとおりです:

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestTemplate;
import java.util.Map;

/**
 * Http请求工具类
 * @author hc
 * @create 2021/3/4
 * @version 1.0
 */
@Component
@Slf4j
public class HttpUtils {
    @Autowired
    private RestTemplate restTemplate;
    
    /**
     * 发送http的post请求方法
     * @param param
     * @return
     */
    public JSONObject sendHttpPost(Integer type, Map<String, Object> param, String url){
        log.info("调取同步接口Url:{}", url);
        JSONObject jsonObject = null;
        //发起http的post准备工作
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.add("Content-Type", "application/json");
        HttpEntity<Map<String, Object>> httpEntity = new HttpEntity<>(param, httpHeaders);
        ResponseEntity<String> response = null;
        try {
            log.info("param参数为:{}",param.toString());
            response = restTemplate.postForEntity(url, httpEntity, String.class);
        } catch (HttpClientErrorException e) {
            log.error("发起http请求报错信息:{}",e.getResponseBodyAsString());
        }
        String bodyData = response.getBody();
        if (StringUtils.isNotEmpty(bodyData)) {
            jsonObject = JSONObject.parseObject(bodyData);
        }
        return jsonObject;
    }
}

5.StudentMapper.xml の SQL ステートメントは次のとおりです:

<!-- 批量添加数据同步接口中获取的学生数据 -->
<insert id="addBatchStudent" parameterType="***.entity.sync.Student">
    replace into xf_clue_sync_student(id, student_code, student_name, status, create_date, update_date)
    <foreach collection="studentList" item="student" open="values" separator="," >
       (#{student.id,jdbcType=BIGINT}, #{student.studentCode,jdbcType=INTEGER}, #{student.studentName,jdbcType=VARCHAR}, 
        #{student.status,jdbcType=INTEGER}, #{student.createDate,jdbcType=VARCHAR}, #{student.updateDate,jdbcType=VARCHAR})
    </foreach>
</insert>

関数の概要

1. スケジュールされたタスク 構成関連のコードはここには表示されなくなりましたが、SpringBoot フレームワークはアノテーションを使用してスケジュールされたタスクと呼び出し頻度を設定します。
2. データ同期インターフェイスの開発には、特定のアプリケーション シナリオに応じて、状況に応じてさまざまな方法が必要です。たとえば、ケトル ツールなどを使用することもできます。

以上がJava に基づいてデータ同期を実現する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。