Maison >interface Web >js tutoriel >Exploration de l'intercepteur de consommateurs Spring Cloud Stream Kafka Binder

Exploration de l'intercepteur de consommateurs Spring Cloud Stream Kafka Binder

WBOY
WBOYoriginal
2024-08-06 19:20:501104parcourir

Exploring Spring Cloud Stream Kafka Binder Consumer Interceptor

Introduction

Spring Cloud Stream est un framework qui simplifie le développement de microservices basés sur des messages en faisant abstraction des courtiers de messages tels qu'Apache Kafka et RabbitMQ. L'une des fonctionnalités puissantes de Spring Cloud Stream est sa capacité à s'intégrer de manière transparente à Kafka, permettant aux développeurs de créer des applications événementielles robustes et évolutives. Le classeur Kafka dans Spring Cloud Stream offre un moyen de se connecter facilement aux sujets Kafka.

Dans ce blog, nous verrons comment utiliser un intercepteur consommateur avec Spring Cloud Stream Kafka Binder. Les intercepteurs de Kafka fournissent un mécanisme permettant d'intercepter et de modifier les enregistrements avant qu'ils ne soient consommés par l'application, offrant ainsi des opportunités de journalisation, de collecte de métriques et de manipulation de données.

Conditions préalables

Avant de plonger dans les détails, assurez-vous d'avoir les prérequis suivants :

  • Kit de développement Java (JDK) 8 ou version ultérieure
  • Apache Kafka
  • Spring Boot 2.x ou version ultérieure
  • Maven ou Gradle

Configuration de l'application Spring Boot

Tout d'abord, configurons un projet Spring Boot simple avec les dépendances nécessaires pour Spring Cloud Stream et Kafka.

Maven pom.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Hoxton.SR10</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Gradle build.gradle

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR10"
    }
}

Configuration de Kafka Binder

Ensuite, configurez le classeur Kafka dans le fichier application.yml.

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my-topic
          group: my-group
          consumer:
            interceptor-classes: com.example.MyConsumerInterceptor
      kafka:
        binder:
          brokers: localhost:9092

Création d'un intercepteur de consommateur Kafka

Pour créer un intercepteur consommateur, implémentez l'interface ConsumerInterceptor fournie par Kafka. Cette interface vous permet de définir une logique personnalisée pour intercepter et traiter les enregistrements avant qu'ils n'atteignent l'application.

package com.example;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.Configurable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class MyConsumerInterceptor implements ConsumerInterceptor<String, String>, Configurable {

    private static final Logger logger = LoggerFactory.getLogger(MyConsumerInterceptor.class);

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        records.forEach(record -> {
            logger.info("Intercepted record: key = {}, value = {}", record.key(), record.value());
            // Add your custom logic here
        });
        return records;
    }

    @Override
    public void onCommit(Map offsets) {
        // Custom logic on commit
    }

    @Override
    public void close() {
        // Cleanup resources if necessary
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Configuration logic
    }
}

Création de l'application consommateur

Créez une application grand public simple qui écoute les messages d'un sujet Kafka.

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(KafkaProcessor.class)
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }

    @StreamListener("input")
    public void handle(Message<String> message) {
        System.out.println("Received message: " + message.getPayload());
    }
}

Interface pour la liaison

Définissez une interface pour lier le canal d'entrée au sujet Kafka.

package com.example;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface KafkaProcessor {
    String INPUT = "input";

    @Input(INPUT)
    SubscribableChannel input();
}

Exécution de l'application

  1. Démarrez le courtier Kafka et créez le sujet requis (my-topic).
  2. Exécutez l'application Spring Boot.

Lorsque des messages sont envoyés au sujet Kafka, MyConsumerInterceptor interceptera les enregistrements et vous devriez voir les messages de journal interceptés.

Conclusion

Dans ce blog, nous avons exploré comment utiliser un intercepteur consommateur avec Spring Cloud Stream Kafka Binder. Les intercepteurs constituent un moyen puissant de traiter, enregistrer et manipuler les enregistrements avant qu'ils ne soient consommés par l'application. En intégrant des intercepteurs personnalisés, vous pouvez améliorer les fonctionnalités de vos consommateurs Kafka, en ajoutant des fonctionnalités précieuses telles que la journalisation, la collecte de métriques et la transformation des données.

En suivant les étapes décrites dans ce guide, vous devriez être en mesure d'implémenter et de configurer des intercepteurs grand public dans vos applications Spring Cloud Stream de manière transparente. Bon codage !

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn