>  기사  >  Java  >  Spring Boot가 Kafka를 통합하는 방법

Spring Boot가 Kafka를 통합하는 방법

WBOY
WBOY앞으로
2023-06-02 14:18:351466검색

1단계: 종속성 추가

pom.xml에 다음 종속성을 추가합니다.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.0</version>
</dependency>

2단계: Kafka 구성

application.yml 파일에 다음 구성을 추가합니다. application.yml 文件中添加以下配置:

sping:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer

这里我们配置了 Kafka 的服务地址为 localhost:9092,配置了一个消费者组 ID 为 my-group,并设置了一个最早的偏移量来读取消息。在生产者方面,我们配置了消息序列化程序为 StringSerializer

步骤三:创建一个生产者

我们现在要创建一个 Kafka 生产者,以便向 Kafka 服务器发送消息。我们将在此处创建一个 RESTful API 端点,以接收 POST 请求并将消息发送到 Kafka。

首先,我们将创建一个 KafkaProducerConfig 类,用于配置 Kafka 生产者:

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

在上面的代码中,我们使用 @Configuration 注解将 KafkaProducerConfig 类声明为配置类。然后,我们使用 @Value 注解注入配置文件中的 bootstrap-servers 属性。

接下来,我们创建了一个 producerConfigs 方法,用于设置 Kafka 生产者的配置。在这里,我们设置了 BOOTSTRAP_SERVERS_CONFIGKEY_SERIALIZER_CLASS_CONFIGVALUE_SERIALIZER_CLASS_CONFIG 三个属性。

然后,我们创建了一个 producerFactory 方法,用于创建 Kafka 生产者工厂。在这里,我们使用了 DefaultKafkaProducerFactory 类,并传递了我们的配置。

最后,我们创建了一个 kafkaTemplate 方法,用于创建 KafkaTemplate 实例。在这里,我们使用了刚刚创建的生产者工厂作为参数,然后返回 KafkaTemplate 实例。

接下来,我们将创建一个 RESTful 端点,用于接收 POST 请求并将消息发送到 Kafka。在这里,我们将使用 @RestController 注解创建一个 RESTful 控制器:

@RestController
public class KafkaController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @PostMapping("/send")
    public void sendMessage(@RequestBody String message) {
        kafkaTemplate.send("my-topic", message);
    }
}

在上面的代码中,我们使用 @Autowired 注解将 KafkaTemplate 实例注入到 KafkaController 类中。然后,我们创建了一个 sendMessage 方法,用于发送消息到 Kafka。

在这里,我们使用 kafkaTemplate.send 方法发送消息到 my-topic 主题。send 方法返回一个 ListenableFuture 对象,用于异步处理结果。

步骤四:创建一个消费者

现在,我们将创建一个 Kafka 消费者,用于从 Kafka 服务器接收消息。在这里,我们将创建一个消费者组,并将其配置为从 my-topic 主题读取消息。

首先,我们将创建一个 KafkaConsumerConfig 类,用于配置 Kafka 消费者:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

在上面的代码中,我们使用 @Configuration 注解将 KafkaConsumerConfig 类声明为配置类,并使用 @EnableKafka 注解启用 Kafka。

然后,我们使用 @Value 注解注入配置文件中的 bootstrap-serversconsumer.group-id 属性。

接下来,我们创建了一个 consumerConfigs 方法,用于设置 Kafka 消费者的配置。在这里,我们设置了 BOOTSTRAP_SERVERS_CONFIG、GROUP_ID_CONFIGAUTO_OFFSET_RESET_CONFIGKEY_DESERIALIZER_CLASS_CONFIGVALUE_DESERIALIZER_CLASS_CONFIG 五个属性。

然后,我们创建了一个 consumerFactory 方法,用于创建 Kafka 消费者工厂。在这里,我们使用了 DefaultKafkaConsumerFactory 类,并传递了我们的配置。

最后,我们创建了一个 kafkaListenerContainerFactory 方法,用于创建一个 ConcurrentKafkaListenerContainerFactory 实例。在这里,我们将消费者工厂注入到 kafkaListenerContainerFactory 实例中。

接下来,我们将创建一个 Kafka 消费者类 KafkaConsumer,用于监听 my-topic 主题并接收消息:

@Service
public class KafkaConsumer {
    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}

在上面的代码中,我们使用 @KafkaListener 注解声明了一个消费者方法,用于接收从 my-topic 主题中读取的消息。在这里,我们将消费者组 ID 设置为 my-group-id

现在,我们已经完成了 Kafka 生产者和消费者的设置。我们可以使用 mvn spring-boot:run 命令启动应用程序,并使用 curl 命令发送 POST 请求到 http://localhost:8080/sendrrreee

여기서 Kafka의 서비스 주소를 localhost:9092로 구성하고, 소비자 그룹 ID를 my-group으로 구성하고, 정보를 읽을 수 있는 가장 빠른 오프셋을 설정했습니다. 생산자 측에서는 메시지 직렬 변환기를 StringSerializer로 구성했습니다. 🎜🎜3단계: 생산자 만들기🎜🎜이제 Kafka 서버에 메시지를 보내는 Kafka 생산자를 만들겠습니다. 여기서는 POST 요청을 수신하고 Kafka에 메시지를 보내는 RESTful API 엔드포인트를 생성하겠습니다. 🎜🎜먼저 Kafka 생산자를 구성하기 위해 KafkaProducerConfig 클래스를 생성합니다. 🎜rrreee🎜위 코드에서는 @Configuration 주석을 에 사용합니다. 클래스는 구성 클래스로 선언됩니다. 그런 다음 @Value 주석을 사용하여 구성 파일에 bootstrap-servers 속성을 ​​삽입합니다. 🎜🎜다음으로 Kafka 생산자의 구성을 설정하는 producerConfigs 메서드를 만듭니다. 여기서는 BOOTSTRAP_SERVERS_CONFIG, KEY_SERIALIZER_CLASS_CONFIGVALUE_SERIALIZER_CLASS_CONFIG의 세 가지 속성을 설정했습니다. 🎜🎜그런 다음 Kafka 생산자 팩토리를 생성하기 위한 producerFactory 메서드를 생성합니다. 여기서는 DefaultKafkaProducerFactory 클래스를 사용하고 구성을 전달했습니다. 🎜🎜마지막으로 KafkaTemplate 인스턴스를 생성하기 위한 kafkaTemplate 메서드를 만들었습니다. 여기서는 방금 생성한 생산자 팩토리를 매개변수로 사용하고 KafkaTemplate 인스턴스를 반환합니다. 🎜🎜다음으로 POST 요청을 수신하고 Kafka에 메시지를 보내는 RESTful 엔드포인트를 생성하겠습니다. 여기서는 @RestController 주석을 사용하여 RESTful 컨트롤러를 생성합니다. 🎜rrreee🎜위 코드에서는 @Autowired 주석을 KafkaTemplate에 사용합니다. code> 인스턴스가 <code>KafkaController 클래스에 삽입됩니다. 그런 다음 Kafka에 메시지를 보내기 위한 sendMessage 메서드를 만들었습니다. 🎜🎜여기에서는 kafkaTemplate.send 메서드를 사용하여 my-topic 주제에 메시지를 보냅니다. send 메소드는 결과의 비동기 처리를 위해 ListenableFuture 객체를 반환합니다. 🎜🎜4단계: 소비자 생성🎜🎜이제 Kafka 서버에서 메시지를 수신하기 위한 Kafka 소비자를 생성하겠습니다. 여기서는 소비자 그룹을 생성하고 my-topic 주제의 메시지를 읽도록 구성하겠습니다. 🎜🎜먼저 Kafka 소비자를 구성하기 위해 KafkaConsumerConfig 클래스를 생성합니다. 🎜rrreee🎜위 코드에서는 @Configuration 주석을 에 사용합니다. 클래스는 구성 클래스로 선언되고 @EnableKafka 주석을 사용하여 Kafka가 활성화됩니다. 🎜🎜그런 다음 @Value 주석을 사용하여 구성 파일에 bootstrap-serversconsumer.group-id 속성을 ​​삽입합니다. 🎜🎜다음으로 Kafka 소비자의 구성을 설정하기 위해 consumerConfigs 메서드를 만듭니다. 여기서는 BOOTSTRAP_SERVERS_CONFIG, GROUP_ID_CONFIG, AUTO_OFFSET_RESET_CONFIG, KEY_DESERIALIZER_CLASS_CONFIGVALUE_DESERIALIZER_CLASS_CONFIG의 다섯 가지 속성을 설정했습니다. 🎜🎜그런 다음 Kafka 소비자 팩토리를 생성하기 위한 consumerFactory 메서드를 생성합니다. 여기서는 DefaultKafkaConsumerFactory 클래스를 사용하고 구성을 전달했습니다. 🎜🎜마지막으로 ConcurrentKafkaListenerContainerFactory 인스턴스를 생성하기 위해 kafkaListenerContainerFactory 메서드를 생성했습니다. 여기서는 소비자 팩토리를 kafkaListenerContainerFactory 인스턴스에 삽입합니다. 🎜🎜다음으로 my-topic 주제를 수신하고 메시지를 수신하기 위해 Kafka 소비자 클래스 KafkaConsumer를 생성합니다. 🎜rrreee🎜위 코드에서 @KafkaListener 주석은 my-topic 주제에서 읽은 메시지를 수신하는 소비자 메서드를 선언합니다. 여기서는 소비자 그룹 ID를 my-group-id로 설정했습니다. 🎜🎜이제 Kafka 생산자와 소비자 설정이 완료되었습니다. mvn spring-boot:run 명령을 사용하여 애플리케이션을 시작하고 컬 명령을 사용하여 http://localhost:8080/send에 POST 요청을 보낼 수 있습니다. Send to Kafka 메시지를 보내기 위한 엔드포인트입니다. 그런 다음 콘솔에서 소비자가 받은 메시지를 볼 수 있습니다. Spring Boot와 Kafka를 사용하기 위한 기본 설정입니다. 특정 요구 사항을 충족하기 위해 필요에 따라 변경하고 확장할 수 있습니다. 🎜

위 내용은 Spring Boot가 Kafka를 통합하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제