Maison  >  Article  >  Opération et maintenance  >  Parlons de la façon d'analyser les données Apache Avro (explication avec exemples)

Parlons de la façon d'analyser les données Apache Avro (explication avec exemples)

青灯夜游
青灯夜游avant
2022-02-22 10:47:163280parcourir

Comment analyser les données Apache Avro ? Cet article vous présentera les méthodes de sérialisation pour générer des données Avro, de désérialisation pour analyser les données Avro et d'utilisation de FlinkSQL pour analyser les données Avro. J'espère que cela vous sera utile !

Parlons de la façon d'analyser les données Apache Avro (explication avec exemples)

Avec le développement rapide d'Internet, les technologies de pointe telles que le cloud computing, le big data, l'intelligence artificielle et l'Internet des objets sont devenues des technologies de haute technologie courantes à l'ère d'aujourd'hui, comme les sites Web de commerce électronique. , la reconnaissance faciale, les voitures sans conducteur et les téléphones intelligents. Les maisons, les villes intelligentes, etc. facilitent non seulement l'alimentation, l'habillement, le logement et le transport des gens, mais derrière eux se trouve une grande quantité de données qui sont collectées, effacées et analysées par diverses plates-formes système à tout moment, garantissant une faible latence et une qualité de données élevée. Le débit et la sécurité sont particulièrement importants. Apache Avro lui-même est sérialisé via Schema pour la transmission binaire, d'une part, et d'autre part. d'autre part, il garantit la sécurité des données. Avro est actuellement de plus en plus utilisé dans diverses industries, plus il est étendu, plus il est important de traiter et d'analyser les données avro. Cet article montrera comment générer des données avro via la sérialisation et. utilisez FlinkSQL pour l'analyse.

Cet article est une démonstration de l'analyse avro. Actuellement, FlinkSQL ne convient qu'à l'analyse de données avro simples. Les données avro imbriquées complexes ne sont pas prises en charge pour le moment.

Introduction à la scène

Cet article présente principalement les trois contenus clés suivants :

  • Comment sérialiser et générer des données Avro

  • Comment désérialiser et analyser les données Avro

  • Comment utiliser FlinkSQL pour analyser les données Avro

Prérequis

  • Pour comprendre ce qu'est avro, vous pouvez vous référer au guide de démarrage rapide du site officiel d'Apache avro

  • Comprendre les scénarios d'application avro

Étapes de fonctionnement

1. Créer un nouveau maven avro. projetez et configurez les dépendances pom

Parlons de la façon danalyser les données Apache Avro (explication avec exemples)

Le contenu du fichier pom est le suivant :

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.huawei.bigdata</groupId>
    <artifactId>avrodemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

Remarque : Le fichier pom ci-dessus est configuré avec le chemin d'accès à la classe générée automatiquement, c'est-à-dire projec t mai n /avr o/ et {project.basedir}/src/main/avro/ et p roj ec t.b

{"namespace": "lancoo.ecbdc.pre",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}
3. Compilez le schéma.

点击maven projects项目的compile进行编译,会自动在创建namespace路径和User类代码

Parlons de la façon danalyser les données Apache Avro (explication avec exemples)

4、序列化

创建TestUser类,用于序列化生成数据

User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
// Leave favorite col or null

// Alternate constructor
User user2 = new User("Ben", 7, "red");

// Construct via builder
User user3 = User.newBuilder()
        .setName("Charlie")
        .setFavoriteColor("blue")
        .setFavoriteNumber(null)
        .build();

// Serialize user1, user2 and user3 to disk
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), new File("user_generic.avro"));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
dataFileWriter.close();

执行序列化程序后,会在项目的同级目录下生成avro数据

Parlons de la façon danalyser les données Apache Avro (explication avec exemples)

user_generic.avro内容如下:

Objavro.schema�{"type":"record","name":"User","namespace":"lancoo.ecbdc.pre","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}

至此avro数据已经生成。

5、反序列化

通过反序列化代码解析avro数据

// Deserialize Users from disk
DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
DataFileReader<User> dataFileReader = new DataFileReader<User>(new File("user_generic.avro"), userDatumReader);
User user = null;
while (dataFileReader.hasNext()) {
    // Reuse user object by passing it to next(). This saves us from
    // allocating and garbage collecting many objects for files with
    // many items.
    user = dataFileReader.next(user);
    System.out.println(user);
}

执行反序列化代码解析user_generic.avro

Parlons de la façon danalyser les données Apache Avro (explication avec exemples)

avro数据解析成功。

6、将user_generic.avro上传至hdfs路径

hdfs dfs -mkdir -p /tmp/lztest/

hdfs dfs -put user_generic.avro /tmp/lztest/

Parlons de la façon danalyser les données Apache Avro (explication avec exemples)

7、配置flinkserver

  • 准备avro jar包

将flink-sql-avro-*.jar、flink-sql-avro-confluent-registry-*.jar放入flinkserver lib,将下面的命令在所有flinkserver节点执行

cp /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/opt/flink-sql-avro*.jar /opt/huawei/Bigdata/FusionInsight_Flink_8.1.3/install/FusionInsight-Flink-1.12.2/flink/lib

chmod 500 flink-sql-avro*.jar

chown omm:wheel flink-sql-avro*.jar

Parlons de la façon danalyser les données Apache Avro (explication avec exemples)

  • 同时重启FlinkServer实例,重启完成后查看avro包是否被上传

    hdfs dfs -ls /FusionInsight_FlinkServer/8.1.2-312005/lib

Parlons de la façon danalyser les données Apache Avro (explication avec exemples)

8、编写FlinkSQL

CREATE TABLE testHdfs(
  name String,
  favorite_number int,
  favorite_color String
) WITH(
  &#39;connector&#39; = &#39;filesystem&#39;,
  &#39;path&#39; = &#39;hdfs:///tmp/lztest/user_generic.avro&#39;,
  &#39;format&#39; = &#39;avro&#39;
);CREATE TABLE KafkaTable (
  name String,
  favorite_number int,
  favorite_color String
) WITH (
  &#39;connector&#39; = &#39;kafka&#39;,
  &#39;topic&#39; = &#39;testavro&#39;,
  &#39;properties.bootstrap.servers&#39; = &#39;96.10.2.1:21005&#39;,
  &#39;properties.group.id&#39; = &#39;testGroup&#39;,
  &#39;scan.startup.mode&#39; = &#39;latest-offset&#39;,
  &#39;format&#39; = &#39;avro&#39;
);
insert into
  KafkaTable
select
  *
from
  testHdfs;

Parlons de la façon danalyser les données Apache Avro (explication avec exemples)

保存提交任务

9、查看对应topic中是否有数据

Parlons de la façon danalyser les données Apache Avro (explication avec exemples)

FlinkSQL解析avro数据成功。

【推荐:Apache使用教程

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer