Heim  >  Artikel  >  Java  >  Wie SpringBoot Dataworks integriert

Wie SpringBoot Dataworks integriert

PHPz
PHPznach vorne
2023-05-14 16:01:061227Durchsuche

    Hinweise

    Der Test hier besteht hauptsächlich darin, das aus Dataworks abgerufene Skript aufzurufen und lokal zu speichern. Das Skript besteht aus zwei Teilen Die Abfrage umfasst jeweils 100 Zeitstreifen. Wenn wir das Skript abrufen, müssen wir es auf mehreren Seiten abfragen

    Dieses Projekt verwendet die SDK/JDBC-Verbindung von MaxCompute und SpringBoot betreibt die MaxCompute SDK/JDBC-Verbindung

    Integrationsimplementierung

    Die Implementierung umfasst hauptsächlich das Schreiben von Toolklassen, die dies können Bei Bedarf als SpringBean konfiguriert werden, kann es nach dem Einfügen in den Container verwendet werden. Einführung in die Abhängigkeit ist die erforderliche Rückruffunktion, da alle Skripte abgerufen werden. Wenn die Ergebnisse jedes Pagings zusammengeführt werden, führt dies zu einem Speicherüberlauf, und die Verwendung der Rückruffunktion fügt nur eine Verarbeitungsfunktion für jeden Zyklus hinzu

    <properties>
        <java.version>1.8</java.version>
        <!--maxCompute-sdk-版本号-->
        <max-compute-sdk.version>0.40.8-public</max-compute-sdk.version>
        <!--maxCompute-jdbc-版本号-->
        <max-compute-jdbc.version>3.0.1</max-compute-jdbc.version>
        <!--dataworks版本号-->
        <dataworks-sdk.version>3.4.2</dataworks-sdk.version>
        <aliyun-java-sdk.version>4.5.20</aliyun-java-sdk.version>
    </properties>
    <dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!--max compute sdk-->
    <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>odps-sdk-core</artifactId>
        <version>${max-compute-sdk.version}</version>
    </dependency>
    <!--max compute jdbc-->
    <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>odps-jdbc</artifactId>
        <version>${max-compute-jdbc.version}</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>
    <!--dataworks需要引入aliyun-sdk和dataworks本身-->
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <version>${aliyun-java-sdk.version}</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
        <version>${dataworks-sdk.version}</version>
    </dependency>
    </dependencies>

    Initialisierungsvorgang

    Instanziiert hauptsächlich die Client-Informationen der Dataworks-OpenApi-Schnittstelle und initialisiert die Tool-Klasse für die MaxCompute-Verbindung (einschließlich JDBC- und SDK-Methoden).

    /**
     * @Description
     * @Author itdl
     * @Date 2022/08/09 15:12
     */
    @Data
    public class DataWorksOpenApiConnParam {
        /**
         * 区域 eg. cn-shanghai
         */
        private String region;
    
        /**
         * 访问keyId
         */
        private String aliyunAccessId;
        /**
         * 密钥
         */
        private String aliyunAccessKey;
    
        /**
         * 访问端点  就是API的URL前缀
         */
        private String endPoint;
    
        /**
         * 数据库类型 如odps
         */
        private String datasourceType;
    
        /**
         * 所属项目
         */
        private String project;
    
        /**
         * 项目环境 dev  prod
         */
        private String projectEnv;
    }

    Rufen Sie OpenApi auf, um alle Skripte abzurufen Code, verarbeitet durch Rückruffunktion

    /**
     * @Description
     * @Author itdl
     * @Date 2022/08/09 15:12
     */
    @Data
    public class DataWorksOpenApiConnParam {
        /**
         * 区域 eg. cn-shanghai
         */
        private String region;
    
        /**
         * 访问keyId
         */
        private String aliyunAccessId;
        /**
         * 密钥
         */
        private String aliyunAccessKey;
    
        /**
         * 访问端点  就是API的URL前缀
         */
        private String endPoint;
    
        /**
         * 数据库类型 如odps
         */
        private String datasourceType;
    
        /**
         * 所属项目
         */
        private String project;
    
        /**
         * 项目环境 dev  prod
         */
        private String projectEnv;
    }

    MaxCompute-Tool-Klassencode, entsprechend dem Tabellennamen Holen Sie sich die Tabellenerstellungsanweisung. Am Beispiel des SDK führt JDBC direkt show create table aus, um die Tabellenerstellungsanweisung zu erhalten

    private static final String MAX_COMPUTE_JDBC_URL_FORMAT = "http://service.%s.maxcompute.aliyun.com/api";
    /**默认的odps接口地址 在Odps中也可以看到该变量*/
    private static final String defaultEndpoint = "http://service.odps.aliyun.com/api";
    /**
     * dataworks连接参数
     *
     */
    private final DataWorksOpenApiConnParam connParam;
    
    /**
     * 可以使用dataworks去连接maxCompute 如果连接的引擎是maxCompute的话
     */
    private final MaxComputeJdbcUtil maxComputeJdbcUtil;
    
    private final MaxComputeSdkUtil maxComputeSdkUtil;
    
    private final boolean odpsSdk;
    
    
    /**
     * 客户端
     */
    private final IAcsClient client;
    
    public DataWorksOpenApiUtil(DataWorksOpenApiConnParam connParam, boolean odpsSdk) {
        this.connParam = connParam;
        this.client = buildClient();
        this.odpsSdk = odpsSdk;
        if (odpsSdk){
            this.maxComputeJdbcUtil = null;
            this.maxComputeSdkUtil = buildMaxComputeSdkUtil();
        }else {
            this.maxComputeJdbcUtil = buildMaxComputeJdbcUtil();
            this.maxComputeSdkUtil = null;
        }
    }
    
    private MaxComputeSdkUtil buildMaxComputeSdkUtil() {
        final MaxComputeSdkConnParam param = new MaxComputeSdkConnParam();
    
        // 设置账号密码
        param.setAliyunAccessId(connParam.getAliyunAccessId());
        param.setAliyunAccessKey(connParam.getAliyunAccessKey());
    
        // 设置endpoint
        param.setMaxComputeEndpoint(defaultEndpoint);
    
        // 目前只处理odps的引擎
        final String datasourceType = connParam.getDatasourceType();
        if (!"odps".equals(datasourceType)){
            throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR);
        }
    
        // 获取项目环境,根据项目环境连接不同的maxCompute
        final String projectEnv = connParam.getProjectEnv();
    
        if ("dev".equals(projectEnv)){
            // 开发环境dataworks + _dev就是maxCompute的项目名
            param.setProjectName(String.join("_", connParam.getProject(), projectEnv));
        }else {
            // 生产环境dataworks的项目名和maxCompute一致
            param.setProjectName(connParam.getProject());
        }
    
        return new MaxComputeSdkUtil(param);
    }
    
    private MaxComputeJdbcUtil buildMaxComputeJdbcUtil() {
        final MaxComputeJdbcConnParam param = new MaxComputeJdbcConnParam();
    
        // 设置账号密码
        param.setAliyunAccessId(connParam.getAliyunAccessId());
        param.setAliyunAccessKey(connParam.getAliyunAccessKey());
    
        // 设置endpoint
        param.setEndpoint(String.format(MAX_COMPUTE_JDBC_URL_FORMAT, connParam.getRegion()));
    
        // 目前只处理odps的引擎
        final String datasourceType = connParam.getDatasourceType();
        if (!"odps".equals(datasourceType)){
            throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR);
        }
    
        // 获取项目环境,根据项目环境连接不同的maxCompute
        final String projectEnv = connParam.getProjectEnv();
    
        if ("dev".equals(projectEnv)){
            // 开发环境dataworks + _dev就是maxCompute的项目名
            param.setProjectName(String.join("_", connParam.getProject(), projectEnv));
        }else {
            // 生产环境dataworks的项目名和maxCompute一致
            param.setProjectName(connParam.getProject());
        }
    
        return new MaxComputeJdbcUtil(param);
    }

    Testcode

    /**
     * 根据文件夹路径分页查询该路径下的文件(脚本)
     * @param pageSize 每页查询多少数据
     * @param folderPath 文件所在目录
     * @param userType 文件所属功能模块 可不传
     * @param fileTypes 设置文件代码类型 逗号分割 可不传
     */
    public void listAllFiles(Integer pageSize, String folderPath, String userType, String fileTypes, CallBack.FileCallBack callBack) throws ClientException {
        pageSize = setPageSize(pageSize);
        // 创建请求
        final ListFilesRequest request = new ListFilesRequest();
    
        // 设置分页参数
        request.setPageNumber(1);
        request.setPageSize(pageSize);
    
        // 设置上级文件夹
        request.setFileFolderPath(folderPath);
    
        // 设置区域和项目名称
        request.setSysRegionId(connParam.getRegion());
        request.setProjectIdentifier(connParam.getProject());
    
        // 设置文件所属功能模块
        if (!ObjectUtils.isEmpty(userType)){
            request.setUseType(userType);
        }
        // 设置文件代码类型
        if (!ObjectUtils.isEmpty(fileTypes)){
            request.setFileTypes(fileTypes);
        }
    
        // 发起请求
        ListFilesResponse res = client.getAcsResponse(request);
    
        // 获取分页总数
        final Integer totalCount = res.getData().getTotalCount();
        // 返回结果
        final List<ListFilesResponse.Data.File> resultList = res.getData().getFiles();
        // 计算能分几页
        long pages = totalCount % pageSize == 0 ? (totalCount / pageSize) : (totalCount / pageSize) + 1;
        // 只有1页 直接返回
        if (pages <= 1){
            callBack.handle(resultList);
            return;
        }
    
        // 第一页执行回调
        callBack.handle(resultList);
    
        // 分页数据 从第二页开始查询 同步拉取,可以优化为多线程拉取
        for (int i = 2; i <= pages; i++) {
            //第1页
            request.setPageNumber(i);
            //每页大小
            request.setPageSize(pageSize);
            // 发起请求
            res = client.getAcsResponse(request);
            final List<ListFilesResponse.Data.File> tableEntityList = res.getData().getFiles();
            if (!ObjectUtils.isEmpty(tableEntityList)){
                // 执行回调函数
                callBack.handle(tableEntityList);
            }
        }
    }

    Testergebnisse

    test_001-Skript

    test_002-Skript

    test_003-Skript

    test_004-Skript

    test_005-Skript

    ============================ ===========

    Tabellenname: test_abc_info Der Inhalt ist wie folgt:

    TABELLE ERSTELLEN, WENN NICHT EXISTIERT test_abc_info

    (

    test_abc1 STRING COMMENT 'Feld 1',

    test_abc2 STRING COMMENT 'Feld 2',

    test_abc3 STRING KOMMENTAR 'Feld 3',

    test_abc 4 STRING KOMMENTAR 'Feld 4',

    test_abc5 STRING KOMMENTAR 'Feld 5',

    test_abc6 STRING KOMMENTAR 'Feld 6',

    test_abc7 STRING KOMMENTAR 'Feld 7',

    test_abc8 STRING KOMMENTAR 'Feld 8'
    )

    PARTITIONIERT DURCH (p_date STRING COMMENT 'Datendatum'
    )
    ;
    === ======================== ===========
    Von der Ziel-VM getrennt, Adresse: '127.0.0.1: 59509', Transport: 'socket'

    Das obige ist der detaillierte Inhalt vonWie SpringBoot Dataworks integriert. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

    Stellungnahme:
    Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen