How Spring Boot integrates Kafka

WBOY, 2023-06-02 14:18:35

Step 1: Add dependencies

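Add the following dependency to pom.xml. The REST endpoint in Step 3 also assumes that a web starter such as spring-boot-starter-web is already on the classpath: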

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.0</version>
</dependency>

Step 2: Configure Kafka

Add the following configuration to application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer

Here we set the Kafka broker address to localhost:9092, set the consumer group ID to my-group, and set auto-offset-reset to earliest, so that a consumer group with no committed offset starts reading from the beginning of the topic. On the producer side, we configure both the key serializer and the value serializer as StringSerializer.

Step 3: Create a producer

We now need to create a Kafka producer to send messages to the Kafka server. Here we will create a RESTful API endpoint to receive POST requests and send messages to Kafka.

First, we will create a KafkaProducerConfig class for configuring the Kafka producer:

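import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {
    // Broker address read from application.yml
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}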

In the above code, we use the @Configuration annotation to declare the KafkaProducerConfig class as a configuration class. We then use the @Value annotation to inject the bootstrap-servers property from the configuration file.

Next, we created a producerConfigs method to set the configuration of the Kafka producer. Here we set the BOOTSTRAP_SERVERS_CONFIG, KEY_SERIALIZER_CLASS_CONFIG and VALUE_SERIALIZER_CLASS_CONFIG properties.
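The ProducerConfig constants resolve to ordinary Kafka client property names. As a minimal sketch using only the JDK, the same settings can be written with the literal keys; the class name ProducerPropsSketch and the helper producerProps are hypothetical names chosen for illustration, and the serializers are given as class-name strings, which the Kafka client also accepts in place of Class objects:

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerPropsSketch {
    // Hypothetical helper: builds the same settings as producerConfigs(),
    // using the literal Kafka property names instead of ProducerConfig constants.
    static Map<String, Object> producerProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print each property so the mapping is easy to inspect.
        producerProps("localhost:9092").forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Using the constants, as the article does, is usually preferable because a mistyped key then fails at compile time rather than being silently ignored.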

Then, we created a producerFactory method for creating a Kafka producer factory. Here we have used the DefaultKafkaProducerFactory class and passed our configuration.

Finally, we created a kafkaTemplate method for creating KafkaTemplate instances. Here, we use the producer factory we just created as a parameter and return a KafkaTemplate instance.

Next, we will create a RESTful endpoint that receives POST requests and sends messages to Kafka. Here, we will create a RESTful controller using the @RestController annotation:

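import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Publishes the raw request body to the my-topic topic
    @PostMapping("/send")
    public void sendMessage(@RequestBody String message) {
        kafkaTemplate.send("my-topic", message);
    }
}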

In the above code, we use the @Autowired annotation to inject the KafkaTemplate instance into the KafkaController class. Then, we create a sendMessage method for sending messages to Kafka.

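Here, we use the kafkaTemplate.send method to send messages to the my-topic topic. In the spring-kafka 2.8.x line used here, send returns a ListenableFuture, so the result can be processed asynchronously; from spring-kafka 3.0 onward it returns a CompletableFuture instead.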
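The controller above discards the future that send returns, so a failed send goes unnoticed. The following is a rough, JDK-only sketch of how the outcome could be inspected. It assumes the future has been obtained as a CompletableFuture (in Spring 5.x, ListenableFuture offers a completable() method for this), and it uses a plain String as a stand-in for the SendResult a real KafkaTemplate would supply. The names SendOutcomeSketch and describeOutcome are hypothetical:

```java
import java.util.concurrent.CompletableFuture;

public class SendOutcomeSketch {
    // Hypothetical helper: turns the outcome of an asynchronous send into a log line.
    // The String result stands in for the SendResult of a real KafkaTemplate.
    static CompletableFuture<String> describeOutcome(CompletableFuture<String> sendFuture) {
        return sendFuture.handle((result, ex) ->
                ex == null ? "sent: " + result : "failed: " + ex.getMessage());
    }

    public static void main(String[] args) {
        // Simulate a send that completed successfully.
        CompletableFuture<String> ok = CompletableFuture.completedFuture("my-topic");
        System.out.println(describeOutcome(ok).join());

        // Simulate a send that failed, for example because the broker was unreachable.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        System.out.println(describeOutcome(failed).join());
    }
}
```

In a real controller the same idea would apply to the value returned by kafkaTemplate.send, typically logging the failure rather than printing it.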

Step 4: Create a consumer

Now, we will create a Kafka consumer to receive messages from the Kafka server. Here we will create a consumer group and configure it to read messages from the my-topic topic.

First, we will create a KafkaConsumerConfig class for configuring the Kafka consumer:

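import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    // Broker address and default group ID read from application.yml
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}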

In the above code, we use the @Configuration annotation to declare the KafkaConsumerConfig class as a configuration class, and the @EnableKafka annotation to enable detection of @KafkaListener methods.

Then, we use the @Value annotation to inject the bootstrap-servers and consumer.group-id properties from the configuration file.

Next, we created a consumerConfigs method to set the configuration of the Kafka consumer. Here, we set five properties: BOOTSTRAP_SERVERS_CONFIG, GROUP_ID_CONFIG, AUTO_OFFSET_RESET_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG.

Then, we created a consumerFactory method for creating a Kafka consumer factory. Here we have used the DefaultKafkaConsumerFactory class and passed our configuration.

Finally, we created a kafkaListenerContainerFactory method to create a ConcurrentKafkaListenerContainerFactory instance. Here, we set the consumer factory we just created on the ConcurrentKafkaListenerContainerFactory instance.

Next, we will create a Kafka consumer class KafkaConsumer to listen to the my-topic topic and receive messages:

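import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {
    // groupId here overrides the group ID from the consumer factory for this listener
    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}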

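In the above code, we use the @KafkaListener annotation to declare a consumer method that receives messages read from the my-topic topic. Here we set the consumer group ID to my-group-id. Because groupId is set on the annotation, it overrides the my-group value from application.yml for this listener; omit the attribute to use the configured group ID.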

Now, we have completed the setup of Kafka producer and consumer. We can start the application using the mvn spring-boot:run command and use the curl command to send a POST request to the http://localhost:8080/send endpoint to send the message to Kafka. We can then view the messages received by the consumer on the console. That’s the basic setup for using Spring Boot and Kafka. We can change and expand as needed to meet specific needs.
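As an alternative to curl, the POST request can also be sent from Java. The sketch below uses only the JDK's java.net.http client (Java 11 or later) and assumes the application is listening on http://localhost:8080 as described above; the class name SendMessageClient, the helper buildRequest, and the text/plain content type are illustrative choices rather than part of the original setup:

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SendMessageClient {
    // Hypothetical helper: builds a POST to <baseUrl>/send with the message as the raw body.
    static HttpRequest buildRequest(String baseUrl, String message) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/send"))
                .header("Content-Type", "text/plain")
                .POST(HttpRequest.BodyPublishers.ofString(message))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        try {
            HttpResponse<String> response = client.send(
                    buildRequest("http://localhost:8080", "hello kafka"),
                    HttpResponse.BodyHandlers.ofString());
            System.out.println("HTTP status: " + response.statusCode());
        } catch (IOException e) {
            // Typically means the Spring Boot application is not running yet.
            System.out.println("Could not reach the application: " + e);
        }
    }
}
```

If the request succeeds, the consumer should print the received message on the application console.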

Statement: This article is reproduced from yisu.com.