Maison >Java >javaDidacticiel >Développement Java : comment utiliser Apache Kafka Streams pour le traitement et le calcul de flux en temps réel

Développement Java : comment utiliser Apache Kafka Streams pour le traitement et le calcul de flux en temps réel

WBOY
WBOYoriginal
2023-09-21 12:39:241518parcourir

Java开发:如何使用Apache Kafka Streams进行实时流处理和计算

Développement Java : Comment utiliser Apache Kafka Streams pour le traitement et le calcul de flux en temps réel

Introduction :
Avec l'essor du Big Data et de l'informatique en temps réel, Apache Kafka Streams, en tant que moteur de traitement de flux, est en train d'être utilisé de plus en plus Utilisé par les développeurs. Il fournit un moyen simple mais puissant de gérer des données de streaming en temps réel et d'effectuer des traitements et des calculs de flux complexes. Cet article explique comment utiliser Apache Kafka Streams pour le traitement et le calcul de flux en temps réel, y compris la configuration de l'environnement, l'écriture de code et des exemples de démonstrations.

1. Préparation :

  1. Installer et configurer Apache Kafka : Vous devez télécharger et installer Apache Kafka et démarrer le cluster Apache Kafka. Pour une installation et une configuration détaillées, veuillez vous référer à la documentation officielle d'Apache Kafka.
  2. Introduire des dépendances : introduisez les dépendances liées à Kafka Streams dans le projet Java. Par exemple, en utilisant Maven, vous pouvez ajouter les dépendances suivantes dans les informations de connexion au cluster pom du projet. Voici un exemple de code simple :
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.8.1</version>
</dependency>

Ajoutez une logique de traitement et de calcul de flux :
    Après avoir créé une application Kafka Streams, vous devez ajouter une logique de traitement et de calcul de flux spécifique. Prenant un exemple simple, nous supposons que nous recevons un message sous forme de chaîne d'un sujet Kafka nommé "input-topic", effectuons un calcul de longueur sur le message, puis envoyons le résultat à un sujet Kafka nommé "output-topic" . Voici un exemple de code :

  1. import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    
    import java.util.Properties;
    
    public class KafkaStreamsApp {
    
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    
            StreamsBuilder builder = new StreamsBuilder();
            // 在这里添加流处理和计算逻辑
    
            Topology topology = builder.build();
            KafkaStreams streams = new KafkaStreams(topology, props);
            streams.start();
    
            // 添加Shutdown Hook,确保应用程序在关闭时能够优雅地停止
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }
  2. Dans l'exemple de code ci-dessus, un objet KStream est d'abord créé à partir du sujet d'entrée, puis chaque message est divisé en mots à l'aide de l'opération flatMapValues ​​​​et compté statistiquement. Enfin, les résultats sont envoyés au sujet de sortie.
    3. Exemple de démonstration :
  1. Afin de vérifier nos applications de traitement de flux et de calcul en temps réel, vous pouvez utiliser l'outil de ligne de commande Kafka pour envoyer des messages et afficher les résultats. Voici les étapes pour un exemple de démonstration :
Créer des sujets d'entrée et de sortie :

Exécutez les commandes suivantes dans la ligne de commande pour créer des sujets Kafka nommés "sujet d'entrée" et "sujet de sortie" :

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Arrays;

public class KafkaStreamsApp {

    // 省略其他代码...
    
    public static void main(String[] args) {
        // 省略其他代码...
        
        KStream<String, String> inputStream = builder.stream("input-topic");
        KTable<String, Long> wordCounts = inputStream
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
                .groupBy((key, word) -> word)
                .count();

        wordCounts.toStream().to("output-topic");

        // 省略其他代码...
    }
}

Envoyer un message Vers le sujet d'entrée :
    Exécutez la commande suivante dans la ligne de commande pour envoyer des messages à "input-topic" :

  1. bin/kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
    bin/kafka-topics.sh --create --topic output-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  2. bin/kafka-console-consumer.sh --topic output-topic --from- début --bootstrap -server localhost : 9092
bin/kafka-console-producer.sh --topic input-topic --bootstrap-server localhost:9092
>hello world
>apache kafka streams
>real-time processing
>```

3. 查看结果:
在命令行中执行以下命令,从"output-topic"中消费结果消息:
    temps réel : 1
  1. traitement : 1
    apache : 1
  2. kafka : 1
streams : 1

bonjour : 2

monde : 1

可以看到,输出的结果是单词及其对应的计数值:

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn