ホームページ  >  記事  >  ウェブフロントエンド  >  Spring Cloud Stream Kafka Binder Consumer Interceptor の探索

Spring Cloud Stream Kafka Binder Consumer Interceptor の探索

WBOY
WBOYオリジナル
2024-08-06 19:20:501039ブラウズ

Exploring Spring Cloud Stream Kafka Binder Consumer Interceptor

導入

Spring Cloud Stream は、Apache Kafka や RabbitMQ などのメッセージ ブローカーを抽象化することで、メッセージ駆動型のマイクロサービスの開発を簡素化するフレームワークです。 Spring Cloud Stream の強力な機能の 1 つは、Kafka とシームレスに統合できることで、開発者は堅牢でスケーラブルなイベント駆動型アプリケーションを構築できます。 Spring Cloud Stream の Kafka バインダーは、Kafka トピックに簡単に接続する方法を提供します。

このブログでは、Spring Cloud Stream Kafka Binder でコンシューマ インターセプタを使用する方法を詳しく説明します。 Kafka のインターセプターは、アプリケーションによって使用される前にレコードをインターセプトして変更するメカニズムを提供し、ロギング、メトリクス収集、データ操作の機会を提供します。

前提条件

詳細に入る前に、次の前提条件を満たしていることを確認してください。

  • Java 開発キット (JDK) 8 以降
  • Apache Kafka
  • Spring Boot 2.x 以降
  • Maven または Gradle

Spring Boot アプリケーションのセットアップ

まず、Spring Cloud Stream と Kafka に必要な依存関係を含む単純な Spring Boot プロジェクトをセットアップしましょう。

Maven pom.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Hoxton.SR10</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Gradle ビルド.gradle

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR10"
    }
}

Kafka バインダーの構成

次に、application.yml ファイルで Kafka バインダーを構成します。

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my-topic
          group: my-group
          consumer:
            interceptor-classes: com.example.MyConsumerInterceptor
      kafka:
        binder:
          brokers: localhost:9092

Kafka コンシューマ インターセプタの作成

コンシューマ インターセプタを作成するには、Kafka が提供する ConsumerInterceptor インターフェイスを実装します。このインターフェイスを使用すると、アプリケーションに到達する前にレコードをインターセプトして処理するためのカスタム ロジックを定義できます。

package com.example;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.Configurable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class MyConsumerInterceptor implements ConsumerInterceptor<String, String>, Configurable {

    private static final Logger logger = LoggerFactory.getLogger(MyConsumerInterceptor.class);

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        records.forEach(record -> {
            logger.info("Intercepted record: key = {}, value = {}", record.key(), record.value());
            // Add your custom logic here
        });
        return records;
    }

    @Override
    public void onCommit(Map offsets) {
        // Custom logic on commit
    }

    @Override
    public void close() {
        // Cleanup resources if necessary
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Configuration logic
    }
}

コンシューマアプリケーションの作成

Kafka トピックからのメッセージをリッスンする単純なコンシューマ アプリケーションを作成します。

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(KafkaProcessor.class)
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }

    @StreamListener("input")
    public void handle(Message<String> message) {
        System.out.println("Received message: " + message.getPayload());
    }
}

バインディング用インターフェース

入力チャネルを Kafka トピックにバインドするためのインターフェイスを定義します。

package com.example;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface KafkaProcessor {
    String INPUT = "input";

    @Input(INPUT)
    SubscribableChannel input();
}

アプリケーションの実行

  1. Kafka ブローカーを起動し、必要なトピック (my-topic) を作成します。
  2. Spring Boot アプリケーションを実行します。

Kafka トピックに対してメッセージが生成されると、MyConsumerInterceptor がレコードをインターセプトし、インターセプトされたログ メッセージが表示されるはずです。

結論

このブログでは、Spring Cloud Stream Kafka Binder でコンシューマ インターセプタを使用する方法を検討してきました。インターセプターは、アプリケーションによってレコードが消費される前に、レコードを処理、ログ記録、および操作するための強力な方法を提供します。カスタム インターセプターを統合することで、Kafka コンシューマーの機能を強化し、ロギング、メトリクス収集、データ変換などの貴重な機能を追加できます。

このガイドで概説されている手順に従うことで、Spring Cloud Stream アプリケーションにコンシューマ インターセプタをシームレスに実装して構成できるようになります。コーディングを楽しんでください!

以上がSpring Cloud Stream Kafka Binder Consumer Interceptor の探索の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。