Heim  >  Artikel  >  Java  >  Wie Spring Boot Kafka integriert

Wie Spring Boot Kafka integriert

WBOY
WBOYnach vorne
2023-06-02 14:18:351399Durchsuche

Schritt 1: Abhängigkeiten hinzufügen

Fügen Sie die folgenden Abhängigkeiten in pom.xml hinzu:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.0</version>
</dependency>

Schritt 2: Konfigurieren Sie Kafka

Fügen Sie die folgende Konfiguration in der Datei application.yml hinzu: application.yml 文件中添加以下配置:

sping:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer

这里我们配置了 Kafka 的服务地址为 localhost:9092,配置了一个消费者组 ID 为 my-group,并设置了一个最早的偏移量来读取消息。在生产者方面,我们配置了消息序列化程序为 StringSerializer

步骤三:创建一个生产者

我们现在要创建一个 Kafka 生产者,以便向 Kafka 服务器发送消息。我们将在此处创建一个 RESTful API 端点,以接收 POST 请求并将消息发送到 Kafka。

首先,我们将创建一个 KafkaProducerConfig 类,用于配置 Kafka 生产者:

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

在上面的代码中,我们使用 @Configuration 注解将 KafkaProducerConfig 类声明为配置类。然后,我们使用 @Value 注解注入配置文件中的 bootstrap-servers 属性。

接下来,我们创建了一个 producerConfigs 方法,用于设置 Kafka 生产者的配置。在这里,我们设置了 BOOTSTRAP_SERVERS_CONFIGKEY_SERIALIZER_CLASS_CONFIGVALUE_SERIALIZER_CLASS_CONFIG 三个属性。

然后,我们创建了一个 producerFactory 方法,用于创建 Kafka 生产者工厂。在这里,我们使用了 DefaultKafkaProducerFactory 类,并传递了我们的配置。

最后,我们创建了一个 kafkaTemplate 方法,用于创建 KafkaTemplate 实例。在这里,我们使用了刚刚创建的生产者工厂作为参数,然后返回 KafkaTemplate 实例。

接下来,我们将创建一个 RESTful 端点,用于接收 POST 请求并将消息发送到 Kafka。在这里,我们将使用 @RestController 注解创建一个 RESTful 控制器:

@RestController
public class KafkaController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @PostMapping("/send")
    public void sendMessage(@RequestBody String message) {
        kafkaTemplate.send("my-topic", message);
    }
}

在上面的代码中,我们使用 @Autowired 注解将 KafkaTemplate 实例注入到 KafkaController 类中。然后,我们创建了一个 sendMessage 方法,用于发送消息到 Kafka。

在这里,我们使用 kafkaTemplate.send 方法发送消息到 my-topic 主题。send 方法返回一个 ListenableFuture 对象,用于异步处理结果。

步骤四:创建一个消费者

现在,我们将创建一个 Kafka 消费者,用于从 Kafka 服务器接收消息。在这里,我们将创建一个消费者组,并将其配置为从 my-topic 主题读取消息。

首先,我们将创建一个 KafkaConsumerConfig 类,用于配置 Kafka 消费者:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

在上面的代码中,我们使用 @Configuration 注解将 KafkaConsumerConfig 类声明为配置类,并使用 @EnableKafka 注解启用 Kafka。

然后,我们使用 @Value 注解注入配置文件中的 bootstrap-serversconsumer.group-id 属性。

接下来,我们创建了一个 consumerConfigs 方法,用于设置 Kafka 消费者的配置。在这里,我们设置了 BOOTSTRAP_SERVERS_CONFIG、GROUP_ID_CONFIGAUTO_OFFSET_RESET_CONFIGKEY_DESERIALIZER_CLASS_CONFIGVALUE_DESERIALIZER_CLASS_CONFIG 五个属性。

然后,我们创建了一个 consumerFactory 方法,用于创建 Kafka 消费者工厂。在这里,我们使用了 DefaultKafkaConsumerFactory 类,并传递了我们的配置。

最后,我们创建了一个 kafkaListenerContainerFactory 方法,用于创建一个 ConcurrentKafkaListenerContainerFactory 实例。在这里,我们将消费者工厂注入到 kafkaListenerContainerFactory 实例中。

接下来,我们将创建一个 Kafka 消费者类 KafkaConsumer,用于监听 my-topic 主题并接收消息:

@Service
public class KafkaConsumer {
    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}

在上面的代码中,我们使用 @KafkaListener 注解声明了一个消费者方法,用于接收从 my-topic 主题中读取的消息。在这里,我们将消费者组 ID 设置为 my-group-id

现在,我们已经完成了 Kafka 生产者和消费者的设置。我们可以使用 mvn spring-boot:run 命令启动应用程序,并使用 curl 命令发送 POST 请求到 http://localhost:8080/sendrrreee

hier Wir konfigurierte Kafkas Dienstadresse als localhost:9092, konfigurierte eine Verbrauchergruppen-ID als my-group und legte einen frühesten Offset zum Lesen von Informationen fest. Auf der Produzentenseite haben wir den Nachrichtenserialisierer als StringSerializer konfiguriert. 🎜🎜Schritt 3: Erstellen Sie einen Produzenten🎜🎜Wir erstellen nun einen Kafka-Produzenten, um Nachrichten an den Kafka-Server zu senden. Hier erstellen wir einen RESTful-API-Endpunkt, um POST-Anfragen zu empfangen und Nachrichten an Kafka zu senden. 🎜🎜Zuerst erstellen wir eine KafkaProducerConfig-Klasse, um den Kafka-Produzenten zu konfigurieren: 🎜rrreee🎜Im obigen Code verwenden wir die @Configuration-Annotation für The KafkaProducerConfig Die Klasse wird als Konfigurationsklasse deklariert. Anschließend fügen wir das Attribut bootstrap-servers mithilfe der Annotation @Value in die Konfigurationsdatei ein. 🎜🎜Als nächstes erstellen wir eine producerConfigs-Methode, um die Konfiguration des Kafka-Produzenten festzulegen. Hier legen wir drei Eigenschaften fest: BOOTSTRAP_SERVERS_CONFIG, KEY_SERIALIZER_CLASS_CONFIG und VALUE_SERIALIZER_CLASS_CONFIG. 🎜🎜Dann erstellen wir eine producerFactory-Methode zum Erstellen einer Kafka-Produzentenfabrik. Hier haben wir die Klasse DefaultKafkaProducerFactory verwendet und unsere Konfiguration übergeben. 🎜🎜Schließlich haben wir eine kafkaTemplate-Methode zum Erstellen von KafkaTemplate-Instanzen erstellt. Hier verwenden wir die soeben erstellte Producer-Factory als Parameter und geben eine KafkaTemplate-Instanz zurück. 🎜🎜Als nächstes erstellen wir einen RESTful-Endpunkt, der POST-Anfragen empfängt und Nachrichten an Kafka sendet. Hier erstellen wir einen RESTful-Controller mit der Annotation @RestController: 🎜rrreee🎜 Im obigen Code verwenden wir die Annotation @Autowired für KafkaTemplate code>-Instanz wird in die Klasse <code>KafkaController eingefügt. Dann haben wir eine sendMessage-Methode zum Senden von Nachrichten an Kafka erstellt. 🎜🎜Hier verwenden wir die Methode kafkaTemplate.send, um Nachrichten an das Thema my-topic zu senden. Die send-Methode gibt ein ListenableFuture-Objekt für die asynchrone Verarbeitung von Ergebnissen zurück. 🎜🎜Schritt 4: Erstellen Sie einen Verbraucher🎜🎜Jetzt erstellen wir einen Kafka-Verbraucher, um Nachrichten vom Kafka-Server zu empfangen. Hier erstellen wir eine Verbrauchergruppe und konfigurieren sie zum Lesen von Nachrichten aus dem Thema my-topic. 🎜🎜Zuerst erstellen wir eine KafkaConsumerConfig-Klasse, um den Kafka-Konsumenten zu konfigurieren: 🎜rrreee🎜Im obigen Code verwenden wir die Annotation @Configuration, um The KafkaConsumerConfig zu konfigurieren Die Klasse wird als Konfigurationsklasse deklariert und Kafka wird mithilfe der Annotation @EnableKafka aktiviert. 🎜🎜Dann verwenden wir die Annotation @Value, um die Eigenschaften bootstrap-servers und consumer.group-id in die Konfigurationsdatei einzufügen. 🎜🎜Als nächstes erstellen wir eine consumerConfigs-Methode, um die Konfiguration des Kafka-Konsumenten festzulegen. Hier legen wir fünf Eigenschaften fest: BOOTSTRAP_SERVERS_CONFIG, GROUP_ID_CONFIG, AUTO_OFFSET_RESET_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG und VALUE_DESERIALIZER_CLASS_CONFIG. 🎜🎜Dann erstellen wir eine consumerFactory-Methode zum Erstellen einer Kafka-Consumer-Factory. Hier haben wir die Klasse DefaultKafkaConsumerFactory verwendet und unsere Konfiguration übergeben. 🎜🎜Schließlich haben wir eine kafkaListenerContainerFactory-Methode erstellt, um eine ConcurrentKafkaListenerContainerFactory-Instanz zu erstellen. Hier injizieren wir die Consumer Factory in die kafkaListenerContainerFactory-Instanz. 🎜🎜Als nächstes erstellen wir eine Kafka-Verbraucherklasse KafkaConsumer, um das Thema my-topic anzuhören und Nachrichten zu empfangen: 🎜rrreee🎜Im obigen Code verwenden wir das @KafkaListener-Annotation zum Deklarieren einer Verbrauchermethode, die Nachrichten empfängt, die aus dem Thema my-topic gelesen werden. Hier setzen wir die Verbrauchergruppen-ID auf my-group-id. 🎜🎜Jetzt haben wir die Einrichtung des Kafka-Produzenten und -Konsumenten abgeschlossen. Wir können den Befehl mvn spring-boot:run verwenden, um die Anwendung zu starten und den Befehl curl verwenden, um eine POST-Anfrage an http://localhost:8080/send zu senden Endpunkt zum Senden der Nachricht „An Kafka senden“. Anschließend können wir die vom Verbraucher empfangenen Nachrichten auf der Konsole anzeigen. Dies ist die Grundkonfiguration für die Verwendung von Spring Boot und Kafka. Wir können uns je nach Bedarf ändern und erweitern, um spezifische Bedürfnisse zu erfüllen. 🎜

Das obige ist der detaillierte Inhalt vonWie Spring Boot Kafka integriert. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen