ホームページ >Java >&#&チュートリアル >@KafkaListener を使用して Spring Boot でメッセージをバッチで同時に受信する方法
最初のステップ、同時消費
最初にコードを見てください。重要な点は、ConcurrentKafkaListenerContainerFactory を使用しており、set Factory.setConcurrency(4); であるということです (私のトピックには 4 つのパーティションがあります。消費を高速化するには、同時実行性を 4 に設定します。つまり、KafkaMessageListenerContainer が 4 つあります)
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; }
spring.kafka.listener.concurrency=3 を application.properties に直接追加することもできることに注意してください。次に、同時使用のために @KafkaListener を使用します。
2 番目のステップはバッチ消費です。
次に、バッチ消費があります。重要なポイントは、factory.setBatchListener(true);
と propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);1 つはバッチ消費を有効にすること、もう 1 つはバッチ処理するメッセージ レコードの最大数を設定することです。消費は毎回消費できます。
設定した ConsumerConfig.MAX_POLL_RECORDS_CONFIG が 50 であることに注意することが重要です。これは、50 メッセージに達しない場合に待機し続けることを意味するわけではありません。公式の説明は「poll() への 1 回の呼び出しで返されるレコードの最大数」です。つまり、50 は 1 回のポーリングで返されるレコードの最大数を表します。
起動ログから、max.poll.interval.ms = 300000 であることがわかります。これは、max.poll.interval.ms 間隔ごとにポーリングを 1 回呼び出すことを意味します。各ポーリングでは最大 50 件のレコードが返されます。
max.poll.interval.ms公式の説明は、「コンシューマ グループ管理を使用する場合の、poll() の呼び出し間の最大遅延です。これにより、コンシューマがフェッチする前にアイドル状態でいられる時間に上限が設定されます」レコードがさらに増えます。このタイムアウトが経過する前にpoll()が呼び出されない場合、コンシューマは失敗したとみなされ、グループはパーティションを別のメンバーに再割り当てするために再バランスされます。 ";
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public Map起動ログのスクリーンショットconsumerConfigs() { Map propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker()); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit()); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId()); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset()); propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); return propsMap; }
# max.poll.records と max.poll.interval.ms に関する公式説明のスクリーンショット:
以上が@KafkaListener を使用して Spring Boot でメッセージをバッチで同時に受信する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。