搜索
首页运维Apache聊聊怎么解析Apache Avro数据(示例讲解)

怎么解析Apache Avro数据?本篇文章给大家介绍一下序列化生成Avro数据、反序列化解析Avro数据、使用FlinkSQL解析Avro数据的方法,希望对大家有所帮助!

聊聊怎么解析Apache Avro数据(示例讲解)

随着互联网高速的发展,云计算、大数据、人工智能AI、物联网等前沿技术已然成为当今时代主流的高新技术,诸如电商网站、人脸识别、无人驾驶、智能家居、智慧城市等等,不仅方面方便了人们的衣食住行,背后更是时时刻刻有大量的数据在经过各种各样的系统平台的采集、清晰、分析,而保证数据的低时延、高吞吐、安全性就显得尤为重要,Apache Avro本身通过Schema的方式序列化后进行二进制传输,一方面保证了数据的高速传输,另一方面保证了数据安全性,avro当前在各个行业的应用越来越广泛,如何对avro数据进行处理解析应用就格外重要,本文将演示如果序列化生成avro数据,并使用FlinkSQL进行解析。

本文是avro解析的demo,当前FlinkSQL仅适用于简单的avro数据解析,复杂嵌套avro数据暂时不支持。

场景介绍

本文主要介绍以下三个重点内容:

  • 如何序列化生成Avro数据

  • 如何反序列化解析Avro数据

  • 如何使用FlinkSQL解析Avro数据

前提条件

  • 了解avro是什么,可参考apache avro官网快速入门指南

  • 了解avro应用场景

操作步骤

1、新建avro maven工程项目,配置pom依赖

1.png

pom文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.huawei.bigdata</groupId>
    <artifactId>avrodemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

注意:以上pom文件配置了自动生成类的路径,即project.basedir/src/main/avro/{project.basedir}/src/main/avro/和{project.basedir}/src/main/java/,这样配置之后,在执行mvn命令的时候,这个插件就会自动将此目录下的avsc schema生成类文件,并放到后者这个目录下。如果没有生成avro目录,手动创建一下即可。

2、定义schema

使用JSON为Avro定义schema。schema由基本类型(null,boolean, int, long, float, double, bytes 和string)和复杂类型(record, enum, array, map, union, 和fixed)组成。例如,以下定义一个user的schema,在main目录下创建一个avro目录,然后在avro目录下新建文件 user.avsc :

{"namespace": "lancoo.ecbdc.pre",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}

2.png

3、编译schema

点击maven projects项目的compile进行编译,会自动在创建namespace路径和User类代码

3.png

4、序列化

创建TestUser类,用于序列化生成数据

User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
// Leave favorite col or null

// Alternate constructor
User user2 = new User("Ben", 7, "red");

// Construct via builder
User user3 = User.newBuilder()
        .setName("Charlie")
        .setFavoriteColor("blue")
        .setFavoriteNumber(null)
        .build();

// Serialize user1, user2 and user3 to disk
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), new File("user_generic.avro"));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
dataFileWriter.close();

执行序列化程序后,会在项目的同级目录下生成avro数据

4.png

user_generic.avro内容如下:

Objavro.schema�{"type":"record","name":"User","namespace":"lancoo.ecbdc.pre","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}

至此avro数据已经生成。

5、反序列化

通过反序列化代码解析avro数据

// Deserialize Users from disk
DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
DataFileReader<User> dataFileReader = new DataFileReader<User>(new File("user_generic.avro"), userDatumReader);
User user = null;
while (dataFileReader.hasNext()) {
    // Reuse user object by passing it to next(). This saves us from
    // allocating and garbage collecting many objects for files with
    // many items.
    user = dataFileReader.next(user);
    System.out.println(user);
}

执行反序列化代码解析user_generic.avro

5.png

avro数据解析成功。

6、将user_generic.avro上传至hdfs路径

hdfs dfs -mkdir -p /tmp/lztest/

hdfs dfs -put user_generic.avro /tmp/lztest/

6.png

7、配置flinkserver

  • 准备avro jar包

将flink-sql-avro-*.jar、flink-sql-avro-confluent-registry-*.jar放入flinkserver lib,将下面的命令在所有flinkserver节点执行

cp /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/opt/flink-sql-avro*.jar /opt/huawei/Bigdata/FusionInsight_Flink_8.1.3/install/FusionInsight-Flink-1.12.2/flink/lib

chmod 500 flink-sql-avro*.jar

chown omm:wheel flink-sql-avro*.jar

7.png

  • 同时重启FlinkServer实例,重启完成后查看avro包是否被上传

    hdfs dfs -ls /FusionInsight_FlinkServer/8.1.2-312005/lib

8.png

8、编写FlinkSQL

CREATE TABLE testHdfs(
  name String,
  favorite_number int,
  favorite_color String
) WITH(
  &#39;connector&#39; = &#39;filesystem&#39;,
  &#39;path&#39; = &#39;hdfs:///tmp/lztest/user_generic.avro&#39;,
  &#39;format&#39; = &#39;avro&#39;
);CREATE TABLE KafkaTable (
  name String,
  favorite_number int,
  favorite_color String
) WITH (
  &#39;connector&#39; = &#39;kafka&#39;,
  &#39;topic&#39; = &#39;testavro&#39;,
  &#39;properties.bootstrap.servers&#39; = &#39;96.10.2.1:21005&#39;,
  &#39;properties.group.id&#39; = &#39;testGroup&#39;,
  &#39;scan.startup.mode&#39; = &#39;latest-offset&#39;,
  &#39;format&#39; = &#39;avro&#39;
);
insert into
  KafkaTable
select
  *
from
  testHdfs;

9.png

保存提交任务

9、查看对应topic中是否有数据

10.png

FlinkSQL解析avro数据成功。

【推荐:Apache使用教程

以上是聊聊怎么解析Apache Avro数据(示例讲解)的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文转载于:掘金社区。如有侵权,请联系admin@php.cn删除
什么定义了Apache?它的核心功能什么定义了Apache?它的核心功能May 09, 2025 am 12:21 AM

Apache的核心功能是模块化设计和高度的可定制性,使其能满足各种Web服务需求。1.模块化设计允许通过加载不同模块扩展功能。2.支持多种操作系统,适用于不同环境。3.多进程、多线程和事件驱动模型提高了性能。4.基本用法包括配置虚拟主机和文档根目录。5.高级用法涉及URL重写、负载均衡和反向代理。6.常见错误可以通过语法检查和日志分析调试。7.性能优化包括调整MPM设置和启用缓存。

Apache继续使用:Web托管及其他Apache继续使用:Web托管及其他May 08, 2025 am 12:15 AM

Apache在现代Web环境中仍然受欢迎的原因是其强大功能和灵活性。1)模块化设计允许定制功能,如安全认证和负载均衡。2)支持多操作系统,增强普及性。3)高效处理并发请求,适合各种应用场景。

Apache:从开源到行业标准Apache:从开源到行业标准May 07, 2025 am 12:05 AM

Apache从开源项目发展为行业标准的原因包括:1)社区驱动,吸引全球开发者参与;2)标准化与兼容性,遵循互联网标准;3)商业支持与生态系统,获得企业级市场支持。

Apache的遗产:对网络托管的影响Apache的遗产:对网络托管的影响May 06, 2025 am 12:03 AM

Apache对Webhosting的影响主要体现在其开源特性、强大功能和灵活性。1)开源特性降低了Webhosting的门槛。2)强大功能和灵活性使其成为大型网站和企业的首选。3)虚拟主机功能节省了成本。尽管在高并发情况下性能可能下降,但通过不断优化,Apache仍保持竞争力。

Apache:历史和网络的贡献Apache:历史和网络的贡献May 05, 2025 am 12:14 AM

Apache起源于1995年,由一群开发者创建,旨在改进NCSAHTTPd服务器,成为全球最广泛使用的Web服务器。1.起源于1995年,旨在改进NCSAHTTPd服务器。2.定义了Web服务器标准,推动了开源运动的发展。3.孕育了Tomcat、Kafka等重要子项目。4.面临云计算和容器技术的挑战,未来将注重与云原生技术整合。

Apache的影响:塑造互联网Apache的影响:塑造互联网May 04, 2025 am 12:05 AM

Apache通过提供稳定的Web服务器基础设施、推动开源文化和孵化重要项目,塑造了互联网。1)Apache提供了稳定的Web服务器基础设施,促进了Web技术的创新。2)Apache推动了开源文化的发展,ASF孵化了Hadoop、Kafka等重要项目。3)尽管面临性能挑战,Apache的未来依然充满希望,ASF不断推出新技术。

Apache的遗产:查看其对Web服务器的影响Apache的遗产:查看其对Web服务器的影响May 03, 2025 am 12:03 AM

ApacheHTTPServer自1995年由志愿者创建以来,对Web服务器领域产生了深远影响。1.它源于对NCSAHTTPd不满,提供更稳定、可靠的服务。2.Apache软件基金会的成立标志其转变为生态系统。3.其模块化设计和安全性提升了Web服务器的灵活性和安全性。4.尽管市场份额下降,Apache仍与现代Web技术紧密联系。5.通过配置优化和缓存,Apache提升了性能。6.错误日志和调试模式帮助解决常见问题。

Apache的目的:服务网络内容Apache的目的:服务网络内容May 02, 2025 am 12:23 AM

ApacheHTTPServer通过模块化设计、虚拟主机功能和性能优化,继续高效地服务于现代互联网环境中的Web内容。1)模块化设计允许添加如URL重写等功能,提升网站SEO性能。2)虚拟主机功能在一个服务器上托管多个网站,节省成本并简化管理。3)通过多线程处理和缓存优化,Apache能处理大量并发连接,提高响应速度和用户体验。

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

EditPlus 中文破解版

EditPlus 中文破解版

体积小,语法高亮,不支持代码提示功能

SublimeText3 英文版

SublimeText3 英文版

推荐:为Win版本,支持代码提示!

螳螂BT

螳螂BT

Mantis是一个易于部署的基于Web的缺陷跟踪工具,用于帮助产品缺陷跟踪。它需要PHP、MySQL和一个Web服务器。请查看我们的演示和托管服务。

SublimeText3 Linux新版

SublimeText3 Linux新版

SublimeText3 Linux最新版

SecLists

SecLists

SecLists是最终安全测试人员的伙伴。它是一个包含各种类型列表的集合,这些列表在安全评估过程中经常使用,都在一个地方。SecLists通过方便地提供安全测试人员可能需要的所有列表,帮助提高安全测试的效率和生产力。列表类型包括用户名、密码、URL、模糊测试有效载荷、敏感数据模式、Web shell等等。测试人员只需将此存储库拉到新的测试机上,他就可以访问到所需的每种类型的列表。