ホームページ >Java >&#&チュートリアル >SpringBoot がデータワークを統合する方法
ここでのテストは主に、データワークスから取得したスクリプトを呼び出してローカルに保存することです。
このスクリプトには 2 つの部分が含まれています
1. 開発された odps スクリプト (OpenApi を通じて取得) 2. テーブル ステートメント スクリプト (データワークス情報を通じて maxCompute に接続し、作成ステートメントを取得します)
Alibaba Cloud Dataworks OpenApi ページング クエリの制限。一度に最大 100 クエリ。スクリプトをプルするときに複数のページでクエリを実行する必要があります
このプロジェクトは MaxCompute の SDK/JDBC 接続を使用し、SpringBoot は MaxCompute SDK/JDBC 接続を操作します
Main実装 ツールクラスを記述します 必要に応じてSpringBeanとして設定してコンテナに注入します
<properties> <java.version>1.8</java.version> <!--maxCompute-sdk-版本号--> <max-compute-sdk.version>0.40.8-public</max-compute-sdk.version> <!--maxCompute-jdbc-版本号--> <max-compute-jdbc.version>3.0.1</max-compute-jdbc.version> <!--dataworks版本号--> <dataworks-sdk.version>3.4.2</dataworks-sdk.version> <aliyun-java-sdk.version>4.5.20</aliyun-java-sdk.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <!--max compute sdk--> <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-sdk-core</artifactId> <version>${max-compute-sdk.version}</version> </dependency> <!--max compute jdbc--> <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-jdbc</artifactId> <version>${max-compute-jdbc.version}</version> <classifier>jar-with-dependencies</classifier> </dependency> <!--dataworks需要引入aliyun-sdk和dataworks本身--> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>${aliyun-java-sdk.version}</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-dataworks-public</artifactId> <version>${dataworks-sdk.version}</version> </dependency> </dependencies>
/** * @Description * @Author itdl * @Date 2022/08/09 15:12 */ @Data public class DataWorksOpenApiConnParam { /** * 区域 eg. cn-shanghai */ private String region; /** * 访问keyId */ private String aliyunAccessId; /** * 密钥 */ private String aliyunAccessKey; /** * 访问端点 就是API的URL前缀 */ private String endPoint; /** * 数据库类型 如odps */ private String datasourceType; /** * 所属项目 */ private String project; /** * 项目环境 dev prod */ private String projectEnv; }
基本的なクラスの準備、スクリプトプル後のコールバック関数
なぜコールバック関数が必要なのでしょうか? すべてのスクリプトがプルされるため、各ページングの結果をマージすると、メモリオーバーフローを引き起こすため、コールバック関数が使用されます 各サイクルに処理関数を追加するだけです
/** * @Description * @Author itdl * @Date 2022/08/09 15:12 */ @Data public class DataWorksOpenApiConnParam { /** * 区域 eg. cn-shanghai */ private String region; /** * 访问keyId */ private String aliyunAccessId; /** * 密钥 */ private String aliyunAccessKey; /** * 访问端点 就是API的URL前缀 */ private String endPoint; /** * 数据库类型 如odps */ private String datasourceType; /** * 所属项目 */ private String project; /** * 项目环境 dev prod */ private String projectEnv; }
主に Dataworks openApi インターフェイスのクライアント情報をインスタンス化し、ツールを初期化することですmaxCompute 接続のクラス (JDBC および SDK メソッドを含む)
private static final String MAX_COMPUTE_JDBC_URL_FORMAT = "http://service.%s.maxcompute.aliyun.com/api"; /**默认的odps接口地址 在Odps中也可以看到该变量*/ private static final String defaultEndpoint = "http://service.odps.aliyun.com/api"; /** * dataworks连接参数 * */ private final DataWorksOpenApiConnParam connParam; /** * 可以使用dataworks去连接maxCompute 如果连接的引擎是maxCompute的话 */ private final MaxComputeJdbcUtil maxComputeJdbcUtil; private final MaxComputeSdkUtil maxComputeSdkUtil; private final boolean odpsSdk; /** * 客户端 */ private final IAcsClient client; public DataWorksOpenApiUtil(DataWorksOpenApiConnParam connParam, boolean odpsSdk) { this.connParam = connParam; this.client = buildClient(); this.odpsSdk = odpsSdk; if (odpsSdk){ this.maxComputeJdbcUtil = null; this.maxComputeSdkUtil = buildMaxComputeSdkUtil(); }else { this.maxComputeJdbcUtil = buildMaxComputeJdbcUtil(); this.maxComputeSdkUtil = null; } } private MaxComputeSdkUtil buildMaxComputeSdkUtil() { final MaxComputeSdkConnParam param = new MaxComputeSdkConnParam(); // 设置账号密码 param.setAliyunAccessId(connParam.getAliyunAccessId()); param.setAliyunAccessKey(connParam.getAliyunAccessKey()); // 设置endpoint param.setMaxComputeEndpoint(defaultEndpoint); // 目前只处理odps的引擎 final String datasourceType = connParam.getDatasourceType(); if (!"odps".equals(datasourceType)){ throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR); } // 获取项目环境,根据项目环境连接不同的maxCompute final String projectEnv = connParam.getProjectEnv(); if ("dev".equals(projectEnv)){ // 开发环境dataworks + _dev就是maxCompute的项目名 param.setProjectName(String.join("_", connParam.getProject(), projectEnv)); }else { // 生产环境dataworks的项目名和maxCompute一致 param.setProjectName(connParam.getProject()); } return new MaxComputeSdkUtil(param); } private MaxComputeJdbcUtil buildMaxComputeJdbcUtil() { final MaxComputeJdbcConnParam param = new MaxComputeJdbcConnParam(); // 设置账号密码 param.setAliyunAccessId(connParam.getAliyunAccessId()); param.setAliyunAccessKey(connParam.getAliyunAccessKey()); // 设置endpoint param.setEndpoint(String.format(MAX_COMPUTE_JDBC_URL_FORMAT, connParam.getRegion())); // 目前只处理odps的引擎 final String datasourceType = connParam.getDatasourceType(); if (!"odps".equals(datasourceType)){ throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR); } // 获取项目环境,根据项目环境连接不同的maxCompute final String projectEnv = connParam.getProjectEnv(); if ("dev".equals(projectEnv)){ // 开发环境dataworks + _dev就是maxCompute的项目名 param.setProjectName(String.join("_", connParam.getProject(), projectEnv)); }else { // 生产环境dataworks的项目名和maxCompute一致 param.setProjectName(connParam.getProject()); } return new MaxComputeJdbcUtil(param); }
OpenApi を呼び出してすべてのスクリプトをプルする
/** * 根据文件夹路径分页查询该路径下的文件(脚本) * @param pageSize 每页查询多少数据 * @param folderPath 文件所在目录 * @param userType 文件所属功能模块 可不传 * @param fileTypes 设置文件代码类型 逗号分割 可不传 */ public void listAllFiles(Integer pageSize, String folderPath, String userType, String fileTypes, CallBack.FileCallBack callBack) throws ClientException { pageSize = setPageSize(pageSize); // 创建请求 final ListFilesRequest request = new ListFilesRequest(); // 设置分页参数 request.setPageNumber(1); request.setPageSize(pageSize); // 设置上级文件夹 request.setFileFolderPath(folderPath); // 设置区域和项目名称 request.setSysRegionId(connParam.getRegion()); request.setProjectIdentifier(connParam.getProject()); // 设置文件所属功能模块 if (!ObjectUtils.isEmpty(userType)){ request.setUseType(userType); } // 设置文件代码类型 if (!ObjectUtils.isEmpty(fileTypes)){ request.setFileTypes(fileTypes); } // 发起请求 ListFilesResponse res = client.getAcsResponse(request); // 获取分页总数 final Integer totalCount = res.getData().getTotalCount(); // 返回结果 final List<ListFilesResponse.Data.File> resultList = res.getData().getFiles(); // 计算能分几页 long pages = totalCount % pageSize == 0 ? (totalCount / pageSize) : (totalCount / pageSize) + 1; // 只有1页 直接返回 if (pages <= 1){ callBack.handle(resultList); return; } // 第一页执行回调 callBack.handle(resultList); // 分页数据 从第二页开始查询 同步拉取,可以优化为多线程拉取 for (int i = 2; i <= pages; i++) { //第1页 request.setPageNumber(i); //每页大小 request.setPageSize(pageSize); // 发起请求 res = client.getAcsResponse(request); final List<ListFilesResponse.Data.File> tableEntityList = res.getData().getFiles(); if (!ObjectUtils.isEmpty(tableEntityList)){ // 执行回调函数 callBack.handle(tableEntityList); } } }
MaxCompute への内部接続ですべての DDL スクリプト コンテンツをプルする
DataWorks ツール クラスコード、コールバック関数を介して処理されます
/** * 获取所有的DDL脚本 * @param callBack 回调处理函数 */ public void listAllDdl(CallBack.DdlCallBack callBack){ if (odpsSdk){ final List<TableMetaInfo> tableInfos = maxComputeSdkUtil.getTableInfos(); for (TableMetaInfo tableInfo : tableInfos) { final String tableName = tableInfo.getTableName(); final String sqlCreateDesc = maxComputeSdkUtil.getSqlCreateDesc(tableName); callBack.handle(tableName, sqlCreateDesc); } } }
MaxCompute ツール クラス コード、テーブル名に基づいてテーブル作成ステートメントを取得します。例として SDK を使用します。JDBC は show create table を直接実行してテーブル作成ステートメント
/** * 根据表名获取建表语句 * @param tableName 表名 * @return */ public String getSqlCreateDesc(String tableName) { final Table table = odps.tables().get(tableName); // 建表语句 StringBuilder mssqlDDL = new StringBuilder(); // 获取表结构 TableSchema tableSchema = table.getSchema(); // 获取表名表注释 String tableComment = table.getComment(); //获取列名列注释 List<Column> columns = tableSchema.getColumns(); /*组装成mssql的DDL*/ // 表名 mssqlDDL.append("CREATE TABLE IF NOT EXISTS "); mssqlDDL.append(tableName).append("\n"); mssqlDDL.append(" (\n"); //列字段 int index = 1; for (Column column : columns) { mssqlDDL.append(" ").append(column.getName()).append("\t\t").append(column.getTypeInfo().getTypeName()); if (!ObjectUtils.isEmpty(column.getComment())) { mssqlDDL.append(" COMMENT '").append(column.getComment()).append("'"); } if (index == columns.size()) { mssqlDDL.append("\n"); } else { mssqlDDL.append(",\n"); } index++; } mssqlDDL.append(" )\n"); //获取分区 List<Column> partitionColumns = tableSchema.getPartitionColumns(); int partitionIndex = 1; if (!ObjectUtils.isEmpty(partitionColumns)) { mssqlDDL.append("PARTITIONED BY ("); } for (Column partitionColumn : partitionColumns) { final String format = String.format("%s %s COMMENT '%s'", partitionColumn.getName(), partitionColumn.getTypeInfo().getTypeName(), partitionColumn.getComment()); mssqlDDL.append(format); if (partitionIndex == partitionColumns.size()) { mssqlDDL.append("\n"); } else { mssqlDDL.append(",\n"); } partitionIndex++; } if (!ObjectUtils.isEmpty(partitionColumns)) { mssqlDDL.append(")\n"); } // mssqlDDL.append("STORED AS ALIORC \n"); // mssqlDDL.append("TBLPROPERTIES ('comment'='").append(tableComment).append("');"); mssqlDDL.append(";"); return mssqlDDL.toString(); }
public static void main(String[] args) throws ClientException { final DataWorksOpenApiConnParam connParam = new DataWorksOpenApiConnParam(); connParam.setAliyunAccessId("您的阿里云账号accessId"); connParam.setAliyunAccessKey("您的阿里云账号accessKey"); // dataworks所在区域 connParam.setRegion("cn-chengdu"); // dataworks所属项目 connParam.setProject("dataworks所属项目"); // dataworks所属项目环境 如果不分环境的话设置为生产即可 connParam.setProjectEnv("dev"); // 数据引擎类型 odps connParam.setDatasourceType("odps"); // ddataworks接口地址 connParam.setEndPoint("dataworks.cn-chengdu.aliyuncs.com"); final DataWorksOpenApiUtil dataWorksOpenApiUtil = new DataWorksOpenApiUtil(connParam, true); // 拉取所有ODPS脚本 dataWorksOpenApiUtil.listAllFiles(100, "", "", "10", files -> { // 处理文件 for (ListFilesResponse.Data.File file : files) { final String fileName = file.getFileName(); System.out.println(fileName); } }); // 拉取所有表的建表语句 dataWorksOpenApiUtil.listAllDdl((tableName, tableDdlContent) -> { System.out.println("======================================="); System.out.println("表名:" + tableName + "内容如下:\n"); System.out.println(tableDdlContent); System.out.println("======================================="); }); }
test_001 スクリプト
test_002 スクリプト
test_003 スクリプト
test_004 スクリプト
test_005 スクリプト
==================== ====================
テーブル名: test_abc_info 内容は次のとおりです以下:存在しない場合はテーブルを作成します test_abc_info
(
test_abc1 文字列コメント 'フィールド 1',
test_abc2 文字列コメント 'フィールド 2',
test_abc3 文字列コメント 'フィールド 3' ,
test_abc4 文字列コメント 'フィールド 4',
test_abc5 文字列コメント 'フィールド 5' ,
test_abc6 文字列コメント 'フィールド 6',
test_abc7 文字列コメント 'フィールド 7',
test_abc8 文字列COMMENT 'フィールド 8'
)
PARTITIONED BY (p_date STRING COMMENT 'データ日付'
)
;
================= =========================
ターゲット VM から切断されました、アドレス: '127.0.0.1:59509'、トランスポート: 'ソケット'
以上がSpringBoot がデータワークを統合する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。