Maison  >  Article  >  Java  >  Créez des applications de traitement de flux à l'aide de Spring Boot et Apache Kafka Streams

Créez des applications de traitement de flux à l'aide de Spring Boot et Apache Kafka Streams

WBOY
WBOYoriginal
2023-06-23 08:32:221523parcourir

Avec l'avènement de l'ère du Big Data, de plus en plus d'entreprises commencent à s'intéresser à la technologie de traitement des flux pour répondre aux besoins de traitement et d'analyse des données en temps réel. Apache Kafka est un système de file d'attente de messages distribué à haut débit et évolutif qui est devenu la norme de facto dans le domaine du traitement de flux. Spring Boot est un outil permettant de développer rapidement des applications Spring, qui peut nous aider à créer des applications de traitement de flux plus rapidement et plus facilement. Cet article expliquera comment créer une application de traitement de flux à l'aide de Spring Boot et Apache Kafka Streams, discutera des avantages et des inconvénients de ces deux outils et comment optimiser les performances des applications.

  1. Créer un sujet Kafka

Avant de commencer à créer l'application, nous devons d'abord créer un sujet Kafka. Dans cet article, nous allons créer une rubrique appelée « clics utilisateur » qui stockera les événements de clics utilisateur sur le site Web.

Exécutez la commande suivante dans la ligne de commande :

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user-clicks

Cela créera un sujet nommé "clics utilisateur" sur le serveur Kafka avec une seule partition et fera une copie locale .

  1. Création d'une application Spring Boot

Ensuite, nous créerons une application de base à l'aide de Spring Boot. Dans Spring Boot, nous pouvons utiliser Spring Initializr pour créer rapidement une application de base. Lors de la création de votre application, assurez-vous de sélectionner les dépendances suivantes : Après avoir créé l'application, nous ajouterons les dépendances suivantes :

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-streams</artifactId>
   <version>2.6.0</version>
</dependency>
    Cela nous fournira l'API de traitement de flux Kafka.
  • Implémentation du traitement de flux Kafka

Maintenant, nous pouvons commencer à écrire du code de traitement de flux Kafka. Lors de la création de l'application, nous avons défini une classe de contrôleur appelée "UserController". Nous allons maintenant ajouter un gestionnaire de requêtes POST nommé "clicks" dans la classe du contrôleur. Ce gestionnaire obtiendra les événements de clic de l'utilisateur à partir de la requête POST et les enverra à un sujet Kafka nommé « clics de l'utilisateur ». Le code est le suivant :

@RestController
public class UserController {

   private final KafkaTemplate<String, String> kafkaTemplate;

   @Autowired
   public UserController(KafkaTemplate<String, String> kafkaTemplate) {
       this.kafkaTemplate = kafkaTemplate;
   }

   @PostMapping("/clicks")
   public void clicks(@RequestBody String click) {
       kafkaTemplate.send("user-clicks", click);
   }
}

Dans le code ci-dessus, nous utilisons la fonction d'injection de dépendances de Spring pour injecter un objet KafkaTemplate nommé "kafkaTemplate". Cet objet peut être utilisé pour envoyer des messages aux sujets Kafka.

  1. Créer une topologie de traitement de flux Kafka

Ensuite, nous allons créer une topologie de traitement de flux Kafka pour le traitement à partir des "clics utilisateur" Événement de clic reçu par le sujet. Dans notre exemple, nous utiliserons l'API Kafka Streams pour implémenter la topologie de traitement de flux.

Dans l'application Spring Boot, nous allons créer une classe appelée "UserClicksStream" qui utilisera l'API Kafka Streams pour gérer les événements de clic. Le code est le suivant :

@Configuration
@EnableKafkaStreams
public class UserClicksStream {

   @Value("${spring.kafka.bootstrap-servers}")
   private String bootstrapServers;

   @Bean
   public KStream<String, String> kStream(StreamsBuilder builder) {

       KStream<String, String> stream = builder.stream("user-clicks");

       stream.foreach((key, value) -> {
           System.out.println("Received: " + value);
       });

       return stream;
   }

   @Bean
   public KafkaStreams kafkaStreams(StreamsBuilder builder) {
       Properties props = new Properties();
       props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-clicks-stream");
       props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
       return new KafkaStreams(builder.build(), props);
   }
}
    Dans le code ci-dessus, nous utilisons la fonction d'injection de dépendances de Spring pour injecter un objet StreamsBuilder nommé "StreamsBuilder". Cet objet est utilisé pour créer une topologie de traitement de flux Kafka.
  1. Dans la méthode kStream, nous créons un objet KStream à partir du sujet "user-clicks" et imprimons les événements reçus en utilisant la méthode foreach. froeach est une opération de terminal que nous utiliserons dans les étapes ultérieures.

Dans la méthode kafkaStreams, nous créons une application nommée "user-clicks-stream" et spécifions l'adresse du serveur Kafka. Cette application effectuera automatiquement les opérations de traitement de flux que nous avons définies dans la topologie précédente.

Exécutez l'application

Nous avons maintenant écrit tout le code de l'application. Avant d'exécuter l'application, nous devons démarrer le serveur Kafka.

Exécutez la commande suivante dans la ligne de commande :

bin/kafka-server-start.sh config/server.properties
    Cela démarrera le serveur Kafka. Nous pouvons maintenant démarrer notre application.
  1. Exécutez la commande suivante dans la ligne de commande :
mvn spring-boot:run

Cela lancera notre application. Nous pouvons désormais envoyer une requête POST à ​​l'application en utilisant n'importe quel client HTTP comme cURL ou Postman. Chaque requête générera un événement de clic et l'imprimera dans la console.

Si nous souhaitons effectuer plus d'opérations dans la topologie (telles que l'agrégation, le calcul de fenêtre, etc.), nous pouvons utiliser d'autres opérations fournies par l'API Kafka Streams pour construire la topologie.

Summary

Créer des applications de traitement de flux à l'aide de Spring Boot et Apache Kafka Streams est un moyen rapide et pratique de nous aider à traiter plus facilement les données réelles -données temporelles. Cependant, nous devons prêter attention à certains problèmes de performances d'optimisation, tels que la conception de la topologie, la taille du tampon, le temps de traitement du flux, etc. En comprenant ces problèmes, nous pouvons mieux créer des applications de traitement de flux efficaces.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn