Heim  >  Artikel  >  Web-Frontend  >  Erkundung des Spring Cloud Stream Kafka Binder Consumer Interceptor

Erkundung des Spring Cloud Stream Kafka Binder Consumer Interceptor

WBOY
WBOYOriginal
2024-08-06 19:20:501039Durchsuche

Exploring Spring Cloud Stream Kafka Binder Consumer Interceptor

Einführung

Spring Cloud Stream ist ein Framework, das die Entwicklung nachrichtengesteuerter Mikrodienste durch die Abstraktion von Nachrichtenbrokern wie Apache Kafka und RabbitMQ vereinfacht. Eine der leistungsstarken Funktionen von Spring Cloud Stream ist die Fähigkeit zur nahtlosen Integration mit Kafka, sodass Entwickler robuste und skalierbare ereignisgesteuerte Anwendungen erstellen können. Der Kafka-Ordner in Spring Cloud Stream bietet eine Möglichkeit, einfach eine Verbindung zu Kafka-Themen herzustellen.

In diesem Blog befassen wir uns mit der Verwendung eines Consumer-Interceptors mit Spring Cloud Stream Kafka Binder. Interceptors in Kafka bieten einen Mechanismus zum Abfangen und Ändern von Datensätzen, bevor sie von der Anwendung verwendet werden, und bieten Möglichkeiten zur Protokollierung, Metrikerfassung und Datenmanipulation.

Voraussetzungen

Bevor Sie in die Details eintauchen, stellen Sie sicher, dass Sie die folgenden Voraussetzungen erfüllen:

  • Java Development Kit (JDK) 8 oder höher
  • Apache Kafka
  • Spring Boot 2.x oder höher
  • Maven oder Gradle

Einrichten der Spring Boot-Anwendung

Zuerst richten wir ein einfaches Spring Boot-Projekt mit den notwendigen Abhängigkeiten für Spring Cloud Stream und Kafka ein.

Maven pom.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Hoxton.SR10</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Gradle build.gradle

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR10"
    }
}

Kafka Binder konfigurieren

Als nächstes konfigurieren Sie den Kafka-Ordner in der Datei application.yml.

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my-topic
          group: my-group
          consumer:
            interceptor-classes: com.example.MyConsumerInterceptor
      kafka:
        binder:
          brokers: localhost:9092

Erstellen eines Kafka Consumer Interceptors

Um einen Consumer-Interceptor zu erstellen, implementieren Sie die von Kafka bereitgestellte ConsumerInterceptor-Schnittstelle. Mit dieser Schnittstelle können Sie eine benutzerdefinierte Logik zum Abfangen und Verarbeiten von Datensätzen definieren, bevor sie die Anwendung erreichen.

package com.example;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.Configurable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class MyConsumerInterceptor implements ConsumerInterceptor<String, String>, Configurable {

    private static final Logger logger = LoggerFactory.getLogger(MyConsumerInterceptor.class);

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        records.forEach(record -> {
            logger.info("Intercepted record: key = {}, value = {}", record.key(), record.value());
            // Add your custom logic here
        });
        return records;
    }

    @Override
    public void onCommit(Map offsets) {
        // Custom logic on commit
    }

    @Override
    public void close() {
        // Cleanup resources if necessary
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Configuration logic
    }
}

Erstellen der Verbraucheranwendung

Erstellen Sie eine einfache Verbraucheranwendung, die Nachrichten aus einem Kafka-Thema abhört.

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(KafkaProcessor.class)
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }

    @StreamListener("input")
    public void handle(Message<String> message) {
        System.out.println("Received message: " + message.getPayload());
    }
}

Schnittstelle für Bindung

Definieren Sie eine Schnittstelle zum Binden des Eingabekanals an das Kafka-Thema.

package com.example;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface KafkaProcessor {
    String INPUT = "input";

    @Input(INPUT)
    SubscribableChannel input();
}

Ausführen der Anwendung

  1. Starten Sie den Kafka-Broker und erstellen Sie das erforderliche Thema (Mein-Thema).
  2. Führen Sie die Spring Boot-Anwendung aus.

Wenn Nachrichten zum Kafka-Thema erstellt werden, fängt der MyConsumerInterceptor die Datensätze ab und Sie sollten die abgefangenen Protokollnachrichten sehen.

Abschluss

In diesem Blog haben wir untersucht, wie man einen Consumer-Interceptor mit Spring Cloud Stream Kafka Binder verwendet. Interceptors bieten eine leistungsstarke Möglichkeit, Datensätze zu verarbeiten, zu protokollieren und zu manipulieren, bevor sie von der Anwendung verwendet werden. Durch die Integration benutzerdefinierter Interceptoren können Sie die Funktionalität Ihrer Kafka-Konsumenten verbessern und wertvolle Funktionen wie Protokollierung, Metrikerfassung und Datentransformation hinzufügen.

Wenn Sie die in diesem Leitfaden beschriebenen Schritte befolgen, sollten Sie in der Lage sein, Consumer-Interceptors nahtlos in Ihren Spring Cloud Stream-Anwendungen zu implementieren und zu konfigurieren. Viel Spaß beim Codieren!

Das obige ist der detaillierte Inhalt vonErkundung des Spring Cloud Stream Kafka Binder Consumer Interceptor. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn