Heim >Java >javaLernprogramm >Java-Entwicklung: So verwenden Sie Apache Kafka Streams für die Echtzeit-Stream-Verarbeitung und -Berechnung
Java-Entwicklung: So verwenden Sie Apache Kafka Streams für die Echtzeit-Stream-Verarbeitung und -Berechnung
Einführung:
Mit dem Aufkommen von Big Data und Echtzeit-Computing wird Apache Kafka Streams als Stream-Verarbeitungs-Engine immer beliebter Wird immer häufiger von Entwicklern verwendet. Es bietet eine einfache, aber leistungsstarke Möglichkeit, Echtzeit-Streaming-Daten zu verarbeiten und komplexe Stream-Verarbeitungen und Berechnungen durchzuführen. In diesem Artikel wird die Verwendung von Apache Kafka Streams für die Echtzeit-Stream-Verarbeitung und -Berechnung vorgestellt, einschließlich der Konfiguration der Umgebung, des Schreibens von Code und Beispieldemonstrationen.
1. Vorbereitung:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.8.1</version> </dependency>Fügen Sie Stream-Verarbeitungs- und Berechnungslogik hinzu:
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import java.util.Properties; public class KafkaStreamsApp { public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); StreamsBuilder builder = new StreamsBuilder(); // 在这里添加流处理和计算逻辑 Topology topology = builder.build(); KafkaStreams streams = new KafkaStreams(topology, props); streams.start(); // 添加Shutdown Hook,确保应用程序在关闭时能够优雅地停止 Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } }
Führen Sie die folgenden Befehle in der Befehlszeile aus, um Kafka-Themen mit den Namen „Eingabe-Thema“ und „Ausgabe-Thema“ zu erstellen:
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import java.util.Arrays; public class KafkaStreamsApp { // 省略其他代码... public static void main(String[] args) { // 省略其他代码... KStream<String, String> inputStream = builder.stream("input-topic"); KTable<String, Long> wordCounts = inputStream .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+"))) .groupBy((key, word) -> word) .count(); wordCounts.toStream().to("output-topic"); // 省略其他代码... } }
bin/kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 bin/kafka-topics.sh --create --topic output-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
bin/kafka-console-producer.sh --topic input-topic --bootstrap-server localhost:9092 >hello world >apache kafka streams >real-time processing >``` 3. 查看结果: 在命令行中执行以下命令,从"output-topic"中消费结果消息:
hello: 2
world: 1可以看到,输出的结果是单词及其对应的计数值:
Das obige ist der detaillierte Inhalt vonJava-Entwicklung: So verwenden Sie Apache Kafka Streams für die Echtzeit-Stream-Verarbeitung und -Berechnung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!