Home >Java >javaTutorial >How to use @KafkaListener to concurrently receive messages in batches in Spring Boot
The first step, concurrent consumption
Look at the code first. The key point is that we are using ConcurrentKafkaListenerContainerFactory and set factory.setConcurrency(4); (My topic has 4 partitions. In order to To speed up consumption, set the concurrency to 4, that is, there are 4 KafkaMessageListenerContainers)
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; }
Note that you can also add spring.kafka.listener.concurrency=3 directly to application.properties, and then use @KafkaListener for concurrent consumption.
The second step is batch consumption
Then there is batch consumption. The key points are factory.setBatchListener(true);
and propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);One is to enable batch consumption, and the other is to set the maximum number of message records that batch consumption can consume each time.
It is important to note that the ConsumerConfig.MAX_POLL_RECORDS_CONFIG we set is 50, which does not mean that we will keep waiting if 50 messages are not reached. The official explanation is "The maximum number of records returned in a single call to poll()." That is, 50 represents the maximum number of records returned in one poll.
You can see from the startup log that there is max.poll.interval.ms = 300000, which means that we call poll once every max.poll.interval.ms interval. Each poll returns up to 50 records.
max.poll.interval.msThe official explanation is "The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. ";
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public MapStartup log screenshot Screenshot of official explanation about max.poll.records and max.poll.interval.ms:consumerConfigs() { Map propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker()); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit()); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId()); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset()); propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); return propsMap; }
public class MyListener { private static final String TPOIC = "topic02"; @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) }) public void listenPartition0(List<ConsumerRecord<?, ?>> records) { log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId()); log.info("Id0 records size " + records.size()); for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); log.info("Received: " + record); if (kafkaMessage.isPresent()) { Object message = record.value(); String topic = record.topic(); log.info("p0 Received message={}", message); } } } @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) }) public void listenPartition1(List<ConsumerRecord<?, ?>> records) { log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId()); log.info("Id1 records size " + records.size()); for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); log.info("Received: " + record); if (kafkaMessage.isPresent()) { Object message = record.value(); String topic = record.topic(); log.info("p1 Received message={}", message); } } }
The above is the detailed content of How to use @KafkaListener to concurrently receive messages in batches in Spring Boot. For more information, please follow other related articles on the PHP Chinese website!