Apache Avro 데이터를 구문 분석하는 방법은 무엇입니까? 이 기사에서는 Avro 데이터를 생성하기 위해 직렬화하고, Avro 데이터를 구문 분석하기 위해 역직렬화하고, FlinkSQL을 사용하여 Avro 데이터를 구문 분석하는 방법을 소개합니다. 도움이 되길 바랍니다.
인터넷의 급속한 발전으로 클라우드 컴퓨팅, 빅데이터, 인공지능, 사물인터넷 등 첨단기술이 전자상거래 웹사이트 등 오늘날 시대의 주류 첨단기술로 자리 잡았습니다. 집, 스마트시티 등은 사람들의 음식, 의복, 주거, 교통을 편리하게 할 뿐만 아니라, 그 이면에는 사람이 수집하고, 정리하고, 분석하는 수많은 데이터가 존재합니다. 낮은 대기 시간과 높은 데이터 품질을 보장하는 다양한 시스템 플랫폼이 특히 중요합니다. Apache Avro 자체는 바이너리 전송을 위해 스키마를 통해 직렬화됩니다. 반면에 데이터 보안을 보장합니다. Avro는 현재 다양한 산업에서 점점 더 많이 사용되고 있으며, Avro 데이터를 처리하고 구문 분석하는 것이 더 중요합니다. 분석을 위해 FlinkSQL을 사용하세요.
이 기사는 avro 구문 분석에 대한 데모입니다. 현재 FlinkSQL은 간단한 avro 데이터 구문 분석에만 적합합니다. 복잡한 중첩 avro 데이터는 당분간 지원되지 않습니다.
장면 소개
이 기사에서는 주로 다음 세 가지 핵심 내용을 소개합니다.
pom 파일의 내용은 다음과 같습니다.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.huawei.bigdata</groupId>
<artifactId>avrodemo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.1</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
참고: 위 pom 파일은 자동으로 생성된 클래스의 경로, 즉
로 구성됩니다. p
r
o
j
e
c t {project.basedir }/src/main/ java/.이 구성 후 mvn 명령을 실행할 때 이 플러그인은 이 디렉터리의 avsc 스키마에서 클래스 파일을 자동으로 생성하고 후자의 디렉터리에 배치합니다. avro 디렉터리가 생성되지 않은 경우 수동으로 생성하세요. 2. 스키마 정의JSON을 사용하여 Avro용 스키마를 정의합니다. 스키마는 기본 유형(null, boolean, int, long, float, double, bytes 및 string)과 복합 유형(record, enum, array, map, Union 및fixed)으로 구성됩니다. 예를 들어 다음은 사용자의 스키마를 정의하고 기본 디렉터리에 avro 디렉터리를 만든 다음 avro 디렉터리에 새 파일 user.avsc를 만듭니다. {"namespace": "lancoo.ecbdc.pre",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
3 스키마를 컴파일합니다.点击maven projects项目的compile进行编译,会自动在创建namespace路径和User类代码
4、序列化
创建TestUser类,用于序列化生成数据
User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
// Leave favorite col or null
// Alternate constructor
User user2 = new User("Ben", 7, "red");
// Construct via builder
User user3 = User.newBuilder()
.setName("Charlie")
.setFavoriteColor("blue")
.setFavoriteNumber(null)
.build();
// Serialize user1, user2 and user3 to disk
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), new File("user_generic.avro"));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
dataFileWriter.close();
执行序列化程序后,会在项目的同级目录下生成avro数据
user_generic.avro内容如下:
Objavro.schema�{"type":"record","name":"User","namespace":"lancoo.ecbdc.pre","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}
至此avro数据已经生成。
5、反序列化
通过反序列化代码解析avro数据
// Deserialize Users from disk
DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
DataFileReader<User> dataFileReader = new DataFileReader<User>(new File("user_generic.avro"), userDatumReader);
User user = null;
while (dataFileReader.hasNext()) {
// Reuse user object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with
// many items.
user = dataFileReader.next(user);
System.out.println(user);
}
执行反序列化代码解析user_generic.avro
avro数据解析成功。
6、将user_generic.avro上传至hdfs路径
hdfs dfs -mkdir -p /tmp/lztest/
hdfs dfs -put user_generic.avro /tmp/lztest/
7、配置flinkserver
将flink-sql-avro-*.jar、flink-sql-avro-confluent-registry-*.jar放入flinkserver lib,将下面的命令在所有flinkserver节点执行
cp /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/opt/flink-sql-avro*.jar /opt/huawei/Bigdata/FusionInsight_Flink_8.1.3/install/FusionInsight-Flink-1.12.2/flink/lib
chmod 500 flink-sql-avro*.jar
chown omm:wheel flink-sql-avro*.jar
8、编写FlinkSQL
CREATE TABLE testHdfs(
name String,
favorite_number int,
favorite_color String
) WITH(
'connector' = 'filesystem',
'path' = 'hdfs:///tmp/lztest/user_generic.avro',
'format' = 'avro'
);CREATE TABLE KafkaTable (
name String,
favorite_number int,
favorite_color String
) WITH (
'connector' = 'kafka',
'topic' = 'testavro',
'properties.bootstrap.servers' = '96.10.2.1:21005',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'avro'
);
insert into
KafkaTable
select
*
from
testHdfs;
保存提交任务
9、查看对应topic中是否有数据
FlinkSQL解析avro数据成功。
【推荐:Apache使用教程】
위 내용은 Apache Avro 데이터를 구문 분석하는 방법에 대해 이야기해 보겠습니다(예제 설명).의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!