Maison >Java >javaDidacticiel >Créez des applications de traitement de flux à l'aide de Spring Boot et Apache Kafka Streams
Avec l'avènement de l'ère du Big Data, de plus en plus d'entreprises commencent à s'intéresser à la technologie de traitement des flux pour répondre aux besoins de traitement et d'analyse des données en temps réel. Apache Kafka est un système de file d'attente de messages distribué à haut débit et évolutif qui est devenu la norme de facto dans le domaine du traitement de flux. Spring Boot est un outil permettant de développer rapidement des applications Spring, qui peut nous aider à créer des applications de traitement de flux plus rapidement et plus facilement. Cet article expliquera comment créer une application de traitement de flux à l'aide de Spring Boot et Apache Kafka Streams, discutera des avantages et des inconvénients de ces deux outils et comment optimiser les performances des applications.
Avant de commencer à créer l'application, nous devons d'abord créer un sujet Kafka. Dans cet article, nous allons créer une rubrique appelée « clics utilisateur » qui stockera les événements de clics utilisateur sur le site Web.
Exécutez la commande suivante dans la ligne de commande :
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user-clicks
Cela créera un sujet nommé "clics utilisateur" sur le serveur Kafka avec une seule partition et fera une copie locale .
Ensuite, nous créerons une application de base à l'aide de Spring Boot. Dans Spring Boot, nous pouvons utiliser Spring Initializr pour créer rapidement une application de base. Lors de la création de votre application, assurez-vous de sélectionner les dépendances suivantes : Après avoir créé l'application, nous ajouterons les dépendances suivantes :
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.6.0</version> </dependency>
Maintenant, nous pouvons commencer à écrire du code de traitement de flux Kafka. Lors de la création de l'application, nous avons défini une classe de contrôleur appelée "UserController". Nous allons maintenant ajouter un gestionnaire de requêtes POST nommé "clicks" dans la classe du contrôleur. Ce gestionnaire obtiendra les événements de clic de l'utilisateur à partir de la requête POST et les enverra à un sujet Kafka nommé « clics de l'utilisateur ». Le code est le suivant :
@RestController public class UserController { private final KafkaTemplate<String, String> kafkaTemplate; @Autowired public UserController(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } @PostMapping("/clicks") public void clicks(@RequestBody String click) { kafkaTemplate.send("user-clicks", click); } }
Dans le code ci-dessus, nous utilisons la fonction d'injection de dépendances de Spring pour injecter un objet KafkaTemplate nommé "kafkaTemplate". Cet objet peut être utilisé pour envoyer des messages aux sujets Kafka.
Ensuite, nous allons créer une topologie de traitement de flux Kafka pour le traitement à partir des "clics utilisateur" Événement de clic reçu par le sujet. Dans notre exemple, nous utiliserons l'API Kafka Streams pour implémenter la topologie de traitement de flux.
Dans l'application Spring Boot, nous allons créer une classe appelée "UserClicksStream" qui utilisera l'API Kafka Streams pour gérer les événements de clic. Le code est le suivant :
@Configuration @EnableKafkaStreams public class UserClicksStream { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public KStream<String, String> kStream(StreamsBuilder builder) { KStream<String, String> stream = builder.stream("user-clicks"); stream.foreach((key, value) -> { System.out.println("Received: " + value); }); return stream; } @Bean public KafkaStreams kafkaStreams(StreamsBuilder builder) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-clicks-stream"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); return new KafkaStreams(builder.build(), props); } }
Dans la méthode kafkaStreams, nous créons une application nommée "user-clicks-stream" et spécifions l'adresse du serveur Kafka. Cette application effectuera automatiquement les opérations de traitement de flux que nous avons définies dans la topologie précédente.
Exécutez l'applicationNous avons maintenant écrit tout le code de l'application. Avant d'exécuter l'application, nous devons démarrer le serveur Kafka.
Exécutez la commande suivante dans la ligne de commande :
bin/kafka-server-start.sh config/server.properties
mvn spring-boot:run
Cela lancera notre application. Nous pouvons désormais envoyer une requête POST à l'application en utilisant n'importe quel client HTTP comme cURL ou Postman. Chaque requête générera un événement de clic et l'imprimera dans la console.
Si nous souhaitons effectuer plus d'opérations dans la topologie (telles que l'agrégation, le calcul de fenêtre, etc.), nous pouvons utiliser d'autres opérations fournies par l'API Kafka Streams pour construire la topologie.
SummaryCréer des applications de traitement de flux à l'aide de Spring Boot et Apache Kafka Streams est un moyen rapide et pratique de nous aider à traiter plus facilement les données réelles -données temporelles. Cependant, nous devons prêter attention à certains problèmes de performances d'optimisation, tels que la conception de la topologie, la taille du tampon, le temps de traitement du flux, etc. En comprenant ces problèmes, nous pouvons mieux créer des applications de traitement de flux efficaces.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!