Maison >Java >javaDidacticiel >Développement Java : comment utiliser Apache Kafka Streams pour le traitement et le calcul de flux en temps réel
Développement Java : Comment utiliser Apache Kafka Streams pour le traitement et le calcul de flux en temps réel
Introduction :
Avec l'essor du Big Data et de l'informatique en temps réel, Apache Kafka Streams, en tant que moteur de traitement de flux, est en train d'être utilisé de plus en plus Utilisé par les développeurs. Il fournit un moyen simple mais puissant de gérer des données de streaming en temps réel et d'effectuer des traitements et des calculs de flux complexes. Cet article explique comment utiliser Apache Kafka Streams pour le traitement et le calcul de flux en temps réel, y compris la configuration de l'environnement, l'écriture de code et des exemples de démonstrations.
1. Préparation :
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.8.1</version> </dependency>Ajoutez une logique de traitement et de calcul de flux :
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import java.util.Properties; public class KafkaStreamsApp { public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); StreamsBuilder builder = new StreamsBuilder(); // 在这里添加流处理和计算逻辑 Topology topology = builder.build(); KafkaStreams streams = new KafkaStreams(topology, props); streams.start(); // 添加Shutdown Hook,确保应用程序在关闭时能够优雅地停止 Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } }
Exécutez les commandes suivantes dans la ligne de commande pour créer des sujets Kafka nommés "sujet d'entrée" et "sujet de sortie" :
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import java.util.Arrays; public class KafkaStreamsApp { // 省略其他代码... public static void main(String[] args) { // 省略其他代码... KStream<String, String> inputStream = builder.stream("input-topic"); KTable<String, Long> wordCounts = inputStream .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+"))) .groupBy((key, word) -> word) .count(); wordCounts.toStream().to("output-topic"); // 省略其他代码... } }
bin/kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 bin/kafka-topics.sh --create --topic output-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
bin/kafka-console-producer.sh --topic input-topic --bootstrap-server localhost:9092 >hello world >apache kafka streams >real-time processing >``` 3. 查看结果: 在命令行中执行以下命令,从"output-topic"中消费结果消息:
bonjour : 2
monde : 1可以看到,输出的结果是单词及其对应的计数值:
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!