Maison >Java >javaDidacticiel >Comment utiliser @KafkaListener pour recevoir simultanément des messages par lots dans Spring Boot
###La première étape, la consommation simultanée###
Regardez d'abord le code. Le point clé est que nous utilisons ConcurrentKafkaListenerContainerFactory et définissons factory.setConcurrency(4); consommation, il sera simultané Réglé sur 4, c'est-à-dire qu'il y a 4 KafkaMessageListenerContainers)
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; }
Notez que vous pouvez également ajouter spring.kafka.listener.concurrency=3 directement à application.properties, puis utiliser @KafkaListener pour une consommation simultanée .
###La deuxième étape est la consommation par lots###
Ensuite, la consommation par lots. Les points clés sont factory.setBatchListener(true);
et propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
L'un consiste à activer la consommation par lots et l'autre à définir le nombre maximum d'enregistrements de messages que la consommation par lots peut consommer chacun. temps.
Il est important de noter que le ConsumerConfig.MAX_POLL_RECORDS_CONFIG que nous définissons est de 50, ce qui ne signifie pas que nous continuerons d'attendre si 50 messages ne sont pas atteints. L'explication officielle est "Le nombre maximum d'enregistrements renvoyés dans un seul appel à poll()." Autrement dit, 50 représente le nombre maximum d'enregistrements renvoyés dans un sondage.
Vous pouvez voir dans le journal de démarrage qu'il y a max.poll.interval.ms = 300000, ce qui signifie que nous appelons poll une fois par intervalle max.poll.interval.ms. Chaque sondage renvoie jusqu'à 50 enregistrements.
L'explication officielle de max.poll.interval.ms est "Le délai maximum entre les invocations de poll() lors de l'utilisation de la gestion des groupes de consommateurs. Cela impose une limite supérieure à la durée pendant laquelle le consommateur peut être inactif avant de récupérer plus d'enregistrements. Si poll() n'est pas appelé avant l'expiration de ce délai, alors le consommateur est considéré comme ayant échoué et le groupe va rééquilibrer afin de réaffecter les partitions à un autre membre ";
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public MapconsumerConfigs() { Map propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker()); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit()); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId()); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset()); propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); return propsMap; }
Capture d'écran du journal de démarrage
À propos de max.poll. Capture d'écran de l'explication officielle de .records et max.poll.interval.ms :
###La troisième étape, la consommation de partition###
Pour un sujet avec une seule partition, la consommation de partition n'est pas nécessaire car elle n'a aucun sens. L'exemple suivant concerne le cas où il y a 2 partitions (il y a 4 méthodes ListenPartitionX dans mon code complet, et mon sujet a 4 partitions définies. Les lecteurs peuvent l'ajuster en fonction de leur propre situation).
public class MyListener { private static final String TPOIC = "topic02"; @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) }) public void listenPartition0(List<ConsumerRecord<?, ?>> records) { log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId()); log.info("Id0 records size " + records.size()); for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); log.info("Received: " + record); if (kafkaMessage.isPresent()) { Object message = record.value(); String topic = record.topic(); log.info("p0 Received message={}", message); } } } @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) }) public void listenPartition1(List<ConsumerRecord<?, ?>> records) { log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId()); log.info("Id1 records size " + records.size()); for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); log.info("Received: " + record); if (kafkaMessage.isPresent()) { Object message = record.value(); String topic = record.topic(); log.info("p1 Received message={}", message); } } }
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!