ホームページ >Java >&#&チュートリアル >@KafkaListener を使用して Spring Boot でメッセージをバッチで同時に受信する方法

@KafkaListener を使用して Spring Boot でメッセージをバッチで同時に受信する方法

WBOY
WBOY転載
2023-05-13 14:01:061692ブラウズ


最初のステップ、同時消費

最初にコードを見てください。重要な点は、ConcurrentKafkaListenerContainerFactory を使用しており、set Factory.setConcurrency(4); であるということです (私のトピックには 4 つのパーティションがあります。消費を高速化するには、同時実行性を 4 に設定します。つまり、KafkaMessageListenerContainer が 4 つあります)

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

spring.kafka.listener.concurrency=3 を application.properties に直接追加することもできることに注意してください。次に、同時使用のために @KafkaListener を使用します。


2 番目のステップはバッチ消費です。

次に、バッチ消費があります。重要なポイントは、factory.setBatchListener(true);

と propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);

1 つはバッチ消費を有効にすること、もう 1 つはバッチ処理するメッセージ レコードの最大数を設定することです。消費は毎回消費できます。

設定した ConsumerConfig.MAX_POLL_RECORDS_CONFIG が 50 であることに注意することが重要です。これは、50 メッセージに達しない場合に待機し続けることを意味するわけではありません。公式の説明は「poll() への 1 回の呼び出しで返されるレコードの最大数」です。つまり、50 は 1 回のポーリングで返されるレコードの最大数を表します。

起動ログから、max.poll.interval.ms = 300000 であることがわかります。これは、max.poll.interval.ms 間隔ごとにポーリングを 1 回呼び出すことを意味します。各ポーリングでは最大 50 件のレコードが返されます。

max.poll.interval.ms公式の説明は、「コンシューマ グループ管理を使用する場合の、poll() の呼び出し間の最大遅延です。これにより、コンシューマがフェッチする前にアイドル状態でいられる時間に上限が設定されます」レコードがさらに増えます。このタイムアウトが経過する前にpoll()が呼び出されない場合、コンシューマは失敗したとみなされ、グループはパーティションを別のメンバーに再割り当てするために再バランスされます。 ";@KafkaListener を使用して Spring Boot でメッセージをバッチで同時に受信する方法

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

   @Bean
    public Map consumerConfigs() {
        Map propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return propsMap;
    }

起動ログのスクリーンショット

@KafkaListener を使用して Spring Boot でメッセージをバッチで同時に受信する方法# max.poll.records と max.poll.interval.ms に関する公式説明のスクリーンショット:


#### 3 番目のステップ、パーティションの消費######パーティションが 1 つしかないトピックの場合、パーティションの消費は無意味であるため、必要ありません。次の例は、パーティションが 2 つある場合です (コード全体には listenPartitionX メソッドが 4 つあり、トピックには 4 つのパーティションが設定されています)。読者は自分の状況に応じて調整できます。 ###rree

以上が@KafkaListener を使用して Spring Boot でメッセージをバッチで同時に受信する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。