搜索
首页运维Apache聊聊怎么解析Apache Avro数据(示例讲解)

怎么解析Apache Avro数据?本篇文章给大家介绍一下序列化生成Avro数据、反序列化解析Avro数据、使用FlinkSQL解析Avro数据的方法,希望对大家有所帮助!

聊聊怎么解析Apache Avro数据(示例讲解)

随着互联网高速的发展,云计算、大数据、人工智能AI、物联网等前沿技术已然成为当今时代主流的高新技术,诸如电商网站、人脸识别、无人驾驶、智能家居、智慧城市等等,不仅方面方便了人们的衣食住行,背后更是时时刻刻有大量的数据在经过各种各样的系统平台的采集、清晰、分析,而保证数据的低时延、高吞吐、安全性就显得尤为重要,Apache Avro本身通过Schema的方式序列化后进行二进制传输,一方面保证了数据的高速传输,另一方面保证了数据安全性,avro当前在各个行业的应用越来越广泛,如何对avro数据进行处理解析应用就格外重要,本文将演示如果序列化生成avro数据,并使用FlinkSQL进行解析。

本文是avro解析的demo,当前FlinkSQL仅适用于简单的avro数据解析,复杂嵌套avro数据暂时不支持。

场景介绍

本文主要介绍以下三个重点内容:

  • 如何序列化生成Avro数据

  • 如何反序列化解析Avro数据

  • 如何使用FlinkSQL解析Avro数据

前提条件

  • 了解avro是什么,可参考apache avro官网快速入门指南

  • 了解avro应用场景

操作步骤

1、新建avro maven工程项目,配置pom依赖

1.png

pom文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.huawei.bigdata</groupId>
    <artifactId>avrodemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

注意:以上pom文件配置了自动生成类的路径,即project.basedir/src/main/avro/{project.basedir}/src/main/avro/和{project.basedir}/src/main/java/,这样配置之后,在执行mvn命令的时候,这个插件就会自动将此目录下的avsc schema生成类文件,并放到后者这个目录下。如果没有生成avro目录,手动创建一下即可。

2、定义schema

使用JSON为Avro定义schema。schema由基本类型(null,boolean, int, long, float, double, bytes 和string)和复杂类型(record, enum, array, map, union, 和fixed)组成。例如,以下定义一个user的schema,在main目录下创建一个avro目录,然后在avro目录下新建文件 user.avsc :

{"namespace": "lancoo.ecbdc.pre",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}

2.png

3、编译schema

点击maven projects项目的compile进行编译,会自动在创建namespace路径和User类代码

3.png

4、序列化

创建TestUser类,用于序列化生成数据

User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
// Leave favorite col or null

// Alternate constructor
User user2 = new User("Ben", 7, "red");

// Construct via builder
User user3 = User.newBuilder()
        .setName("Charlie")
        .setFavoriteColor("blue")
        .setFavoriteNumber(null)
        .build();

// Serialize user1, user2 and user3 to disk
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), new File("user_generic.avro"));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
dataFileWriter.close();

执行序列化程序后,会在项目的同级目录下生成avro数据

4.png

user_generic.avro内容如下:

Objavro.schema�{"type":"record","name":"User","namespace":"lancoo.ecbdc.pre","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}

至此avro数据已经生成。

5、反序列化

通过反序列化代码解析avro数据

// Deserialize Users from disk
DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
DataFileReader<User> dataFileReader = new DataFileReader<User>(new File("user_generic.avro"), userDatumReader);
User user = null;
while (dataFileReader.hasNext()) {
    // Reuse user object by passing it to next(). This saves us from
    // allocating and garbage collecting many objects for files with
    // many items.
    user = dataFileReader.next(user);
    System.out.println(user);
}

执行反序列化代码解析user_generic.avro

5.png

avro数据解析成功。

6、将user_generic.avro上传至hdfs路径

hdfs dfs -mkdir -p /tmp/lztest/

hdfs dfs -put user_generic.avro /tmp/lztest/

6.png

7、配置flinkserver

  • 准备avro jar包

将flink-sql-avro-*.jar、flink-sql-avro-confluent-registry-*.jar放入flinkserver lib,将下面的命令在所有flinkserver节点执行

cp /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/opt/flink-sql-avro*.jar /opt/huawei/Bigdata/FusionInsight_Flink_8.1.3/install/FusionInsight-Flink-1.12.2/flink/lib

chmod 500 flink-sql-avro*.jar

chown omm:wheel flink-sql-avro*.jar

7.png

  • 同时重启FlinkServer实例,重启完成后查看avro包是否被上传

    hdfs dfs -ls /FusionInsight_FlinkServer/8.1.2-312005/lib

8.png

8、编写FlinkSQL

CREATE TABLE testHdfs(
  name String,
  favorite_number int,
  favorite_color String
) WITH(
  &#39;connector&#39; = &#39;filesystem&#39;,
  &#39;path&#39; = &#39;hdfs:///tmp/lztest/user_generic.avro&#39;,
  &#39;format&#39; = &#39;avro&#39;
);CREATE TABLE KafkaTable (
  name String,
  favorite_number int,
  favorite_color String
) WITH (
  &#39;connector&#39; = &#39;kafka&#39;,
  &#39;topic&#39; = &#39;testavro&#39;,
  &#39;properties.bootstrap.servers&#39; = &#39;96.10.2.1:21005&#39;,
  &#39;properties.group.id&#39; = &#39;testGroup&#39;,
  &#39;scan.startup.mode&#39; = &#39;latest-offset&#39;,
  &#39;format&#39; = &#39;avro&#39;
);
insert into
  KafkaTable
select
  *
from
  testHdfs;

9.png

保存提交任务

9、查看对应topic中是否有数据

10.png

FlinkSQL解析avro数据成功。

【推荐:Apache使用教程

以上是聊聊怎么解析Apache Avro数据(示例讲解)的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文转载于:掘金社区。如有侵权,请联系admin@php.cn删除
Apache:它仍然使用吗?查看Web服务器趋势Apache:它仍然使用吗?查看Web服务器趋势Apr 24, 2025 am 12:17 AM

Apache仍在被广泛使用,但市场份额已从2010年的50%以上降至2023年的不到30%。它的优势在于稳定性和可靠性,适合需要这些特性的企业级应用;劣势是多进程模型在高并发下资源消耗大,Nginx在高并发处理上表现更佳。

Apache Web服务器:核心功能解释了Apache Web服务器:核心功能解释了Apr 23, 2025 am 12:12 AM

ApacheWebServer的核心功能包括模块化设计、虚拟主机配置、安全性设置和性能优化。1)模块化设计通过加载不同模块实现灵活扩展,如mod_rewrite用于URL重写。2)虚拟主机配置允许在一台服务器上运行多个网站。3)安全性设置提供SSL/TLS加密和访问控制。4)性能优化涉及启用KeepAlive、调整MPM配置和启用缓存。

阿帕奇(Apache)的持续重要性:寿命的原因阿帕奇(Apache)的持续重要性:寿命的原因Apr 22, 2025 am 12:08 AM

Apache持续重要性的原因包括其多样性、灵活性、强大的社区支持、在企业级应用中的广泛使用和高可靠性,以及在新兴技术领域的不断创新。具体来说,1)Apache项目涵盖了从Web服务器到大数据处理等多个领域,提供了丰富的解决方案;2)Apache软件基金会(ASF)的全球性社区为项目提供了持续的支持和发展动力;3)Apache在金融、电信等企业级应用中表现出高稳定性和可扩展性;4)Apache在云计算、大数据等新兴技术领域持续创新,如ApacheFlink和ApacheArrow的突破。

超越炒作:评估阿帕奇的当前角色超越炒作:评估阿帕奇的当前角色Apr 21, 2025 am 12:14 AM

Apache在当今技术生态中依然重要。1)在Web服务和大数据处理领域,ApacheHTTPServer、Kafka和Hadoop仍是首选。2)未来需关注云原生化、性能优化和生态系统简化,以保持竞争力。

Apache的影响:网络托管和内容交付Apache的影响:网络托管和内容交付Apr 20, 2025 am 12:12 AM

ApacheHTTPServer对WebHosting和内容分发有巨大影响。 1)Apache始于1995年,迅速成为市场首选,提供模块化设计和灵活性。 2)在Web托管中,Apache因稳定性和安全性被广泛使用,支持多操作系统。 3)内容分发方面,结合CDN使用提高了网站速度和可靠性。 4)通过性能优化配置,如内容压缩和缓存头,Apache显着提升网站性能。

Apache的角色:服务HTML,CSS,JavaScript等Apache的角色:服务HTML,CSS,JavaScript等Apr 19, 2025 am 12:09 AM

Apache可以服务HTML、CSS、JavaScript等文件。1)配置虚拟主机和文档根目录,2)接收、处理并返回请求,3)使用.htaccess文件实现URL重写,4)通过检查权限、查看日志和测试配置来调试,5)启用缓存、压缩文件和调整KeepAlive设置来优化性能。

Apache以:关键功能和成就而闻名Apache以:关键功能和成就而闻名Apr 18, 2025 am 12:03 AM

ApacheHTTPServer因其模块化设计、高度可扩展性、安全性和性能优化成为Web服务器领域的领导者。1.模块化设计通过加载不同模块支持各种协议和功能。2.高度可扩展性适应从小型到大型应用的需求。3.安全性通过mod_security和多种认证机制保护网站。4.性能优化通过数据压缩和缓存提升加载速度。

Apache的持久相关性:检查其当前状态Apache的持久相关性:检查其当前状态Apr 17, 2025 am 12:06 AM

ApacheHTTPServer在现代Web环境中依然重要,因为其稳定性、可扩展性和丰富的生态系统。1)稳定性和可靠性使其适用于高可用性环境。2)广泛的生态系统提供丰富的模块和扩展。3)易于配置和管理,即使初学者也能快速上手。

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

SecLists

SecLists

SecLists是最终安全测试人员的伙伴。它是一个包含各种类型列表的集合,这些列表在安全评估过程中经常使用,都在一个地方。SecLists通过方便地提供安全测试人员可能需要的所有列表,帮助提高安全测试的效率和生产力。列表类型包括用户名、密码、URL、模糊测试有效载荷、敏感数据模式、Web shell等等。测试人员只需将此存储库拉到新的测试机上,他就可以访问到所需的每种类型的列表。

SublimeText3 Linux新版

SublimeText3 Linux新版

SublimeText3 Linux最新版

MinGW - 适用于 Windows 的极简 GNU

MinGW - 适用于 Windows 的极简 GNU

这个项目正在迁移到osdn.net/projects/mingw的过程中,你可以继续在那里关注我们。MinGW:GNU编译器集合(GCC)的本地Windows移植版本,可自由分发的导入库和用于构建本地Windows应用程序的头文件;包括对MSVC运行时的扩展,以支持C99功能。MinGW的所有软件都可以在64位Windows平台上运行。

Atom编辑器mac版下载

Atom编辑器mac版下载

最流行的的开源编辑器

DVWA

DVWA

Damn Vulnerable Web App (DVWA) 是一个PHP/MySQL的Web应用程序,非常容易受到攻击。它的主要目标是成为安全专业人员在合法环境中测试自己的技能和工具的辅助工具,帮助Web开发人员更好地理解保护Web应用程序的过程,并帮助教师/学生在课堂环境中教授/学习Web应用程序安全。DVWA的目标是通过简单直接的界面练习一些最常见的Web漏洞,难度各不相同。请注意,该软件中