Rumah >Java >javaTutorial >Cara menggunakan tugas berjadual SpringBoot untuk mencapai penyegerakan data

Cara menggunakan tugas berjadual SpringBoot untuk mencapai penyegerakan data

PHPz
PHPzke hadapan
2023-05-10 20:40:04 · 2009 semak imbas

Kata Pengantar

Keperluan perniagaan adalah untuk mendapatkan data peranti dengan memanggil antara muka API di platform tengah, yang memerlukan penyegerakan data peranti sebenar.

Pilihan 1: Laksanakan kaedah pullData() melalui antara muka pengundian untuk mencapai penyegerakan data

Prinsip kaedah ini ialah mengosongkan dahulu semua data sedia ada, kemudian memasukkan semula data terkini yang diperoleh melalui panggilan API. Kelebihan kaedah ini ialah logiknya mudah. Kelemahannya ialah data kerap dipadam dan dimasukkan semula: jika data ditanya pada saat semua data lama telah dipadam tetapi data baharu belum sempat dimasukkan, data yang dikembalikan mungkin tidak normal.

Pilihan 2: Laksanakan kaedah pullDataNew() melalui antara muka pengundian untuk mencapai penyegerakan data

Prinsip kaedah ini ialah menanya dahulu data sedia ada dalam pangkalan data, kemudian membandingkannya dengan data terkini yang diperoleh melalui panggilan API untuk mengenal pasti penambahan, pengurangan dan perubahan dalam data, lalu mengemas kini pangkalan data dengan sewajarnya. Kelebihan kaedah ini ialah ia mengurangkan operasi yang kerap pada pangkalan data dan meningkatkan prestasi. Kelemahan: tiada kelemahan yang jelas ditemui setakat ini.

package com.hxtx.spacedata.task;
 
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.google.api.client.util.Lists;
import com.hxtx.spacedata.common.domain.ResponseDTO;
import com.hxtx.spacedata.config.SpringContextUtil;
import com.hxtx.spacedata.controller.file.FilesMinioController;
import com.hxtx.spacedata.domain.entity.entityconfig.EntityPointEntity;
import com.hxtx.spacedata.service.entityconfig.EntityPointService;
import com.hxtx.spacedata.util.HttpProxyUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
 
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
 
 
/**
 * 中台设备数据 定时任务执行
 *
 * @author Tarzan Liu
 * @version 1.0.0
 * @description
 * @date 2020/12/07
 */
@Component
@Slf4j
public class EntityPointTask {
 
    @Autowired
    private EntityPointService entityPointService;
 
    @Value("${middleGround.server.host}")
    private String host;
 
    @Value("${middleGround.server.port}")
    private String port;
 
    private static FilesMinioController filesMinioController = SpringContextUtil.getBean(FilesMinioController.class);
 
    /**
     * 设备定义点数据拉取
     *
     * @author tarzan Liu
     * @date 2020/12/2
     */
    @Scheduled(cron = "0/30 * * * * ?") // 30秒校验一次
    public void pullDataTaskByCorn() {
        String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/system/list");
        JSONObject jsonObject = JSON.parseObject(result);
        if (Objects.nonNull(jsonObject)) {
            JSONArray array = jsonObject.getJSONArray("data");
            if (array != null && array.size() != 0) {
                for (int i = 0; i < array.size(); i++) {
                    JSONObject obj = array.getJSONObject(i);
                    String systemId = obj.getString("id");
                    pullDataNew(systemId);
                }
            }
        }
    }
 
 
    @Transactional(rollbackFor = Throwable.class)
    public ResponseDTO<String> pullData(String code) {
        List<EntityPointEntity> list = Lists.newArrayList();
        String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/defintionView/listBySystemId/" + code);
        JSONObject jsonObject = JSON.parseObject(result);
        if (Objects.nonNull(jsonObject)) {
            JSONArray array = jsonObject.getJSONArray("data");
            if (array != null && array.size() != 0) {
                for (int i = 0; i < array.size(); i++) {
                    JSONObject obj = array.getJSONObject(i);
                    String pointId = obj.getString("pointId");
                    String name = obj.getString("name");
                    list.add(EntityPointEntity.builder().pointId(pointId).name(name).code(code).build());
                }
                List<EntityPointEntity> existList = entityPointService.list(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code).isNotNull(EntityPointEntity::getValue));
                if (CollectionUtils.isNotEmpty(existList)) {
                    Map<String, String> existMap = existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getValue));
                    list.forEach(e -> {
                        String value = existMap.get(e.getPointId());
                        if (value != null) {
                            e.setValue(value);
                        }
                    });
                }
                entityPointService.remove(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code));
                entityPointService.saveBatch(list);
            }
        }
        return ResponseDTO.succ();
    }
 
 
    @Transactional(rollbackFor = Throwable.class)
    public ResponseDTO<String> pullDataNew(String code) {
        String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/defintionView/listBySystemId/" + code);
        JSONObject jsonObject = JSON.parseObject(result);
        if (Objects.nonNull(jsonObject)) {
            JSONArray data = jsonObject.getJSONArray("data");
            List<EntityPointEntity> list = data.toJavaList(EntityPointEntity.class);
            if (CollectionUtils.isNotEmpty(list)) {
                list.forEach(e -> e.setCode(code));
                List<EntityPointEntity> existList = entityPointService.list(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code));
                if (CollectionUtils.isNotEmpty(existList)) {
                    //存在map
                    Map<String, String> existMap = existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getName));
                    //传输map
                    Map<String, String> dataMap = list.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getName));
                    //增量
                    List<EntityPointEntity> increment = list.stream().filter(e -> existMap.get(e.getPointId()) == null).collect(Collectors.toList());
                    if (CollectionUtils.isNotEmpty(increment)) {
                        entityPointService.saveBatch(increment);
                    }
                    //减量
                    List<EntityPointEntity> decrement = existList.stream().filter(e -> dataMap.get(e.getPointId()) == null).collect(Collectors.toList());
                    if (CollectionUtils.isNotEmpty(decrement)) {
                        entityPointService.removeByIds(decrement.stream().map(EntityPointEntity::getId).collect(Collectors.toList()));
                    }
                    //变量
                    List<EntityPointEntity> variable = existList.stream().filter(e -> dataMap.get(e.getPointId()) != null && !dataMap.get(e.getPointId()).equals(e.getName())).collect(Collectors.toList());
                    if (CollectionUtils.isNotEmpty(variable)) {
                        variable.forEach(e -> {
                            e.setName(dataMap.get(e.getPointId()));
                        });
                        entityPointService.updateBatchById(variable);
                    }
                } else {
                    entityPointService.saveBatch(list);
                }
            }
        }
        return ResponseDTO.succ();
    }
 
 
}

Kelas entiti sepadan pangkalan data

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
 
import java.io.Serializable;
import java.util.Date;
 
/**
 * Entity for a definition point, mapped to the table {@code t_entity_point}.
 *
 * <p>Lombok generates the builder, the no-args and all-args constructors, and the accessors.
 * The all-args constructor follows the field declaration order below.
 */
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
@TableName(value = "t_entity_point")
public class EntityPointEntity implements Serializable {
 
    private static final long serialVersionUID = 2181036545424452651L;
 
    /**
     * Primary key, mapped to column {@code id}; uses the {@code IdType.ASSIGN_ID} strategy.
     */
    @TableId(value = "id", type = IdType.ASSIGN_ID)
    private Long id;
 
    /**
     * Definition point id.
     */
    private String pointId;
 
    /**
     * Name.
     */
    private String name;
 
    /**
     * Drawing data.
     */
    private String value;
 
    /**
     * Code.
     */
    private String code;
 
    /**
     * Creation time.
     */
    private Date createTime;
 
}

Kelas alat proksi permintaan HTTP

import lombok.extern.slf4j.Slf4j;
import org.apache.http.Consts;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.util.EntityUtils;

import javax.net.ssl.SSLContext;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.URL;
import java.net.URLConnection;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
 
 
/**
 * HTTP请求代理类
 *
 * @author tarzan Liu
 * @description 发送Get Post请求
 */
@Slf4j
public class HttpProxyUtil {
 
    /**
     * 使用URLConnection进行GET请求
     *
     * @param api_url
     * @return
     */
    public static String sendGet(String api_url) {
        return sendGet(api_url, "", "utf-8");
    }
 
    /**
     * 使用URLConnection进行GET请求
     *
     * @param api_url
     * @param param
     * @return
     */
    public static String sendGet(String api_url, String param) {
        return sendGet(api_url, param, "utf-8");
    }
 
    /**
     * 使用URLConnection进行GET请求
     *
     * @param api_url 请求路径
     * @param param   请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值, 可以为空
     * @param charset 字符集
     * @return
     */
    public static String sendGet(String api_url, String param, String charset) {
        StringBuffer buffer = new StringBuffer();
        try {
            // 判断有无参数,若是拼接好的url,就不必再拼接了
            if (param != null && !"".equals(param)) {
                api_url = api_url + "?" + param;
            }
            log.info("请求的路径是:" + api_url);
            URL realUrl = new URL(api_url);
            // 打开联接
            URLConnection conn = realUrl.openConnection();
            // 设置通用的请求属性
            conn.setRequestProperty("accept", "*/*");
            conn.setRequestProperty("connection", "Keep-Alive");
            conn.setRequestProperty("user-agent", "Mozilla/4.0(compatible; MSIE 6.0; Windows NT 5.1; SV1)");
            conn.setConnectTimeout(12000);    //设置连接主机超时(单位:毫秒)
            conn.setReadTimeout(12000);    // 设置从主机读取数据超时(单位:毫秒)
            conn.connect();    // 建立实际的联接
 
            // 定义 BufferedReader输入流来读取URL的相应
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), charset))) {
                String line;
                while ((line = in.readLine()) != null) {
//                    buffer.append("\n"+line);
                    buffer.append(line);
                }
            }
        } catch (Exception e) {
            log.error("发送GET请求出现异常! " + e.getMessage());
            return null;
        }
        //  log.info("响应返回数据:" + buffer.toString());
        return buffer.toString();
    }
 
 
    /**
     * 使用URLConnection进行POST请求
     *
     * @param api_url 请求路径
     * @param param   请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,最好不为空
     * @return
     */
    public static String sendPost(String api_url, String param) {
        return sendPost(api_url, param, "utf-8");
    }
 
    /**
     * 使用URLConnection进行POST请求
     *
     * @param api_url 请求路径
     * @param param   请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,最好不为空
     * @param charset 字符集
     * @return
     */
    public static String sendPost(String api_url, String param, String charset) {
        StringBuffer buffer = new StringBuffer();
        try {
            log.info("请求的路径是:" + api_url + ",参数是:" + param);
 
            URL realUrl = new URL(api_url);
            // 打开联接
            URLConnection conn = realUrl.openConnection();
            // 设置通用的请求属性
            conn.setRequestProperty("accept", "*/*");
            conn.setRequestProperty("connection", "Keep-Alive");
            conn.setRequestProperty("user-agent", "Mozilla/4.0(compatible; MSIE 6.0; Windows NT 5.1; SV1)");
            conn.setConnectTimeout(12000);    //设置连接主机超时(单位:毫秒)
            conn.setReadTimeout(12000);    // 设置从主机读取数据超时(单位:毫秒)
 
            // 发送POST请求必须设置如下两行
            conn.setDoOutput(true);
            conn.setDoInput(true);
 
            // 获取URLConnection对象对应的输出流
            try (PrintWriter out = new PrintWriter(conn.getOutputStream())) {
                out.print(param); // 发送请求参数
                out.flush();// flush输出流的缓冲
            }
            // 定义 BufferedReader输入流来读取URL的相应,得指明使用UTF-8编码,否则到API服务器XML的中文不能被成功识别
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), charset))) {
                String line;
                while ((line = in.readLine()) != null) {
//                    buffer.append("\n"+line);
                    buffer.append(line);
                }
            }
        } catch (Exception e) {
            log.error("发送POST请求出现异常! " + e.getMessage());
            e.printStackTrace();
        }
        log.info("响应返回数据:" + buffer.toString());
        return buffer.toString();
    }
 
    public static CloseableHttpClient createSSLClientDefault() throws Exception {
        SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new AllTrustStrategy()).build();
        SSLConnectionSocketFactory sslSf = new SSLConnectionSocketFactory(sslContext);
        return HttpClients.custom().setSSLSocketFactory(sslSf).build();
    }
 
    // 加载证书
    private static class AllTrustStrategy implements TrustStrategy {
        public boolean isTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
            return true;
        }
    }
 
    /**
     * 支持https请求
     *
     * @param url
     * @param param
     * @return
     * @throws Exception
     */
    public static String sendHttpClientPost(String url, Map<String, String> param) throws Exception {
        CloseableHttpClient httpClient = createSSLClientDefault();
        HttpPost httpPost = null;
        CloseableHttpResponse response = null;
        String result = "";
        try {
            // 发起HTTP的POST请求
            httpPost = new HttpPost(url);
            List<NameValuePair> paramList = new ArrayList<NameValuePair>();
            for (String key : param.keySet()) {
                paramList.add(new BasicNameValuePair(key, param.get(key)));
            }
            log.info("http请求地址:" + url + ",参数:" + paramList.toString());
            // UTF8+URL编码
            httpPost.setEntity(new UrlEncodedFormEntity(paramList, Consts.UTF_8));
            httpPost.setConfig(RequestConfig.custom().setConnectTimeout(30000).setSocketTimeout(30000).build());
            response = httpClient.execute(httpPost);
            HttpEntity entity = response.getEntity();
            int statusCode = response.getStatusLine().getStatusCode();
            if (HttpStatus.SC_OK == statusCode) { // 如果响应码是200
 
            }
            result = EntityUtils.toString(entity);
            log.info("状态码:" + statusCode + ",响应信息:" + result);
        } finally {
            if (response != null) {
                response.close();
            }
            if (httpPost != null) {
                httpPost.releaseConnection();
            }
            httpClient.close();
        }
        return result;
    }
}

Atas ialah kandungan terperinci Cara menggunakan tugas berjadual SpringBoot untuk mencapai penyegerakan data. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini diterbitkan semula daripada: yisu.com. Jika terdapat sebarang pelanggaran hak cipta, sila hubungi admin@php.cn untuk memadamkannya.