Home  >  Article  >  Java  >  How to use SpringBoot scheduled tasks to achieve data synchronization

How to use SpringBoot scheduled tasks to achieve data synchronization

PHPz
PHPzforward
2023-05-10 20:40:041951browse

Preface

The business requirement is to obtain device data by calling the API interface in the middle platform, which requires the synchronization of actual device data.

Option 1: Execute the pullData() method through the polling interface to achieve data synchronization

The principle of this method is to first clear all previous data and then reinsert it Latest data obtained via api call. The advantage of this method is that the logic is simple. The disadvantage is that data is frequently deleted and inserted. When calling the query data again, at a certain moment, all the data will be deleted, but it has not been inserted in time. There may be anomalies in the data.

Option 2: Execute the pullDataNew() method through the polling interface to achieve data synchronization

The principle of this method is to first query the database, existing data, and then synchronize it with Compare the latest data obtained through API calls, find out the increments, decrements and variables in the data, and update them synchronously. The advantage of this method is to reduce frequent operations on the database and improve performance. Disadvantages: No obvious shortcomings found.

package com.hxtx.spacedata.task;
 
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.google.api.client.util.Lists;
import com.hxtx.spacedata.common.domain.ResponseDTO;
import com.hxtx.spacedata.config.SpringContextUtil;
import com.hxtx.spacedata.controller.file.FilesMinioController;
import com.hxtx.spacedata.domain.entity.entityconfig.EntityPointEntity;
import com.hxtx.spacedata.service.entityconfig.EntityPointService;
import com.hxtx.spacedata.util.HttpProxyUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
 
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
 
 
/**
 * 中台设备数据 定时任务执行
 *
 * @author Tarzan Liu
 * @version 1.0.0
 * @description
 * @date 2020/12/07
 */
@Component
@Slf4j
public class EntityPointTask {
 
    @Autowired
    private EntityPointService entityPointService;
 
    @Value("${middleGround.server.host}")
    private String host;
 
    @Value("${middleGround.server.port}")
    private String port;
 
    private static FilesMinioController filesMinioController = SpringContextUtil.getBean(FilesMinioController.class);
 
    /**
     * 设备定义点数据拉取
     *
     * @author tarzan Liu
     * @date 2020/12/2
     */
    @Scheduled(cron = "0/30 * * * * ?") // 30秒校验一次
    public void pullDataTaskByCorn() {
        String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/system/list");
        JSONObject jsonObject = JSON.parseObject(result);
        if (Objects.nonNull(jsonObject)) {
            JSONArray array = jsonObject.getJSONArray("data");
            if (array != null && array.size() != 0) {
                for (int i = 0; i < array.size(); i++) {
                    JSONObject obj = array.getJSONObject(i);
                    String systemId = obj.getString("id");
                    pullDataNew(systemId);
                }
            }
        }
    }
 
 
    @Transactional(rollbackFor = Throwable.class)
    public ResponseDTO<String> pullData(String code) {
        List<EntityPointEntity> list = Lists.newArrayList();
        String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/defintionView/listBySystemId/" + code);
        JSONObject jsonObject = JSON.parseObject(result);
        if (Objects.nonNull(jsonObject)) {
            JSONArray array = jsonObject.getJSONArray("data");
            if (array != null && array.size() != 0) {
                for (int i = 0; i < array.size(); i++) {
                    JSONObject obj = array.getJSONObject(i);
                    String pointId = obj.getString("pointId");
                    String name = obj.getString("name");
                    list.add(EntityPointEntity.builder().pointId(pointId).name(name).code(code).build());
                }
                List<EntityPointEntity> existList = entityPointService.list(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code).isNotNull(EntityPointEntity::getValue));
                if (CollectionUtils.isNotEmpty(existList)) {
                    Map<String, String> existMap = existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getValue));
                    list.forEach(e -> {
                        String value = existMap.get(e.getPointId());
                        if (value != null) {
                            e.setValue(value);
                        }
                    });
                }
                entityPointService.remove(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code));
                entityPointService.saveBatch(list);
            }
        }
        return ResponseDTO.succ();
    }
 
 
    @Transactional(rollbackFor = Throwable.class)
    public ResponseDTO<String> pullDataNew(String code) {
        String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/defintionView/listBySystemId/" + code);
        JSONObject jsonObject = JSON.parseObject(result);
        if (Objects.nonNull(jsonObject)) {
            JSONArray data = jsonObject.getJSONArray("data");
            List<EntityPointEntity> list = data.toJavaList(EntityPointEntity.class);
            if (CollectionUtils.isNotEmpty(list)) {
                list.forEach(e -> e.setCode(code));
                List<EntityPointEntity> existList = entityPointService.list(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code));
                if (CollectionUtils.isNotEmpty(existList)) {
                    //存在map
                    Map<String, String> existMap = existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getName));
                    //传输map
                    Map<String, String> dataMap = list.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getName));
                    //增量
                    List<EntityPointEntity> increment = list.stream().filter(e -> existMap.get(e.getPointId()) == null).collect(Collectors.toList());
                    if (CollectionUtils.isNotEmpty(increment)) {
                        entityPointService.saveBatch(increment);
                    }
                    //减量
                    List<EntityPointEntity> decrement = existList.stream().filter(e -> dataMap.get(e.getPointId()) == null).collect(Collectors.toList());
                    if (CollectionUtils.isNotEmpty(decrement)) {
                        entityPointService.removeByIds(decrement.stream().map(EntityPointEntity::getId).collect(Collectors.toList()));
                    }
                    //变量
                    List<EntityPointEntity> variable = existList.stream().filter(e -> dataMap.get(e.getPointId()) != null && !dataMap.get(e.getPointId()).equals(e.getName())).collect(Collectors.toList());
                    if (CollectionUtils.isNotEmpty(variable)) {
                        variable.forEach(e -> {
                            e.setName(dataMap.get(e.getPointId()));
                        });
                        entityPointService.updateBatchById(variable);
                    }
                } else {
                    entityPointService.saveBatch(list);
                }
            }
        }
        return ResponseDTO.succ();
    }
 
 
}

Database corresponding entity class

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
 
import java.io.Serializable;
import java.util.Date;
 
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
@TableName(value = "t_entity_point")
public class EntityPointEntity implements Serializable {
 
    private static final long serialVersionUID = 2181036545424452651L;
 
    /**
     * 定义点id
     */
    @TableId(value = "id", type = IdType.ASSIGN_ID)
    private Long id;
 
    /**
     * 定义点id
     */
    private String pointId;
 
    /**
     * 名称
     */
    private String name;
 
    /**
     * 绘制数据
     */
    private String value;
 
    /**
     * 编码
     */
    private String code;
 
    /**
     * 创建时间
     */
    private Date createTime;
 
}

HTTP request proxy tool class

import lombok.extern.slf4j.Slf4j;
import org.apache.http.Consts;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.util.EntityUtils;
 
import javax.net.ssl.SSLContext;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.URL;
import java.net.URLConnection;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
 
 
/**
 * HTTP请求代理类
 *
 * @author tarzan Liu
 * @description 发送Get Post请求
 */
@Slf4j
public class HttpProxyUtil {
 
    /**
     * 使用URLConnection进行GET请求
     *
     * @param api_url
     * @return
     */
    public static String sendGet(String api_url) {
        return sendGet(api_url, "", "utf-8");
    }
 
    /**
     * 使用URLConnection进行GET请求
     *
     * @param api_url
     * @param param
     * @return
     */
    public static String sendGet(String api_url, String param) {
        return sendGet(api_url, param, "utf-8");
    }
 
    /**
     * 使用URLConnection进行GET请求
     *
     * @param api_url 请求路径
     * @param param   请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值, 可以为空
     * @param charset 字符集
     * @return
     */
    public static String sendGet(String api_url, String param, String charset) {
        StringBuffer buffer = new StringBuffer();
        try {
            // 判断有无参数,若是拼接好的url,就不必再拼接了
            if (param != null && !"".equals(param)) {
                api_url = api_url + "?" + param;
            }
            log.info("请求的路径是:" + api_url);
            URL realUrl = new URL(api_url);
            // 打开联接
            URLConnection conn = realUrl.openConnection();
            // 设置通用的请求属性
            conn.setRequestProperty("accept", "*/*");
            conn.setRequestProperty("connection", "Keep-Alive");
            conn.setRequestProperty("user-agent", "Mozilla/4.0(compatible; MSIE 6.0; Windows NT 5.1; SV1)");
            conn.setConnectTimeout(12000);    //设置连接主机超时(单位:毫秒)
            conn.setReadTimeout(12000);    // 设置从主机读取数据超时(单位:毫秒)
            conn.connect();    // 建立实际的联接
 
            // 定义 BufferedReader输入流来读取URL的相应
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), charset))) {
                String line;
                while ((line = in.readLine()) != null) {
//                    buffer.append("\n"+line);
                    buffer.append(line);
                }
            }
        } catch (Exception e) {
            log.error("发送GET请求出现异常! " + e.getMessage());
            return null;
        }
        //  log.info("响应返回数据:" + buffer.toString());
        return buffer.toString();
    }
 
 
    /**
     * 使用URLConnection进行POST请求
     *
     * @param api_url 请求路径
     * @param param   请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,最好不为空
     * @return
     */
    public static String sendPost(String api_url, String param) {
        return sendPost(api_url, param, "utf-8");
    }
 
    /**
     * 使用URLConnection进行POST请求
     *
     * @param api_url 请求路径
     * @param param   请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,最好不为空
     * @param charset 字符集
     * @return
     */
    public static String sendPost(String api_url, String param, String charset) {
        StringBuffer buffer = new StringBuffer();
        try {
            log.info("请求的路径是:" + api_url + ",参数是:" + param);
 
            URL realUrl = new URL(api_url);
            // 打开联接
            URLConnection conn = realUrl.openConnection();
            // 设置通用的请求属性
            conn.setRequestProperty("accept", "*/*");
            conn.setRequestProperty("connection", "Keep-Alive");
            conn.setRequestProperty("user-agent", "Mozilla/4.0(compatible; MSIE 6.0; Windows NT 5.1; SV1)");
            conn.setConnectTimeout(12000);    //设置连接主机超时(单位:毫秒)
            conn.setReadTimeout(12000);    // 设置从主机读取数据超时(单位:毫秒)
 
            // 发送POST请求必须设置如下两行
            conn.setDoOutput(true);
            conn.setDoInput(true);
 
            // 获取URLConnection对象对应的输出流
            try (PrintWriter out = new PrintWriter(conn.getOutputStream())) {
                out.print(param); // 发送请求参数
                out.flush();// flush输出流的缓冲
            }
            // 定义 BufferedReader输入流来读取URL的相应,得指明使用UTF-8编码,否则到API服务器XML的中文不能被成功识别
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), charset))) {
                String line;
                while ((line = in.readLine()) != null) {
//                    buffer.append("\n"+line);
                    buffer.append(line);
                }
            }
        } catch (Exception e) {
            log.error("发送POST请求出现异常! " + e.getMessage());
            e.printStackTrace();
        }
        log.info("响应返回数据:" + buffer.toString());
        return buffer.toString();
    }
 
    public static CloseableHttpClient createSSLClientDefault() throws Exception {
        SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new AllTrustStrategy()).build();
        SSLConnectionSocketFactory sslSf = new SSLConnectionSocketFactory(sslContext);
        return HttpClients.custom().setSSLSocketFactory(sslSf).build();
    }
 
    // 加载证书
    private static class AllTrustStrategy implements TrustStrategy {
        public boolean isTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
            return true;
        }
    }
 
    /**
     * 支持https请求
     *
     * @param url
     * @param param
     * @return
     * @throws Exception
     */
    public static String sendHttpClientPost(String url, Map<String, String> param) throws Exception {
        CloseableHttpClient httpClient = createSSLClientDefault();
        HttpPost httpPost = null;
        CloseableHttpResponse response = null;
        String result = "";
        try {
            // 发起HTTP的POST请求
            httpPost = new HttpPost(url);
            List<NameValuePair> paramList = new ArrayList<NameValuePair>();
            for (String key : param.keySet()) {
                paramList.add(new BasicNameValuePair(key, param.get(key)));
            }
            log.info("http请求地址:" + url + ",参数:" + paramList.toString());
            // UTF8+URL编码
            httpPost.setEntity(new UrlEncodedFormEntity(paramList, Consts.UTF_8));
            httpPost.setConfig(RequestConfig.custom().setConnectTimeout(30000).setSocketTimeout(30000).build());
            response = httpClient.execute(httpPost);
            HttpEntity entity = response.getEntity();
            int statusCode = response.getStatusLine().getStatusCode();
            if (HttpStatus.SC_OK == statusCode) { // 如果响应码是200
 
            }
            result = EntityUtils.toString(entity);
            log.info("状态码:" + statusCode + ",响应信息:" + result);
        } finally {
            if (response != null) {
                response.close();
            }
            if (httpPost != null) {
                httpPost.releaseConnection();
            }
            httpClient.close();
        }
        return result;
    }
}

The above is the detailed content of How to use SpringBoot scheduled tasks to achieve data synchronization. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete