首頁  >  文章  >  Java  >  Spring Boot中怎麼使用@KafkaListener並發大量接收訊息

Spring Boot中怎麼使用@KafkaListener並發大量接收訊息

WBOY
WBOY轉載
2023-05-13 14:01:061609瀏覽


第一步,並發消費

先看程式碼,重點是這我們使用的是ConcurrentKafkaListenerContainerFactory並且設定了factory.setConcurrency(4); (我的topic有4個分區,為了加快消費將並發設為4,也就是有4個KafkaMessageListenerContainer)

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

注意也可以直接在application.properties中加入spring.kafka.listener.concurrency=3,然後使用@KafkaListener並發消費。


第二步,大量消費

然後是大量消費。重點是factory.setBatchListener(true);

以及 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);

一個設啟用批量消費,一個設定批量消費每次最多消費多少個訊息記錄。

重點說明一下,我們設定的ConsumerConfig.MAX_POLL_RECORDS_CONFIG是50,並不是說如果沒有達到50個訊息,我們就一直在等待。官方的解釋是"The maximum number of records returned in a single call to poll().", 也就是50表示的是一次poll最多回傳的記錄數。

從啟動日誌可以看到還有個 max.poll.interval.ms = 300000,也就說每間隔max.poll.interval.ms我們就呼叫一次poll。每次poll最多回傳50筆記錄。

max.poll.interval.ms官方解釋是"The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. ";Spring Boot中怎麼使用@KafkaListener並發大量接收訊息

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

   @Bean
    public Map consumerConfigs() {
        Map propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return propsMap;
    }

##關於max.poll.records和max.poll.interval.ms官方解釋截圖:Spring Boot中怎麼使用@KafkaListener並發大量接收訊息


###第三步,分區消費######對於只有一個分區的topic,不需要分區消費,因為沒有意義。下面的例子是針對有2個分區的情況(我的完整程式碼中有4個listenPartitionX方法,我的topic設定了4個分區),讀者可以依照自己的情況進行調整。 ###
public class MyListener {
    private static final String TPOIC = "topic02";

    @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id0 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p0 Received message={}",  message);
            }
        }
    }

    @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) })
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id1 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p1 Received message={}",  message);
            }
        }
}

以上是Spring Boot中怎麼使用@KafkaListener並發大量接收訊息的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除