So entwickeln Sie eine Stream-Verarbeitungsanwendung basierend auf Apache Kafka und KSQL mit Java
Stream-Verarbeitung ist eine Technologie, die Echtzeit-Datenströme verarbeitet, sodass die Daten analysiert und verarbeitet werden können, sobald sie ankommen. Apache Kafka ist eine verteilte Stream-Verarbeitungsplattform, mit der sich effizient skalierbare Stream-Verarbeitungsanwendungen erstellen lassen. KSQL ist eine Open-Source-Stream-Datenverarbeitungs-Engine, die für SQL-Abfragen und die Konvertierung von Echtzeit-Stream-Daten verwendet werden kann. In diesem Artikel stellen wir vor, wie Sie mit Java eine Stream-Verarbeitungsanwendung basierend auf Apache Kafka und KSQL entwickeln.
1. Umgebungseinrichtung
Bevor wir beginnen, müssen wir eine lokale Kafka- und KSQL-Umgebung einrichten. Zuerst müssen wir Java JDK, Apache Kafka und Confluent Platform herunterladen und installieren. Anschließend können wir Kafka und KSQL mit den folgenden Befehlen starten:
2. Erstellen Sie ein Kafka-Thema und eine KSQL-Tabelle
Bevor wir mit dem Schreiben von Java-Code beginnen, müssen wir zunächst Folgendes tun Erstellen Sie ein Kafka-Thema und schreiben Sie Echtzeitdaten hinein. Mit dem folgenden Befehl können wir ein Thema namens „example-topic“ erstellen:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic example-topic --partitions 1 --replication - Faktor 1
Als nächstes müssen wir in KSQL eine Tabelle zum Abfragen und Transformieren von Echtzeitdaten erstellen. Mit dem folgenden Befehl können wir im KSQL-Terminal eine Tabelle mit dem Namen „example-table“ erstellen:
CREATE TABLE example_table (key VARCHAR, value VARCHAR) WITH (kafka_topic='example-topic', value_format='json', key= 'key');
3. Java-Code-Implementierung
Bevor wir mit dem Schreiben von Java-Code beginnen, müssen wir Abhängigkeiten von Kafka und KSQL hinzufügen. Wir können die folgenden Abhängigkeiten in der Maven- oder Gradle-Konfigurationsdatei hinzufügen:
Maven:
b4b38e33757a6497aa8690936b905cc1
<groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version>
09a0e22e5aaafd848ae04665be625b91
b4b38e33757a6497aa8690936b905cc1
<groupId>io.confluent</groupId> <artifactId>ksql-serde</artifactId> <version>0.10.0</version>
09a0e22e5aaafd848ae04665be625b91
Gradle:
implementation 'org.apache.kafka:kafka-clients:2.5.0'
implementation 'io.confluent:ksql-serde:0.10.0'
Als nächstes können wir Java-Code schreiben, um die Stream-Verarbeitungsanwendung zu implementieren. Hier ist ein einfacher Beispielcode:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.*;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream . *;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.streams.state.*;
import java.util.*;
import java.util.concurrent.*;
öffentliche Klasse StreamProcessingApp {
public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); StreamsBuilder builder = new StreamsBuilder(); // Step 1: Read from Kafka topic KStream<String, String> stream = builder.stream("example-topic"); // Step 2: Transform and process the data stream.mapValues(value -> value.toUpperCase()) .filter((key, value) -> value.startsWith("A")) .to("processed-topic"); // Step 3: Create a Kafka producer to send data to another topic Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps); // Step 4: Consume and process the data from the processed topic KStream<String, String> processedStream = builder.stream("processed-topic"); processedStream.foreach((key, value) -> { // Process the data here System.out.println("Key: " + key + ", Value: " + value); }); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); }
}
Der obige Code implementiert eine einfache Stream-Verarbeitungsanwendung, die Echtzeitdaten im Thema „Beispielthema“ liest, die Daten in Großbuchstaben umwandelt und sie mit dem Buchstaben „A“ schreibt. Die Daten am Anfang werden in das Thema „processed-topic“ geschrieben. Gleichzeitig werden auch die Daten im Thema „verarbeitetes Thema“ verbraucht und verarbeitet.
4. Führen Sie die Anwendung aus
Nachdem wir den Java-Code geschrieben haben, können wir die folgenden Befehle verwenden, um die Anwendung zu kompilieren und auszuführen:
javac StreamProcessingApp.java
java StreamProcessingApp
Jetzt haben wir erfolgreich einen Stream basierend auf Apache Kafka entwickelt und KSQL-Verarbeitungsanwendungen und Implementierung des Lesens, Konvertierens, Verarbeitens und Schreibens von Daten durch Java-Code. Sie können den Code entsprechend den tatsächlichen Anforderungen ändern und erweitern, um Ihren Geschäftsanforderungen gerecht zu werden. Ich hoffe, dieser Artikel ist hilfreich für Sie!
Das obige ist der detaillierte Inhalt vonWie man mit Java eine Stream-Verarbeitungsanwendung basierend auf Apache Kafka und KSQL entwickelt. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!