How to use SpringBoot scheduled tasks to achieve data synchronization

Preface

The business requirement is to obtain device data by calling the API of the middle platform, and to keep the locally stored copy synchronized with the actual device data.

Option 1: Poll the interface and call the pullData() method to synchronize the data

This method first deletes all existing data and then inserts the latest data obtained from the API call. Its advantage is that the logic is simple. Its disadvantage is that data is deleted and inserted frequently. A query that runs after the old data has been deleted but before the new data has been inserted may find the data missing, so the results can be inconsistent.
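The delete-then-insert idea can be illustrated without a database. The following is only a minimal sketch: `FullReplaceSync`, `Row`, and the in-memory `Map` are hypothetical stand-ins for the table and service used in the article, chosen so the example runs on its own.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the table; not the article's service code. */
final class FullReplaceSync {

    /** One stored row: the name comes from the API, the value is maintained locally. */
    static final class Row {
        final String name;
        final String value;

        Row(String name, String value) {
            this.name = name;
            this.value = value;
        }
    }

    /**
     * Replaces the whole store with the latest API data (pointId -> name),
     * carrying over the locally stored value of every pointId that survives.
     */
    static void replaceAll(Map<String, Row> store, Map<String, String> latestNames) {
        Map<String, Row> rebuilt = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : latestNames.entrySet()) {
            Row old = store.get(e.getKey());
            rebuilt.put(e.getKey(), new Row(e.getValue(), old == null ? null : old.value));
        }
        store.clear();          // step 1: delete every existing row
        // A reader that looks at the store at this point finds it empty.
        store.putAll(rebuilt);  // step 2: insert the fresh rows
    }

    public static void main(String[] args) {
        Map<String, Row> store = new LinkedHashMap<>();
        store.put("p1", new Row("Pump", "v1"));
        store.put("p2", new Row("Valve", "v2"));

        Map<String, String> latest = new LinkedHashMap<>();
        latest.put("p2", "Main valve");
        latest.put("p3", "Sensor");

        replaceAll(store, latest);
        System.out.println(store.keySet()); // prints [p2, p3]
    }
}
```

Every row is rewritten on each run, even when nothing changed, which is the cost the comparison-based option tries to avoid.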

Option 2: Poll the interface and call the pullDataNew() method to synchronize the data

This method first queries the data that already exists in the database and compares it with the latest data obtained from the API call. It identifies the additions, removals, and changes, and applies only those to the database. Its advantage is that it reduces the number of database operations and so improves performance. No obvious shortcomings were found.
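The comparison step can be sketched on its own with plain maps. This is a minimal illustration under stated assumptions, not the article's implementation: `PointDiff` and its field names are hypothetical, and both inputs are assumed to be maps of pointId to name, like the `existMap` and `dataMap` built in the task class.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical helper: what must change so that `existing` matches `latest`. */
final class PointDiff {
    final List<String> toInsert = new ArrayList<>();            // pointIds only in latest
    final List<String> toDelete = new ArrayList<>();            // pointIds only in existing
    final Map<String, String> toUpdate = new LinkedHashMap<>(); // pointId -> new name

    /** Both maps are pointId -> name. */
    static PointDiff between(Map<String, String> existing, Map<String, String> latest) {
        PointDiff diff = new PointDiff();
        for (Map.Entry<String, String> e : latest.entrySet()) {
            if (!existing.containsKey(e.getKey())) {
                diff.toInsert.add(e.getKey());               // addition
            } else if (!Objects.equals(existing.get(e.getKey()), e.getValue())) {
                diff.toUpdate.put(e.getKey(), e.getValue()); // change
            }
        }
        for (String id : existing.keySet()) {
            if (!latest.containsKey(id)) {
                diff.toDelete.add(id);                       // removal
            }
        }
        return diff;
    }

    public static void main(String[] args) {
        Map<String, String> existing = new LinkedHashMap<>();
        existing.put("p1", "Pump");
        existing.put("p2", "Valve");
        existing.put("p3", "Fan");

        Map<String, String> latest = new LinkedHashMap<>();
        latest.put("p2", "Valve");
        latest.put("p3", "Cooling fan");
        latest.put("p4", "Sensor");

        PointDiff diff = between(existing, latest);
        // prints insert=[p4] delete=[p1] update={p3=Cooling fan}
        System.out.println("insert=" + diff.toInsert + " delete=" + diff.toDelete + " update=" + diff.toUpdate);
    }
}
```

Unchanged rows such as `p2` appear in none of the three results, so a run in which nothing changed touches no rows at all.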

package com.hxtx.spacedata.task;
 
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.google.api.client.util.Lists;
import com.hxtx.spacedata.common.domain.ResponseDTO;
import com.hxtx.spacedata.config.SpringContextUtil;
import com.hxtx.spacedata.controller.file.FilesMinioController;
import com.hxtx.spacedata.domain.entity.entityconfig.EntityPointEntity;
import com.hxtx.spacedata.service.entityconfig.EntityPointService;
import com.hxtx.spacedata.util.HttpProxyUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
 
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
 
 
/**
 * Scheduled task that synchronizes device data from the middle platform.
 *
 * @author Tarzan Liu
 * @version 1.0.0
 * @date 2020/12/07
 */
@Component
@Slf4j
public class EntityPointTask {
 
    @Autowired
    private EntityPointService entityPointService;
 
    @Value("${middleGround.server.host}")
    private String host;
 
    @Value("${middleGround.server.port}")
    private String port;
 
    /**
     * Pulls the device definition point data for every system.
     *
     * @author tarzan Liu
     * @date 2020/12/2
     */
    @Scheduled(cron = "0/30 * * * * ?") // runs every 30 seconds
    public void pullDataTaskByCorn() {
        String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/system/list");
        JSONObject jsonObject = JSON.parseObject(result);
        if (Objects.nonNull(jsonObject)) {
            JSONArray array = jsonObject.getJSONArray("data");
            if (array != null && array.size() != 0) {
                for (int i = 0; i < array.size(); i++) {
                    JSONObject obj = array.getJSONObject(i);
                    String systemId = obj.getString("id");
                    // Called on "this": with Spring's default proxy-based transactions,
                    // the @Transactional on pullDataNew does not take effect for this call.
                    pullDataNew(systemId);
                }
            }
        }
    }
 
 
    @Transactional(rollbackFor = Throwable.class)
    public ResponseDTO<String> pullData(String code) {
        List<EntityPointEntity> list = Lists.newArrayList();
        String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/defintionView/listBySystemId/" + code);
        JSONObject jsonObject = JSON.parseObject(result);
        if (Objects.nonNull(jsonObject)) {
            JSONArray array = jsonObject.getJSONArray("data");
            if (array != null && array.size() != 0) {
                for (int i = 0; i < array.size(); i++) {
                    JSONObject obj = array.getJSONObject(i);
                    String pointId = obj.getString("pointId");
                    String name = obj.getString("name");
                    list.add(EntityPointEntity.builder().pointId(pointId).name(name).code(code).build());
                }
                List<EntityPointEntity> existList = entityPointService.list(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code).isNotNull(EntityPointEntity::getValue));
                if (CollectionUtils.isNotEmpty(existList)) {
                    Map<String, String> existMap = existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getValue));
                    list.forEach(e -> {
                        String value = existMap.get(e.getPointId());
                        if (value != null) {
                            e.setValue(value);
                        }
                    });
                }
                entityPointService.remove(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code));
                entityPointService.saveBatch(list);
            }
        }
        return ResponseDTO.succ();
    }
 
 
    @Transactional(rollbackFor = Throwable.class)
    public ResponseDTO<String> pullDataNew(String code) {
        String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/defintionView/listBySystemId/" + code);
        JSONObject jsonObject = JSON.parseObject(result);
        if (Objects.nonNull(jsonObject)) {
            JSONArray data = jsonObject.getJSONArray("data");
            // Guard against a response that has no "data" array
            List<EntityPointEntity> list = data == null ? null : data.toJavaList(EntityPointEntity.class);
            if (CollectionUtils.isNotEmpty(list)) {
                list.forEach(e -> e.setCode(code));
                List<EntityPointEntity> existList = entityPointService.list(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code));
                if (CollectionUtils.isNotEmpty(existList)) {
                    // pointId -> name for the rows already in the database
                    Map<String, String> existMap = existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getName));
                    // pointId -> name for the rows returned by the API
                    Map<String, String> dataMap = list.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getName));
                    // Additions: returned by the API but not yet in the database
                    List<EntityPointEntity> increment = list.stream().filter(e -> existMap.get(e.getPointId()) == null).collect(Collectors.toList());
                    if (CollectionUtils.isNotEmpty(increment)) {
                        entityPointService.saveBatch(increment);
                    }
                    // Removals: in the database but no longer returned by the API
                    List<EntityPointEntity> decrement = existList.stream().filter(e -> dataMap.get(e.getPointId()) == null).collect(Collectors.toList());
                    if (CollectionUtils.isNotEmpty(decrement)) {
                        entityPointService.removeByIds(decrement.stream().map(EntityPointEntity::getId).collect(Collectors.toList()));
                    }
                    // Changes: present on both sides but with a different name
                    List<EntityPointEntity> variable = existList.stream().filter(e -> dataMap.get(e.getPointId()) != null && !dataMap.get(e.getPointId()).equals(e.getName())).collect(Collectors.toList());
                    if (CollectionUtils.isNotEmpty(variable)) {
                        variable.forEach(e -> {
                            e.setName(dataMap.get(e.getPointId()));
                        });
                        entityPointService.updateBatchById(variable);
                    }
                } else {
                    // Nothing stored yet for this code: insert everything
                    entityPointService.saveBatch(list);
                }
            }
        }
        return ResponseDTO.succ();
    }
 
 
}

Entity class mapped to the database table

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
 
import java.io.Serializable;
import java.util.Date;
 
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
@TableName(value = "t_entity_point")
public class EntityPointEntity implements Serializable {
 
    private static final long serialVersionUID = 2181036545424452651L;
 
    /**
     * Primary key
     */
    @TableId(value = "id", type = IdType.ASSIGN_ID)
    private Long id;
 
    /**
     * Definition point ID
     */
    private String pointId;
 
    /**
     * Name
     */
    private String name;
 
    /**
     * Drawing data
     */
    private String value;
 
    /**
     * Code
     */
    private String code;
 
    /**
     * Creation time
     */
    private Date createTime;
 
}

HTTP request utility class

import lombok.extern.slf4j.Slf4j;
import org.apache.http.Consts;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.util.EntityUtils;
 
import javax.net.ssl.SSLContext;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.URL;
import java.net.URLConnection;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
 
 
/**
 * HTTP request utility class
 *
 * @author tarzan Liu
 * @description Sends GET and POST requests
 */
@Slf4j
public class HttpProxyUtil {
 
    /**
     * Sends a GET request using URLConnection.
     *
     * @param api_url request URL
     * @return the response body, or null if the request fails
     */
    public static String sendGet(String api_url) {
        return sendGet(api_url, "", "utf-8");
    }
 
    /**
     * Sends a GET request using URLConnection.
     *
     * @param api_url request URL
     * @param param   request parameters, may be empty
     * @return the response body, or null if the request fails
     */
    public static String sendGet(String api_url, String param) {
        return sendGet(api_url, param, "utf-8");
    }
 
    /**
     * Sends a GET request using URLConnection.
     *
     * @param api_url request URL
     * @param param   request parameters, e.g. name1=value1&name2=value2, JSON, XML, map, or another form, depending on what the receiver expects; may be empty
     * @param charset character set used to decode the response
     * @return the response body, or null if the request fails
     */
    public static String sendGet(String api_url, String param, String charset) {
        StringBuilder buffer = new StringBuilder();
        try {
            // Append the parameters only when given; a URL that already carries its query string needs none
            if (param != null && !"".equals(param)) {
                api_url = api_url + "?" + param;
            }
            log.info("Request URL: " + api_url);
            URL realUrl = new URL(api_url);
            // Open the connection
            URLConnection conn = realUrl.openConnection();
            // Set the common request headers
            conn.setRequestProperty("accept", "*/*");
            conn.setRequestProperty("connection", "Keep-Alive");
            conn.setRequestProperty("user-agent", "Mozilla/4.0(compatible; MSIE 6.0; Windows NT 5.1; SV1)");
            conn.setConnectTimeout(12000);    // connect timeout in milliseconds
            conn.setReadTimeout(12000);    // read timeout in milliseconds
            conn.connect();    // establish the actual connection
 
            // Read the response through a BufferedReader; line breaks are dropped
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), charset))) {
                String line;
                while ((line = in.readLine()) != null) {
//                    buffer.append("\n"+line);
                    buffer.append(line);
                }
            }
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is logged
            log.error("GET request failed: " + e.getMessage(), e);
            return null;
        }
        //  log.info("Response body: " + buffer.toString());
        return buffer.toString();
    }
 
 
    /**
     * Sends a POST request using URLConnection.
     *
     * @param api_url request URL
     * @param param   request body, e.g. name1=value1&name2=value2, JSON, XML, map, or another form, depending on what the receiver expects; preferably not empty
     * @return the response body, or an empty string if the request fails
     */
    public static String sendPost(String api_url, String param) {
        return sendPost(api_url, param, "utf-8");
    }
 
    /**
     * Sends a POST request using URLConnection.
     *
     * @param api_url request URL
     * @param param   request body, e.g. name1=value1&name2=value2, JSON, XML, map, or another form, depending on what the receiver expects; preferably not empty
     * @param charset character set used to encode the request and decode the response
     * @return the response body, or an empty string if the request fails
     */
    public static String sendPost(String api_url, String param, String charset) {
        StringBuilder buffer = new StringBuilder();
        try {
            log.info("Request URL: " + api_url + ", parameters: " + param);
 
            URL realUrl = new URL(api_url);
            // Open the connection
            URLConnection conn = realUrl.openConnection();
            // Set the common request headers
            conn.setRequestProperty("accept", "*/*");
            conn.setRequestProperty("connection", "Keep-Alive");
            conn.setRequestProperty("user-agent", "Mozilla/4.0(compatible; MSIE 6.0; Windows NT 5.1; SV1)");
            conn.setConnectTimeout(12000);    // connect timeout in milliseconds
            conn.setReadTimeout(12000);    // read timeout in milliseconds
 
            // Both flags must be set to send a POST request
            conn.setDoOutput(true);
            conn.setDoInput(true);
 
            // Write the request body with the given charset instead of the platform default
            try (PrintWriter out = new PrintWriter(new java.io.OutputStreamWriter(conn.getOutputStream(), charset))) {
                out.print(param); // send the request parameters
                out.flush(); // flush the output buffer
            }
            // Read the response with an explicit charset; otherwise Chinese text in the XML from the API server is not decoded correctly
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), charset))) {
                String line;
                while ((line = in.readLine()) != null) {
//                    buffer.append("\n"+line);
                    buffer.append(line);
                }
            }
        } catch (Exception e) {
            // Log through the logger (with stack trace) instead of printStackTrace()
            log.error("POST request failed: " + e.getMessage(), e);
        }
        log.info("Response body: " + buffer.toString());
        return buffer.toString();
    }
 
    public static CloseableHttpClient createSSLClientDefault() throws Exception {
        SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new AllTrustStrategy()).build();
        SSLConnectionSocketFactory sslSf = new SSLConnectionSocketFactory(sslContext);
        return HttpClients.custom().setSSLSocketFactory(sslSf).build();
    }
 
    // Trust strategy that accepts every certificate, which disables TLS certificate validation
    private static class AllTrustStrategy implements TrustStrategy {
        public boolean isTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
            return true;
        }
    }
 
    /**
     * Sends a form POST request; HTTPS is supported.
     *
     * @param url   request URL
     * @param param form parameters
     * @return the response body
     * @throws Exception if the client cannot be created or the request fails
     */
    public static String sendHttpClientPost(String url, Map<String, String> param) throws Exception {
        CloseableHttpClient httpClient = createSSLClientDefault();
        HttpPost httpPost = null;
        CloseableHttpResponse response = null;
        String result = "";
        try {
            // Build the POST request
            httpPost = new HttpPost(url);
            List<NameValuePair> paramList = new ArrayList<>();
            for (Map.Entry<String, String> entry : param.entrySet()) {
                paramList.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
            }
            log.info("Request URL: " + url + ", parameters: " + paramList);
            // URL-encode the form body as UTF-8
            httpPost.setEntity(new UrlEncodedFormEntity(paramList, Consts.UTF_8));
            httpPost.setConfig(RequestConfig.custom().setConnectTimeout(30000).setSocketTimeout(30000).build());
            response = httpClient.execute(httpPost);
            HttpEntity entity = response.getEntity();
            int statusCode = response.getStatusLine().getStatusCode();
            if (HttpStatus.SC_OK != statusCode) { // anything other than 200
                log.warn("Unexpected status code: " + statusCode);
            }
            // Decode as UTF-8 when the response does not declare a charset
            result = EntityUtils.toString(entity, Consts.UTF_8);
            log.info("Status code: " + statusCode + ", response body: " + result);
        } finally {
            if (response != null) {
                response.close();
            }
            if (httpPost != null) {
                httpPost.releaseConnection();
            }
            httpClient.close();
        }
        return result;
    }
}
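The `param` argument of `sendGet` is appended to the URL exactly as passed, so the caller has to supply an already encoded query string. One possible way to build it is sketched below. `QueryStringBuilder` is a hypothetical helper that is not part of the original utility class, and the parameter names in `main` are made up for illustration.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not in the original article) that builds an encoded query string. */
final class QueryStringBuilder {

    /** Joins the entries as key=value pairs separated by '&', URL-encoding keys and values as UTF-8. */
    static String build(Map<String, String> params) {
        StringJoiner joiner = new StringJoiner("&");
        try {
            for (Map.Entry<String, String> e : params.entrySet()) {
                joiner.add(URLEncoder.encode(e.getKey(), "UTF-8") + "=" + URLEncoder.encode(e.getValue(), "UTF-8"));
            }
        } catch (UnsupportedEncodingException ex) {
            // UTF-8 is required on every Java platform, so this is not expected to happen
            throw new IllegalStateException(ex);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("systemId", "A 1"); // made-up parameter names and values
        params.put("q", "a&b");
        System.out.println(build(params)); // prints systemId=A+1&q=a%26b
    }
}
```

The result could then be passed as the second argument, for example `HttpProxyUtil.sendGet(url, QueryStringBuilder.build(params))`.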

Statement
This article is reproduced from 亿速云.