ホームページ  >  記事  >  Java  >  SpringBoot がデータワークを統合する方法

SpringBoot がデータワークを統合する方法

PHPz
PHPz転載
2023-05-14 16:01:061271ブラウズ

    注意事項

    ここでのテストは主に、データワークスから取得したスクリプトを呼び出してローカルに保存することです。
    このスクリプトには 2 つの部分が含まれています

    1. 開発された odps スクリプト (OpenApi を通じて取得) 2. テーブル ステートメント スクリプト (データワークス情報を通じて maxCompute に接続し、作成ステートメントを取得します)

    Alibaba Cloud Dataworks OpenApi ページング クエリの制限。一度に最大 100 クエリ。スクリプトをプルするときに複数のページでクエリを実行する必要があります

    このプロジェクトは MaxCompute の SDK/JDBC 接続を使用し、SpringBoot は MaxCompute SDK/JDBC 接続を操作します

    統合実装

    Main実装 ツールクラスを記述します 必要に応じてSpringBeanとして設定してコンテナに注入します

    依存関係の導入

    <properties>
        <java.version>1.8</java.version>
        <!--maxCompute-sdk-版本号-->
        <max-compute-sdk.version>0.40.8-public</max-compute-sdk.version>
        <!--maxCompute-jdbc-版本号-->
        <max-compute-jdbc.version>3.0.1</max-compute-jdbc.version>
        <!--dataworks版本号-->
        <dataworks-sdk.version>3.4.2</dataworks-sdk.version>
        <aliyun-java-sdk.version>4.5.20</aliyun-java-sdk.version>
    </properties>
    <dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!--max compute sdk-->
    <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>odps-sdk-core</artifactId>
        <version>${max-compute-sdk.version}</version>
    </dependency>
    <!--max compute jdbc-->
    <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>odps-jdbc</artifactId>
        <version>${max-compute-jdbc.version}</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>
    <!--dataworks需要引入aliyun-sdk和dataworks本身-->
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <version>${aliyun-java-sdk.version}</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
        <version>${dataworks-sdk.version}</version>
    </dependency>
    </dependencies>

    リクエストパラメータクラスの記述

    /**
     * @Description
     * @Author itdl
     * @Date 2022/08/09 15:12
     */
    @Data
    public class DataWorksOpenApiConnParam {
        /**
         * 区域 eg. cn-shanghai
         */
        private String region;
    
        /**
         * 访问keyId
         */
        private String aliyunAccessId;
        /**
         * 密钥
         */
        private String aliyunAccessKey;
    
        /**
         * 访问端点  就是API的URL前缀
         */
        private String endPoint;
    
        /**
         * 数据库类型 如odps
         */
        private String datasourceType;
    
        /**
         * 所属项目
         */
        private String project;
    
        /**
         * 项目环境 dev  prod
         */
        private String projectEnv;
    }

    ツールクラスの書き方

    基本的なクラスの準備、スクリプトプル後のコールバック関数

    なぜコールバック関数が必要なのでしょうか? すべてのスクリプトがプルされるため、各ページングの結果をマージすると、メモリオーバーフローを引き起こすため、コールバック関数が使用されます 各サイクルに処理関数を追加するだけです

    /**
     * @Description
     * @Author itdl
     * @Date 2022/08/09 15:12
     */
    @Data
    public class DataWorksOpenApiConnParam {
        /**
         * 区域 eg. cn-shanghai
         */
        private String region;
    
        /**
         * 访问keyId
         */
        private String aliyunAccessId;
        /**
         * 密钥
         */
        private String aliyunAccessKey;
    
        /**
         * 访问端点  就是API的URL前缀
         */
        private String endPoint;
    
        /**
         * 数据库类型 如odps
         */
        private String datasourceType;
    
        /**
         * 所属项目
         */
        private String project;
    
        /**
         * 项目环境 dev  prod
         */
        private String projectEnv;
    }

    初期化操作

    主に Dataworks openApi インターフェイスのクライアント情報をインスタンス化し、ツールを初期化することですmaxCompute 接続のクラス (JDBC および SDK メソッドを含む)

    private static final String MAX_COMPUTE_JDBC_URL_FORMAT = "http://service.%s.maxcompute.aliyun.com/api";
    /**默认的odps接口地址 在Odps中也可以看到该变量*/
    private static final String defaultEndpoint = "http://service.odps.aliyun.com/api";
    /**
     * dataworks连接参数
     *
     */
    private final DataWorksOpenApiConnParam connParam;
    
    /**
     * 可以使用dataworks去连接maxCompute 如果连接的引擎是maxCompute的话
     */
    private final MaxComputeJdbcUtil maxComputeJdbcUtil;
    
    private final MaxComputeSdkUtil maxComputeSdkUtil;
    
    private final boolean odpsSdk;
    
    
    /**
     * 客户端
     */
    private final IAcsClient client;
    
    public DataWorksOpenApiUtil(DataWorksOpenApiConnParam connParam, boolean odpsSdk) {
        this.connParam = connParam;
        this.client = buildClient();
        this.odpsSdk = odpsSdk;
        if (odpsSdk){
            this.maxComputeJdbcUtil = null;
            this.maxComputeSdkUtil = buildMaxComputeSdkUtil();
        }else {
            this.maxComputeJdbcUtil = buildMaxComputeJdbcUtil();
            this.maxComputeSdkUtil = null;
        }
    }
    
    private MaxComputeSdkUtil buildMaxComputeSdkUtil() {
        final MaxComputeSdkConnParam param = new MaxComputeSdkConnParam();
    
        // 设置账号密码
        param.setAliyunAccessId(connParam.getAliyunAccessId());
        param.setAliyunAccessKey(connParam.getAliyunAccessKey());
    
        // 设置endpoint
        param.setMaxComputeEndpoint(defaultEndpoint);
    
        // 目前只处理odps的引擎
        final String datasourceType = connParam.getDatasourceType();
        if (!"odps".equals(datasourceType)){
            throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR);
        }
    
        // 获取项目环境,根据项目环境连接不同的maxCompute
        final String projectEnv = connParam.getProjectEnv();
    
        if ("dev".equals(projectEnv)){
            // 开发环境dataworks + _dev就是maxCompute的项目名
            param.setProjectName(String.join("_", connParam.getProject(), projectEnv));
        }else {
            // 生产环境dataworks的项目名和maxCompute一致
            param.setProjectName(connParam.getProject());
        }
    
        return new MaxComputeSdkUtil(param);
    }
    
    private MaxComputeJdbcUtil buildMaxComputeJdbcUtil() {
        final MaxComputeJdbcConnParam param = new MaxComputeJdbcConnParam();
    
        // 设置账号密码
        param.setAliyunAccessId(connParam.getAliyunAccessId());
        param.setAliyunAccessKey(connParam.getAliyunAccessKey());
    
        // 设置endpoint
        param.setEndpoint(String.format(MAX_COMPUTE_JDBC_URL_FORMAT, connParam.getRegion()));
    
        // 目前只处理odps的引擎
        final String datasourceType = connParam.getDatasourceType();
        if (!"odps".equals(datasourceType)){
            throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR);
        }
    
        // 获取项目环境,根据项目环境连接不同的maxCompute
        final String projectEnv = connParam.getProjectEnv();
    
        if ("dev".equals(projectEnv)){
            // 开发环境dataworks + _dev就是maxCompute的项目名
            param.setProjectName(String.join("_", connParam.getProject(), projectEnv));
        }else {
            // 生产环境dataworks的项目名和maxCompute一致
            param.setProjectName(connParam.getProject());
        }
    
        return new MaxComputeJdbcUtil(param);
    }

    OpenApi を呼び出してすべてのスクリプトをプルする

    /**
     * 根据文件夹路径分页查询该路径下的文件(脚本)
     * @param pageSize 每页查询多少数据
     * @param folderPath 文件所在目录
     * @param userType 文件所属功能模块 可不传
     * @param fileTypes 设置文件代码类型 逗号分割 可不传
     */
    public void listAllFiles(Integer pageSize, String folderPath, String userType, String fileTypes, CallBack.FileCallBack callBack) throws ClientException {
        pageSize = setPageSize(pageSize);
        // 创建请求
        final ListFilesRequest request = new ListFilesRequest();
    
        // 设置分页参数
        request.setPageNumber(1);
        request.setPageSize(pageSize);
    
        // 设置上级文件夹
        request.setFileFolderPath(folderPath);
    
        // 设置区域和项目名称
        request.setSysRegionId(connParam.getRegion());
        request.setProjectIdentifier(connParam.getProject());
    
        // 设置文件所属功能模块
        if (!ObjectUtils.isEmpty(userType)){
            request.setUseType(userType);
        }
        // 设置文件代码类型
        if (!ObjectUtils.isEmpty(fileTypes)){
            request.setFileTypes(fileTypes);
        }
    
        // 发起请求
        ListFilesResponse res = client.getAcsResponse(request);
    
        // 获取分页总数
        final Integer totalCount = res.getData().getTotalCount();
        // 返回结果
        final List<ListFilesResponse.Data.File> resultList = res.getData().getFiles();
        // 计算能分几页
        long pages = totalCount % pageSize == 0 ? (totalCount / pageSize) : (totalCount / pageSize) + 1;
        // 只有1页 直接返回
        if (pages <= 1){
            callBack.handle(resultList);
            return;
        }
    
        // 第一页执行回调
        callBack.handle(resultList);
    
        // 分页数据 从第二页开始查询 同步拉取,可以优化为多线程拉取
        for (int i = 2; i <= pages; i++) {
            //第1页
            request.setPageNumber(i);
            //每页大小
            request.setPageSize(pageSize);
            // 发起请求
            res = client.getAcsResponse(request);
            final List<ListFilesResponse.Data.File> tableEntityList = res.getData().getFiles();
            if (!ObjectUtils.isEmpty(tableEntityList)){
                // 执行回调函数
                callBack.handle(tableEntityList);
            }
        }
    }

    MaxCompute への内部接続ですべての DDL スクリプト コンテンツをプルする

    DataWorks ツール クラスコード、コールバック関数を介して処理されます

        /**
         * 获取所有的DDL脚本
         * @param callBack 回调处理函数
         */
        public void listAllDdl(CallBack.DdlCallBack callBack){
            if (odpsSdk){
                final List<TableMetaInfo> tableInfos = maxComputeSdkUtil.getTableInfos();
                for (TableMetaInfo tableInfo : tableInfos) {
                    final String tableName = tableInfo.getTableName();
                    final String sqlCreateDesc = maxComputeSdkUtil.getSqlCreateDesc(tableName);
                    callBack.handle(tableName, sqlCreateDesc);
                }
            }
        }

    MaxCompute ツール クラス コード、テーブル名に基づいてテーブル作成ステートメントを取得します。例として SDK を使用します。JDBC は show create table を直接実行してテーブル作成ステートメント

    /**
     * 根据表名获取建表语句
     * @param tableName 表名
     * @return
     */
    public String getSqlCreateDesc(String tableName) {
        final Table table = odps.tables().get(tableName);
        // 建表语句
        StringBuilder mssqlDDL = new StringBuilder();
    
        // 获取表结构
        TableSchema tableSchema = table.getSchema();
        // 获取表名表注释
        String tableComment = table.getComment();
    
        //获取列名列注释
        List<Column> columns = tableSchema.getColumns();
        /*组装成mssql的DDL*/
        // 表名
        mssqlDDL.append("CREATE TABLE IF NOT EXISTS ");
        mssqlDDL.append(tableName).append("\n");
        mssqlDDL.append(" (\n");
        //列字段
        int index = 1;
        for (Column column : columns) {
            mssqlDDL.append("  ").append(column.getName()).append("\t\t").append(column.getTypeInfo().getTypeName());
            if (!ObjectUtils.isEmpty(column.getComment())) {
                mssqlDDL.append(" COMMENT &#39;").append(column.getComment()).append("&#39;");
            }
            if (index == columns.size()) {
                mssqlDDL.append("\n");
            } else {
                mssqlDDL.append(",\n");
            }
            index++;
        }
        mssqlDDL.append(" )\n");
        //获取分区
        List<Column> partitionColumns = tableSchema.getPartitionColumns();
        int partitionIndex = 1;
        if (!ObjectUtils.isEmpty(partitionColumns)) {
            mssqlDDL.append("PARTITIONED BY (");
        }
        for (Column partitionColumn : partitionColumns) {
            final String format = String.format("%s %s COMMENT &#39;%s&#39;", partitionColumn.getName(), partitionColumn.getTypeInfo().getTypeName(), partitionColumn.getComment());
            mssqlDDL.append(format);
            if (partitionIndex == partitionColumns.size()) {
                mssqlDDL.append("\n");
            } else {
                mssqlDDL.append(",\n");
            }
            partitionIndex++;
        }
    
        if (!ObjectUtils.isEmpty(partitionColumns)) {
            mssqlDDL.append(")\n");
        }
    //        mssqlDDL.append("STORED AS ALIORC  \n");
    //        mssqlDDL.append("TBLPROPERTIES (&#39;comment&#39;=&#39;").append(tableComment).append("&#39;);");
        mssqlDDL.append(";");
        return mssqlDDL.toString();
    }

    テスト コード

    public static void main(String[] args) throws ClientException {
        final DataWorksOpenApiConnParam connParam = new DataWorksOpenApiConnParam();
        connParam.setAliyunAccessId("您的阿里云账号accessId");
        connParam.setAliyunAccessKey("您的阿里云账号accessKey");
        // dataworks所在区域
        connParam.setRegion("cn-chengdu");
        // dataworks所属项目
        connParam.setProject("dataworks所属项目");
        // dataworks所属项目环境 如果不分环境的话设置为生产即可
        connParam.setProjectEnv("dev");
        // 数据引擎类型 odps
        connParam.setDatasourceType("odps");
        // ddataworks接口地址
        connParam.setEndPoint("dataworks.cn-chengdu.aliyuncs.com");
        final DataWorksOpenApiUtil dataWorksOpenApiUtil = new DataWorksOpenApiUtil(connParam, true);
    
        // 拉取所有ODPS脚本
        dataWorksOpenApiUtil.listAllFiles(100, "", "", "10", files -> {
            // 处理文件
            for (ListFilesResponse.Data.File file : files) {
                final String fileName = file.getFileName();
                System.out.println(fileName);
            }
        });
    
        // 拉取所有表的建表语句
        dataWorksOpenApiUtil.listAllDdl((tableName, tableDdlContent) -> {
            System.out.println("=======================================");
            System.out.println("表名:" + tableName + "内容如下:\n");
            System.out.println(tableDdlContent);
            System.out.println("=======================================");
        });
    }

    テスト結果

    test_001 スクリプト
    test_002 スクリプト
    test_003 スクリプト
    test_004 スクリプト
    test_005 スクリプト
    ==================== ====================
    テーブル名: test_abc_info 内容は次のとおりです以下:

    存在しない場合はテーブルを作成します test_abc_info
    (
    test_abc1 文字列コメント 'フィールド 1',
    test_abc2 文字列コメント 'フィールド 2',
    test_abc3 文字列コメント 'フィールド 3' ,
    test_abc4 文字列コメント 'フィールド 4',
    test_abc5 文字列コメント 'フィールド 5' ,
    test_abc6 文字列コメント 'フィールド 6',
    test_abc7 文字列コメント 'フィールド 7',
    test_abc8 文字列COMMENT 'フィールド 8'
    )
    PARTITIONED BY (p_date STRING COMMENT 'データ日付'
    )
    ;
    ================= =========================
    ターゲット VM から切断されました、アドレス: '127.0.0.1:59509'、トランスポート: 'ソケット'

    以上がSpringBoot がデータワークを統合する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明:
    この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。