>  기사  >  웹 프론트엔드  >  Spring Cloud Stream Kafka 바인더 소비자 인터셉터 탐색

Spring Cloud Stream Kafka 바인더 소비자 인터셉터 탐색

WBOY
WBOY원래의
2024-08-06 19:20:501049검색

Exploring Spring Cloud Stream Kafka Binder Consumer Interceptor

소개

Spring Cloud Stream은 Apache Kafka 및 RabbitMQ와 같은 메시지 브로커를 추상화하여 메시지 기반 마이크로서비스 개발을 단순화하는 프레임워크입니다. Spring Cloud Stream의 강력한 기능 중 하나는 Kafka와 원활하게 통합하여 개발자가 강력하고 확장 가능한 이벤트 기반 애플리케이션을 구축할 수 있도록 하는 기능입니다. Spring Cloud Stream의 Kafka 바인더는 Kafka 주제에 쉽게 연결하는 방법을 제공합니다.

이번 블로그에서는 Spring Cloud Stream Kafka Binder와 함께 소비자 인터셉터를 사용하는 방법을 살펴보겠습니다. Kafka의 인터셉터는 레코드가 애플리케이션에서 사용되기 전에 가로채서 변경하는 메커니즘을 제공하여 로깅, 메트릭 수집 및 데이터 조작 기회를 제공합니다.

전제조건

자세한 내용을 살펴보기 전에 다음 전제 조건을 충족하는지 확인하세요.

  • JDK(Java Development Kit) 8 이상
  • 아파치 카프카
  • Spring Boot 2.x 이상
  • Maven 또는 Gradle

스프링 부트 애플리케이션 설정

먼저 Spring Cloud Stream 및 Kafka에 필요한 종속성을 갖춘 간단한 Spring Boot 프로젝트를 설정해 보겠습니다.

메이븐 pom.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Hoxton.SR10</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Gradle 빌드.gradle

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR10"
    }
}

Kafka 바인더 구성

다음으로 application.yml 파일에서 Kafka 바인더를 구성합니다.

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my-topic
          group: my-group
          consumer:
            interceptor-classes: com.example.MyConsumerInterceptor
      kafka:
        binder:
          brokers: localhost:9092

Kafka 소비자 인터셉터 만들기

소비자 인터셉터를 생성하려면 Kafka에서 제공하는 ConsumerInterceptor 인터페이스를 구현하세요. 이 인터페이스를 사용하면 기록이 애플리케이션에 도달하기 전에 가로채고 처리하기 위한 사용자 정의 논리를 정의할 수 있습니다.

package com.example;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.Configurable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class MyConsumerInterceptor implements ConsumerInterceptor<String, String>, Configurable {

    private static final Logger logger = LoggerFactory.getLogger(MyConsumerInterceptor.class);

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        records.forEach(record -> {
            logger.info("Intercepted record: key = {}, value = {}", record.key(), record.value());
            // Add your custom logic here
        });
        return records;
    }

    @Override
    public void onCommit(Map offsets) {
        // Custom logic on commit
    }

    @Override
    public void close() {
        // Cleanup resources if necessary
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Configuration logic
    }
}

소비자 애플리케이션 생성

Kafka 주제의 메시지를 수신하는 간단한 소비자 애플리케이션을 만듭니다.

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(KafkaProcessor.class)
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }

    @StreamListener("input")
    public void handle(Message<String> message) {
        System.out.println("Received message: " + message.getPayload());
    }
}

바인딩을 위한 인터페이스

입력 채널을 Kafka 주제에 바인딩하기 위한 인터페이스를 정의합니다.

package com.example;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface KafkaProcessor {
    String INPUT = "input";

    @Input(INPUT)
    SubscribableChannel input();
}

애플리케이션 실행

  1. Kafka 브로커를 시작하고 필요한 주제(my-topic)를 생성합니다.
  2. Spring Boot 애플리케이션을 실행합니다.

Kafka 주제에 대한 메시지가 생성되면 MyConsumerInterceptor가 레코드를 가로채고 가로채는 로그 메시지가 표시됩니다.

결론

이 블로그에서는 Spring Cloud Stream Kafka Binder와 함께 소비자 인터셉터를 사용하는 방법을 살펴보았습니다. 인터셉터는 애플리케이션에서 레코드를 사용하기 전에 레코드를 처리, 기록 및 조작하는 강력한 방법을 제공합니다. 사용자 정의 인터셉터를 통합하면 로깅, 메트릭 수집, 데이터 변환과 같은 중요한 기능을 추가하여 Kafka 소비자의 기능을 향상할 수 있습니다.

이 가이드에 설명된 단계를 따르면 Spring Cloud Stream 애플리케이션에서 소비자 인터셉터를 원활하게 구현하고 구성할 수 있습니다. 즐거운 코딩하세요!

위 내용은 Spring Cloud Stream Kafka 바인더 소비자 인터셉터 탐색의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.