>  기사  >  Java  >  SpringBoot 예약 작업을 사용하여 데이터 동기화를 달성하는 방법

SpringBoot 예약 작업을 사용하여 데이터 동기화를 달성하는 방법

PHPz
PHPz앞으로
2023-05-10 20:40:041952검색

머리말

비즈니스 요구 사항은 중간 플랫폼에서 API 인터페이스를 호출하여 장치 데이터를 얻는 것이며, 이를 위해서는 실제 장치 데이터의 동기화가 필요합니다.

옵션 1: 폴링 인터페이스를 통해 pullData() 메서드를 실행하여 데이터 동기화를 달성합니다.

이 메서드의 원리는 먼저 이전 데이터를 모두 지운 다음 API 호출을 통해 얻은 최신 데이터를 다시 삽입하는 것입니다. 이 방법의 장점은 논리가 간단하다는 것입니다. 단점은 데이터가 자주 삭제되고 삽입된다는 점입니다. 쿼리 데이터를 다시 호출하면 어느 순간 모든 데이터가 삭제되는데 제때 삽입되지 않은 경우가 있습니다. 데이터에 이상이 있을 수 있습니다.

옵션 2: 폴링 인터페이스를 통해 pullDataNew() 메서드를 실행하여 데이터 동기화를 달성합니다.

이 방법의 원리는 먼저 데이터베이스에서 기존 데이터를 쿼리한 다음 API를 통해 얻은 최신 데이터와 비교하는 것입니다. 데이터의 증가, 감소 및 변수를 추출하여 동기식으로 업데이트합니다. 이 방법의 장점은 데이터베이스에 대한 빈번한 작업을 줄이고 성능을 향상시키는 것입니다. 단점: 뚜렷한 단점이 발견되지 않습니다.

package com.hxtx.spacedata.task;
 
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.google.api.client.util.Lists;
import com.hxtx.spacedata.common.domain.ResponseDTO;
import com.hxtx.spacedata.config.SpringContextUtil;
import com.hxtx.spacedata.controller.file.FilesMinioController;
import com.hxtx.spacedata.domain.entity.entityconfig.EntityPointEntity;
import com.hxtx.spacedata.service.entityconfig.EntityPointService;
import com.hxtx.spacedata.util.HttpProxyUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
 
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
 
 
/**
 * 中台设备数据 定时任务执行
 *
 * @author Tarzan Liu
 * @version 1.0.0
 * @description
 * @date 2020/12/07
 */
@Component
@Slf4j
public class EntityPointTask {
 
    @Autowired
    private EntityPointService entityPointService;
 
    @Value("${middleGround.server.host}")
    private String host;
 
    @Value("${middleGround.server.port}")
    private String port;
 
    private static FilesMinioController filesMinioController = SpringContextUtil.getBean(FilesMinioController.class);
 
    /**
     * 设备定义点数据拉取
     *
     * @author tarzan Liu
     * @date 2020/12/2
     */
    @Scheduled(cron = "0/30 * * * * ?") // 30秒校验一次
    public void pullDataTaskByCorn() {
        String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/system/list");
        JSONObject jsonObject = JSON.parseObject(result);
        if (Objects.nonNull(jsonObject)) {
            JSONArray array = jsonObject.getJSONArray("data");
            if (array != null && array.size() != 0) {
                for (int i = 0; i < array.size(); i++) {
                    JSONObject obj = array.getJSONObject(i);
                    String systemId = obj.getString("id");
                    pullDataNew(systemId);
                }
            }
        }
    }
 
 
    @Transactional(rollbackFor = Throwable.class)
    public ResponseDTO<String> pullData(String code) {
        List<EntityPointEntity> list = Lists.newArrayList();
        String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/defintionView/listBySystemId/" + code);
        JSONObject jsonObject = JSON.parseObject(result);
        if (Objects.nonNull(jsonObject)) {
            JSONArray array = jsonObject.getJSONArray("data");
            if (array != null && array.size() != 0) {
                for (int i = 0; i < array.size(); i++) {
                    JSONObject obj = array.getJSONObject(i);
                    String pointId = obj.getString("pointId");
                    String name = obj.getString("name");
                    list.add(EntityPointEntity.builder().pointId(pointId).name(name).code(code).build());
                }
                List<EntityPointEntity> existList = entityPointService.list(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code).isNotNull(EntityPointEntity::getValue));
                if (CollectionUtils.isNotEmpty(existList)) {
                    Map<String, String> existMap = existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getValue));
                    list.forEach(e -> {
                        String value = existMap.get(e.getPointId());
                        if (value != null) {
                            e.setValue(value);
                        }
                    });
                }
                entityPointService.remove(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code));
                entityPointService.saveBatch(list);
            }
        }
        return ResponseDTO.succ();
    }
 
 
    @Transactional(rollbackFor = Throwable.class)
    public ResponseDTO<String> pullDataNew(String code) {
        String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/defintionView/listBySystemId/" + code);
        JSONObject jsonObject = JSON.parseObject(result);
        if (Objects.nonNull(jsonObject)) {
            JSONArray data = jsonObject.getJSONArray("data");
            List<EntityPointEntity> list = data.toJavaList(EntityPointEntity.class);
            if (CollectionUtils.isNotEmpty(list)) {
                list.forEach(e -> e.setCode(code));
                List<EntityPointEntity> existList = entityPointService.list(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code));
                if (CollectionUtils.isNotEmpty(existList)) {
                    //存在map
                    Map<String, String> existMap = existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getName));
                    //传输map
                    Map<String, String> dataMap = list.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getName));
                    //增量
                    List<EntityPointEntity> increment = list.stream().filter(e -> existMap.get(e.getPointId()) == null).collect(Collectors.toList());
                    if (CollectionUtils.isNotEmpty(increment)) {
                        entityPointService.saveBatch(increment);
                    }
                    //减量
                    List<EntityPointEntity> decrement = existList.stream().filter(e -> dataMap.get(e.getPointId()) == null).collect(Collectors.toList());
                    if (CollectionUtils.isNotEmpty(decrement)) {
                        entityPointService.removeByIds(decrement.stream().map(EntityPointEntity::getId).collect(Collectors.toList()));
                    }
                    //变量
                    List<EntityPointEntity> variable = existList.stream().filter(e -> dataMap.get(e.getPointId()) != null && !dataMap.get(e.getPointId()).equals(e.getName())).collect(Collectors.toList());
                    if (CollectionUtils.isNotEmpty(variable)) {
                        variable.forEach(e -> {
                            e.setName(dataMap.get(e.getPointId()));
                        });
                        entityPointService.updateBatchById(variable);
                    }
                } else {
                    entityPointService.saveBatch(list);
                }
            }
        }
        return ResponseDTO.succ();
    }
 
 
}

데이터베이스 해당 엔터티 클래스

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
 
import java.io.Serializable;
import java.util.Date;
 
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
@TableName(value = "t_entity_point")
public class EntityPointEntity implements Serializable {
 
    private static final long serialVersionUID = 2181036545424452651L;
 
    /**
     * 定义点id
     */
    @TableId(value = "id", type = IdType.ASSIGN_ID)
    private Long id;
 
    /**
     * 定义点id
     */
    private String pointId;
 
    /**
     * 名称
     */
    private String name;
 
    /**
     * 绘制数据
     */
    private String value;
 
    /**
     * 编码
     */
    private String code;
 
    /**
     * 创建时间
     */
    private Date createTime;
 
}

HTTP 요청 프록시 도구 클래스

import lombok.extern.slf4j.Slf4j;
import org.apache.http.Consts;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.util.EntityUtils;
 
import javax.net.ssl.SSLContext;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.URL;
import java.net.URLConnection;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
 
 
/**
 * HTTP请求代理类
 *
 * @author tarzan Liu
 * @description 发送Get Post请求
 */
@Slf4j
public class HttpProxyUtil {
 
    /**
     * 使用URLConnection进行GET请求
     *
     * @param api_url
     * @return
     */
    public static String sendGet(String api_url) {
        return sendGet(api_url, "", "utf-8");
    }
 
    /**
     * 使用URLConnection进行GET请求
     *
     * @param api_url
     * @param param
     * @return
     */
    public static String sendGet(String api_url, String param) {
        return sendGet(api_url, param, "utf-8");
    }
 
    /**
     * 使用URLConnection进行GET请求
     *
     * @param api_url 请求路径
     * @param param   请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值, 可以为空
     * @param charset 字符集
     * @return
     */
    public static String sendGet(String api_url, String param, String charset) {
        StringBuffer buffer = new StringBuffer();
        try {
            // 判断有无参数,若是拼接好的url,就不必再拼接了
            if (param != null && !"".equals(param)) {
                api_url = api_url + "?" + param;
            }
            log.info("请求的路径是:" + api_url);
            URL realUrl = new URL(api_url);
            // 打开联接
            URLConnection conn = realUrl.openConnection();
            // 设置通用的请求属性
            conn.setRequestProperty("accept", "*/*");
            conn.setRequestProperty("connection", "Keep-Alive");
            conn.setRequestProperty("user-agent", "Mozilla/4.0(compatible; MSIE 6.0; Windows NT 5.1; SV1)");
            conn.setConnectTimeout(12000);    //设置连接主机超时(单位:毫秒)
            conn.setReadTimeout(12000);    // 设置从主机读取数据超时(单位:毫秒)
            conn.connect();    // 建立实际的联接
 
            // 定义 BufferedReader输入流来读取URL的相应
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), charset))) {
                String line;
                while ((line = in.readLine()) != null) {
//                    buffer.append("\n"+line);
                    buffer.append(line);
                }
            }
        } catch (Exception e) {
            log.error("发送GET请求出现异常! " + e.getMessage());
            return null;
        }
        //  log.info("响应返回数据:" + buffer.toString());
        return buffer.toString();
    }
 
 
    /**
     * 使用URLConnection进行POST请求
     *
     * @param api_url 请求路径
     * @param param   请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,最好不为空
     * @return
     */
    public static String sendPost(String api_url, String param) {
        return sendPost(api_url, param, "utf-8");
    }
 
    /**
     * 使用URLConnection进行POST请求
     *
     * @param api_url 请求路径
     * @param param   请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,最好不为空
     * @param charset 字符集
     * @return
     */
    public static String sendPost(String api_url, String param, String charset) {
        StringBuffer buffer = new StringBuffer();
        try {
            log.info("请求的路径是:" + api_url + ",参数是:" + param);
 
            URL realUrl = new URL(api_url);
            // 打开联接
            URLConnection conn = realUrl.openConnection();
            // 设置通用的请求属性
            conn.setRequestProperty("accept", "*/*");
            conn.setRequestProperty("connection", "Keep-Alive");
            conn.setRequestProperty("user-agent", "Mozilla/4.0(compatible; MSIE 6.0; Windows NT 5.1; SV1)");
            conn.setConnectTimeout(12000);    //设置连接主机超时(单位:毫秒)
            conn.setReadTimeout(12000);    // 设置从主机读取数据超时(单位:毫秒)
 
            // 发送POST请求必须设置如下两行
            conn.setDoOutput(true);
            conn.setDoInput(true);
 
            // 获取URLConnection对象对应的输出流
            try (PrintWriter out = new PrintWriter(conn.getOutputStream())) {
                out.print(param); // 发送请求参数
                out.flush();// flush输出流的缓冲
            }
            // 定义 BufferedReader输入流来读取URL的相应,得指明使用UTF-8编码,否则到API服务器XML的中文不能被成功识别
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), charset))) {
                String line;
                while ((line = in.readLine()) != null) {
//                    buffer.append("\n"+line);
                    buffer.append(line);
                }
            }
        } catch (Exception e) {
            log.error("发送POST请求出现异常! " + e.getMessage());
            e.printStackTrace();
        }
        log.info("响应返回数据:" + buffer.toString());
        return buffer.toString();
    }
 
    public static CloseableHttpClient createSSLClientDefault() throws Exception {
        SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new AllTrustStrategy()).build();
        SSLConnectionSocketFactory sslSf = new SSLConnectionSocketFactory(sslContext);
        return HttpClients.custom().setSSLSocketFactory(sslSf).build();
    }
 
    // 加载证书
    private static class AllTrustStrategy implements TrustStrategy {
        public boolean isTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
            return true;
        }
    }
 
    /**
     * 支持https请求
     *
     * @param url
     * @param param
     * @return
     * @throws Exception
     */
    public static String sendHttpClientPost(String url, Map<String, String> param) throws Exception {
        CloseableHttpClient httpClient = createSSLClientDefault();
        HttpPost httpPost = null;
        CloseableHttpResponse response = null;
        String result = "";
        try {
            // 发起HTTP的POST请求
            httpPost = new HttpPost(url);
            List<NameValuePair> paramList = new ArrayList<NameValuePair>();
            for (String key : param.keySet()) {
                paramList.add(new BasicNameValuePair(key, param.get(key)));
            }
            log.info("http请求地址:" + url + ",参数:" + paramList.toString());
            // UTF8+URL编码
            httpPost.setEntity(new UrlEncodedFormEntity(paramList, Consts.UTF_8));
            httpPost.setConfig(RequestConfig.custom().setConnectTimeout(30000).setSocketTimeout(30000).build());
            response = httpClient.execute(httpPost);
            HttpEntity entity = response.getEntity();
            int statusCode = response.getStatusLine().getStatusCode();
            if (HttpStatus.SC_OK == statusCode) { // 如果响应码是200
 
            }
            result = EntityUtils.toString(entity);
            log.info("状态码:" + statusCode + ",响应信息:" + result);
        } finally {
            if (response != null) {
                response.close();
            }
            if (httpPost != null) {
                httpPost.releaseConnection();
            }
            httpClient.close();
        }
        return result;
    }
}

위 내용은 SpringBoot 예약 작업을 사용하여 데이터 동기화를 달성하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제