Home  >  Article  >  Java  >  How to use @KafkaListener to concurrently receive messages in batches in Spring Boot

How to use @KafkaListener to concurrently receive messages in batches in Spring Boot

WBOY
WBOYforward
2023-05-13 14:01:061650browse


The first step, concurrent consumption

Look at the code first. The key point is that we are using ConcurrentKafkaListenerContainerFactory and set factory.setConcurrency(4); (My topic has 4 partitions. In order to To speed up consumption, set the concurrency to 4, that is, there are 4 KafkaMessageListenerContainers)

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

Note that you can also add spring.kafka.listener.concurrency=3 directly to application.properties, and then use @KafkaListener for concurrent consumption.


The second step is batch consumption

Then there is batch consumption. The key points are factory.setBatchListener(true);

and propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);

One is to enable batch consumption, and the other is to set the maximum number of message records that batch consumption can consume each time.

It is important to note that the ConsumerConfig.MAX_POLL_RECORDS_CONFIG we set is 50, which does not mean that we will keep waiting if 50 messages are not reached. The official explanation is "The maximum number of records returned in a single call to poll()." That is, 50 represents the maximum number of records returned in one poll.

You can see from the startup log that there is max.poll.interval.ms = 300000, which means that we call poll once every max.poll.interval.ms interval. Each poll returns up to 50 records.

max.poll.interval.msThe official explanation is "The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. ";How to use @KafkaListener to concurrently receive messages in batches in Spring Boot

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

   @Bean
    public Map consumerConfigs() {
        Map propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return propsMap;
    }

Startup log screenshot

How to use @KafkaListener to concurrently receive messages in batches in Spring Boot

Screenshot of official explanation about max.poll.records and max.poll.interval.ms:


######## The third step, partition consumption######For a topic with only one partition, partition consumption is not necessary because it is meaningless. The following example is for the case where there are 2 partitions (there are 4 listenPartitionX methods in my complete code, and my topic has 4 partitions set). Readers can adjust it according to their own situation. ###
public class MyListener {
    private static final String TPOIC = "topic02";

    @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id0 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p0 Received message={}",  message);
            }
        }
    }

    @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) })
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id1 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p1 Received message={}",  message);
            }
        }
}

The above is the detailed content of How to use @KafkaListener to concurrently receive messages in batches in Spring Boot. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete