Home >Java >javaTutorial >How Spring Boot integrates Kafka
Add the following dependencies in pom.xml:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.8.0</version> </dependency>
In application.yml
Add the following configuration to the file:
sping: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest producer: value-serializer: org.apache.kafka.common.serialization.StringSerializer key-serializer: org.apache.kafka.common.serialization.StringSerializer
Here we configure Kafka’s service address as localhost:9092
, and configure a consumer group ID as my-group
, and sets an earliest offset to read the message. On the producer side, we configured the message serializer as StringSerializer
.
We now need to create a Kafka producer to send messages to the Kafka server. Here we will create a RESTful API endpoint to receive POST requests and send messages to Kafka.
First, we will create a KafkaProducerConfig
class for configuring the Kafka producer:
@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
In the above code, we use @Configuration
Annotation declares the KafkaProducerConfig
class as a configuration class. We then inject the bootstrap-servers
property in the configuration file using the @Value
annotation.
Next, we created a producerConfigs
method to set the configuration of the Kafka producer. Here we set the BOOTSTRAP_SERVERS_CONFIG
, KEY_SERIALIZER_CLASS_CONFIG
and VALUE_SERIALIZER_CLASS_CONFIG
properties.
Then, we created a producerFactory
method for creating a Kafka producer factory. Here we have used the DefaultKafkaProducerFactory
class and passed our configuration.
Finally, we created a kafkaTemplate
method for creating KafkaTemplate
instances. Here, we use the producer factory we just created as a parameter and return a KafkaTemplate
instance.
Next, we will create a RESTful endpoint that receives POST requests and sends messages to Kafka. Here, we will create a RESTful controller using the @RestController
annotation:
@RestController public class KafkaController { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @PostMapping("/send") public void sendMessage(@RequestBody String message) { kafkaTemplate.send("my-topic", message); } }
In the above code, we will use the @Autowired
annotation to KafkaTemplate
The instance is injected into the KafkaController
class. Then, we created a sendMessage
method for sending messages to Kafka.
Here, we use the kafkaTemplate.send
method to send messages to the my-topic
topic. The send method returns a ListenableFuture
object for asynchronous processing of results.
Now, we will create a Kafka consumer to receive messages from the Kafka server. Here we will create a consumer group and configure it to read messages from the my-topic
topic.
First, we will create a KafkaConsumerConfig
class for configuring the Kafka consumer:
@Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
In the above code, we use @Configuration
Annotation declares the KafkaConsumerConfig
class as a configuration class and enables Kafka using the @EnableKafka
annotation.
Then, we use the @Value
annotation to inject the bootstrap-servers
and consumer.group-id
properties in the configuration file.
Next, we created a consumerConfigs
method to set the configuration of the Kafka consumer. Here, we set five properties BOOTSTRAP_SERVERS_CONFIG, GROUP_ID_CONFIG
, AUTO_OFFSET_RESET_CONFIG
, KEY_DESERIALIZER_CLASS_CONFIG
and VALUE_DESERIALIZER_CLASS_CONFIG
.
Then, we created a consumerFactory
method for creating a Kafka consumer factory. Here we have used the DefaultKafkaConsumerFactory
class and passed our configuration.
Finally, we created a kafkaListenerContainerFactory
method to create a ConcurrentKafkaListenerContainerFactory
instance. Here, we inject the consumer factory into the kafkaListenerContainerFactory
instance.
Next, we will create a Kafka consumer class KafkaConsumer
to listen to the my-topic
topic and receive messages:
@Service public class KafkaConsumer { @KafkaListener(topics = "my-topic", groupId = "my-group-id") public void consume(String message) { System.out.println("Received message: " + message); } }
in In the above code, we use the @KafkaListener
annotation to declare a consumer method to receive messages read from the my-topic
topic. Here we set the consumer group ID to my-group-id
.
Now, we have completed the setup of Kafka producer and consumer. We can start the application using the mvn spring-boot:run
command and use the curl command to send a POST request to the http://localhost:8080/send
endpoint to send the message to Kafka. We can then view the messages received by the consumer on the console. That’s the basic setup for using Spring Boot and Kafka. We can change and expand as needed to meet specific needs.
The above is the detailed content of How Spring Boot integrates Kafka. For more information, please follow other related articles on the PHP Chinese website!