Maison  >  Article  >  Java  >  Comment utiliser Java pour développer une application de traitement de flux basée sur Apache Kafka et KSQL

Comment utiliser Java pour développer une application de traitement de flux basée sur Apache Kafka et KSQL

WBOY
WBOYoriginal
2023-09-21 08:23:041013parcourir

如何使用Java开发一个基于Apache Kafka和KSQL的流处理应用

Comment développer une application de traitement de flux basée sur Apache Kafka et KSQL en utilisant Java

Le traitement de flux est une technologie qui gère les flux de données en temps réel afin que les données puissent être analysées et traitées dès leur arrivée. Apache Kafka est une plate-forme de traitement de flux distribuée qui peut être utilisée pour créer efficacement des applications de traitement de flux évolutives. KSQL est un moteur de traitement de données de flux open source qui peut être utilisé pour les requêtes SQL et la conversion de données de flux en temps réel. Dans cet article, nous présenterons comment utiliser Java pour développer une application de traitement de flux basée sur Apache Kafka et KSQL.

1. Configuration de l'environnement
Avant de commencer, nous devons configurer un environnement Kafka et KSQL local. Tout d’abord, nous devons télécharger et installer Java JDK, Apache Kafka et Confluent Platform. Nous pouvons ensuite démarrer Kafka et KSQL en utilisant les commandes suivantes :

  1. Démarrer ZooKeeper :
    bin/zookeeper-server-start.sh config/zookeeper.properties
  2. Démarrer Kafka Broker :
    bin/kafka-server-start.sh config /server.properties
  3. Démarrer le serveur KSQL :
    bin/ksql-server-start.sh config/ksql-server.properties

2 Créer un sujet Kafka et une table KSQL
Avant de commencer à écrire du code Java, nous devons d'abord Créez un sujet Kafka et écrivez-y des données en temps réel. Nous pouvons créer un sujet nommé "exemple-sujet" en utilisant la commande suivante :

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic example-topic --partitions 1 --replication - facteur 1

Ensuite, nous devons créer une table dans KSQL pour interroger et transformer les données en temps réel. Nous pouvons créer une table nommée "exemple-table" dans le terminal KSQL en utilisant la commande suivante :

CREATE TABLE exemple_table (clé VARCHAR, valeur VARCHAR) AVEC (kafka_topic='example-topic', value_format='json', key= 'key');

3. Implémentation du code Java
Avant de commencer à écrire du code Java, nous devons ajouter des dépendances sur Kafka et KSQL. Nous pouvons ajouter les dépendances suivantes dans le fichier de configuration Maven ou Gradle :

Maven:

b4b38e33757a6497aa8690936b905cc1

<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>

09a0e22e5aaafd848ae04665be625b91
b4b38e33757a6497aa8690936b905cc1

<groupId>io.confluent</groupId>
<artifactId>ksql-serde</artifactId>
<version>0.10.0</version>

09a0e22e5aaafd848ae04665be625b91

Gradle:

implémentation 'org.apache.kafka:kafka-clients:2.5.0'
implémentation 'io.confluent:ksql-serde:0.10.0'

Ensuite, nous pouvons écrire du code Java pour implémenter l'application de traitement de flux. Voici un exemple de code simple :

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.*;
import org.apache.kafka.connect.json.JsonDeserializer;
importer org.apache.kafka.connect.json.JsonSerializer;
importer org.apache.kafka.streams.*;
importer org.apache.kafka.streams.kstream . *;
importer org.apache.kafka.streams.processor.WallclockTimestampExtractor;
importer org.apache.kafka.streams.state.*;
importer java.util.*;
importer java.util.concurrent.*;

public class StreamProcessingApp {

public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    StreamsBuilder builder = new StreamsBuilder();

    // Step 1: Read from Kafka topic
    KStream<String, String> stream = builder.stream("example-topic");

    // Step 2: Transform and process the data
    stream.mapValues(value -> value.toUpperCase())
          .filter((key, value) -> value.startsWith("A"))
          .to("processed-topic");

    // Step 3: Create a Kafka producer to send data to another topic
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

    // Step 4: Consume and process the data from the processed topic
    KStream<String, String> processedStream = builder.stream("processed-topic");
    processedStream.foreach((key, value) -> {
        // Process the data here
        System.out.println("Key: " + key + ", Value: " + value);
    });

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
}

}

Le code ci-dessus implémente une application de traitement de flux simple, qui lit les données en temps réel dans le sujet "exemple-sujet", convertit les données en majuscules et les écrit avec la lettre "A". Les données du début sont écrites dans le sujet « sujet traité ». Dans le même temps, il consommera également les données de la rubrique « sujet traité » et les traitera.

4. Exécutez l'application
Après avoir écrit le code Java, nous pouvons utiliser les commandes suivantes pour compiler et exécuter l'application :

javac StreamProcessingApp.java
java StreamProcessingApp

Maintenant, nous avons développé avec succès un flux basé sur Apache Kafka. et des applications de traitement KSQL et la mise en œuvre de la lecture, de la conversion, du traitement et de l'écriture de données via du code Java. Vous pouvez modifier et étendre le code en fonction des besoins réels pour répondre aux besoins de votre entreprise. J'espère que cet article vous aidera !

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn