Maison >Java >javaDidacticiel >Comment utiliser Java pour développer une application de traitement de flux basée sur Apache Kafka et KSQL
Comment développer une application de traitement de flux basée sur Apache Kafka et KSQL en utilisant Java
Le traitement de flux est une technologie qui gère les flux de données en temps réel afin que les données puissent être analysées et traitées dès leur arrivée. Apache Kafka est une plate-forme de traitement de flux distribuée qui peut être utilisée pour créer efficacement des applications de traitement de flux évolutives. KSQL est un moteur de traitement de données de flux open source qui peut être utilisé pour les requêtes SQL et la conversion de données de flux en temps réel. Dans cet article, nous présenterons comment utiliser Java pour développer une application de traitement de flux basée sur Apache Kafka et KSQL.
1. Configuration de l'environnement
Avant de commencer, nous devons configurer un environnement Kafka et KSQL local. Tout d’abord, nous devons télécharger et installer Java JDK, Apache Kafka et Confluent Platform. Nous pouvons ensuite démarrer Kafka et KSQL en utilisant les commandes suivantes :
2 Créer un sujet Kafka et une table KSQL
Avant de commencer à écrire du code Java, nous devons d'abord Créez un sujet Kafka et écrivez-y des données en temps réel. Nous pouvons créer un sujet nommé "exemple-sujet" en utilisant la commande suivante :
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic example-topic --partitions 1 --replication - facteur 1
Ensuite, nous devons créer une table dans KSQL pour interroger et transformer les données en temps réel. Nous pouvons créer une table nommée "exemple-table" dans le terminal KSQL en utilisant la commande suivante :
CREATE TABLE exemple_table (clé VARCHAR, valeur VARCHAR) AVEC (kafka_topic='example-topic', value_format='json', key= 'key');
3. Implémentation du code Java
Avant de commencer à écrire du code Java, nous devons ajouter des dépendances sur Kafka et KSQL. Nous pouvons ajouter les dépendances suivantes dans le fichier de configuration Maven ou Gradle :
Maven:
b4b38e33757a6497aa8690936b905cc1
<groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version>
09a0e22e5aaafd848ae04665be625b91
b4b38e33757a6497aa8690936b905cc1
<groupId>io.confluent</groupId> <artifactId>ksql-serde</artifactId> <version>0.10.0</version>
09a0e22e5aaafd848ae04665be625b91
Gradle:
implémentation 'org.apache.kafka:kafka-clients:2.5.0'
implémentation 'io.confluent:ksql-serde:0.10.0'
Ensuite, nous pouvons écrire du code Java pour implémenter l'application de traitement de flux. Voici un exemple de code simple :
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.*;
import org.apache.kafka.connect.json.JsonDeserializer;
importer org.apache.kafka.connect.json.JsonSerializer;
importer org.apache.kafka.streams.*;
importer org.apache.kafka.streams.kstream . *;
importer org.apache.kafka.streams.processor.WallclockTimestampExtractor;
importer org.apache.kafka.streams.state.*;
importer java.util.*;
importer java.util.concurrent.*;
public class StreamProcessingApp {
public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); StreamsBuilder builder = new StreamsBuilder(); // Step 1: Read from Kafka topic KStream<String, String> stream = builder.stream("example-topic"); // Step 2: Transform and process the data stream.mapValues(value -> value.toUpperCase()) .filter((key, value) -> value.startsWith("A")) .to("processed-topic"); // Step 3: Create a Kafka producer to send data to another topic Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps); // Step 4: Consume and process the data from the processed topic KStream<String, String> processedStream = builder.stream("processed-topic"); processedStream.foreach((key, value) -> { // Process the data here System.out.println("Key: " + key + ", Value: " + value); }); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); }
}
Le code ci-dessus implémente une application de traitement de flux simple, qui lit les données en temps réel dans le sujet "exemple-sujet", convertit les données en majuscules et les écrit avec la lettre "A". Les données du début sont écrites dans le sujet « sujet traité ». Dans le même temps, il consommera également les données de la rubrique « sujet traité » et les traitera.
4. Exécutez l'application
Après avoir écrit le code Java, nous pouvons utiliser les commandes suivantes pour compiler et exécuter l'application :
javac StreamProcessingApp.java
java StreamProcessingApp
Maintenant, nous avons développé avec succès un flux basé sur Apache Kafka. et des applications de traitement KSQL et la mise en œuvre de la lecture, de la conversion, du traitement et de l'écriture de données via du code Java. Vous pouvez modifier et étendre le code en fonction des besoins réels pour répondre aux besoins de votre entreprise. J'espère que cet article vous aidera !
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!