


How to use @KafkaListener to concurrently receive messages in batches in Spring Boot
The first step, concurrent consumption
Look at the code first. The key point is that we are using ConcurrentKafkaListenerContainerFactory and set factory.setConcurrency(4); (My topic has 4 partitions. In order to To speed up consumption, set the concurrency to 4, that is, there are 4 KafkaMessageListenerContainers)
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; }
Note that you can also add spring.kafka.listener.concurrency=3 directly to application.properties, and then use @KafkaListener for concurrent consumption.
The second step is batch consumption
Then there is batch consumption. The key points are factory.setBatchListener(true);
and propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);One is to enable batch consumption, and the other is to set the maximum number of message records that batch consumption can consume each time.
It is important to note that the ConsumerConfig.MAX_POLL_RECORDS_CONFIG we set is 50, which does not mean that we will keep waiting if 50 messages are not reached. The official explanation is "The maximum number of records returned in a single call to poll()." That is, 50 represents the maximum number of records returned in one poll.
You can see from the startup log that there is max.poll.interval.ms = 300000, which means that we call poll once every max.poll.interval.ms interval. Each poll returns up to 50 records.
max.poll.interval.msThe official explanation is "The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. ";
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public MapStartup log screenshotconsumerConfigs() { Map propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker()); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit()); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId()); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset()); propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); return propsMap; }
public class MyListener { private static final String TPOIC = "topic02"; @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) }) public void listenPartition0(List<ConsumerRecord<?, ?>> records) { log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId()); log.info("Id0 records size " + records.size()); for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); log.info("Received: " + record); if (kafkaMessage.isPresent()) { Object message = record.value(); String topic = record.topic(); log.info("p0 Received message={}", message); } } } @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) }) public void listenPartition1(List<ConsumerRecord<?, ?>> records) { log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId()); log.info("Id1 records size " + records.size()); for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); log.info("Received: " + record); if (kafkaMessage.isPresent()) { Object message = record.value(); String topic = record.topic(); log.info("p1 Received message={}", message); } } }
The above is the detailed content of How to use @KafkaListener to concurrently receive messages in batches in Spring Boot. For more information, please follow other related articles on the PHP Chinese website!

Java'splatformindependencemeansdeveloperscanwritecodeonceandrunitonanydevicewithoutrecompiling.ThisisachievedthroughtheJavaVirtualMachine(JVM),whichtranslatesbytecodeintomachine-specificinstructions,allowinguniversalcompatibilityacrossplatforms.Howev

To set up the JVM, you need to follow the following steps: 1) Download and install the JDK, 2) Set environment variables, 3) Verify the installation, 4) Set the IDE, 5) Test the runner program. Setting up a JVM is not just about making it work, it also involves optimizing memory allocation, garbage collection, performance tuning, and error handling to ensure optimal operation.

ToensureJavaplatformindependence,followthesesteps:1)CompileandrunyourapplicationonmultipleplatformsusingdifferentOSandJVMversions.2)UtilizeCI/CDpipelineslikeJenkinsorGitHubActionsforautomatedcross-platformtesting.3)Usecross-platformtestingframeworkss

Javastandsoutinmoderndevelopmentduetoitsrobustfeatureslikelambdaexpressions,streams,andenhancedconcurrencysupport.1)Lambdaexpressionssimplifyfunctionalprogramming,makingcodemoreconciseandreadable.2)Streamsenableefficientdataprocessingwithoperationsli

The core features of Java include platform independence, object-oriented design and a rich standard library. 1) Object-oriented design makes the code more flexible and maintainable through polymorphic features. 2) The garbage collection mechanism liberates the memory management burden of developers, but it needs to be optimized to avoid performance problems. 3) The standard library provides powerful tools from collections to networks, but data structures should be selected carefully to keep the code concise.

Yes,Javacanruneverywhereduetoits"WriteOnce,RunAnywhere"philosophy.1)Javacodeiscompiledintoplatform-independentbytecode.2)TheJavaVirtualMachine(JVM)interpretsorcompilesthisbytecodeintomachine-specificinstructionsatruntime,allowingthesameJava

JDKincludestoolsfordevelopingandcompilingJavacode,whileJVMrunsthecompiledbytecode.1)JDKcontainsJRE,compiler,andutilities.2)JVMmanagesbytecodeexecutionandsupports"writeonce,runanywhere."3)UseJDKfordevelopmentandJREforrunningapplications.

Key features of Java include: 1) object-oriented design, 2) platform independence, 3) garbage collection mechanism, 4) rich libraries and frameworks, 5) concurrency support, 6) exception handling, 7) continuous evolution. These features of Java make it a powerful tool for developing efficient and maintainable software.


Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

Video Face Swap
Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Article

Hot Tools

Dreamweaver Mac version
Visual web development tools

WebStorm Mac version
Useful JavaScript development tools

Dreamweaver CS6
Visual web development tools

SublimeText3 English version
Recommended: Win version, supports code prompts!

MinGW - Minimalist GNU for Windows
This project is in the process of being migrated to osdn.net/projects/mingw, you can continue to follow us there. MinGW: A native Windows port of the GNU Compiler Collection (GCC), freely distributable import libraries and header files for building native Windows applications; includes extensions to the MSVC runtime to support C99 functionality. All MinGW software can run on 64-bit Windows platforms.
