


How SpringBoot integrates RabbitMq custom message listening container to implement message batch processing
SpringBoot integrates RabbitMq custom message listening container to implement message batch processing
Preface
RabbitMQ is a commonly used message queue. Spring Boot has deeply integrated it and can quickly to realize the sending and receiving of messages. In RabbitMQ, the sending and receiving of messages are asynchronous, so a listener is needed to listen for the arrival of messages. Spring Boot provides a default listener container, but sometimes we need to customize the listener container to meet some special needs, such as obtaining data in batches.
In this article, we will use Spring Boot to integrate RabbitMQ and customize a listener container to achieve the function of obtaining data in batches.
Prerequisites:
Before you start, you need to have the following conditions:
The RabbitMQ server has been installed and started.
The queue to be used has been created.
Already familiar with the basic knowledge of Spring Boot and RabbitMQ.
Environment preparation:
Before starting, we need to prepare the following environment:
JDK 1.8 or Above version
Spring Boot 2.5.0 or above version
RabbitMQ 3.8.0 or above version
Add dependencies
First, add the following dependencies in the pom.xml file:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
Configuration file
Next, add the following configuration in the application.properties file :
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ # 队列名称 spring.rabbitmq.listener.simple.queue-name=myQueue # 最大并发消费者数量 spring.rabbitmq.listener.simple.concurrency=5 # 最小数量 spring.rabbitmq.listener.simple.min-concurrency=1 # 最大数量 spring.rabbitmq.listener.simple.max-concurrency=10 # 批量处理消息的大小 spring.rabbitmq.listener.simple.batch-size=50
or
spring: rabbitmq: host: localhost listener: simple: batch-size: 50 concurrency: 5 max-concurrency: 10 min-concurrency: 1 queue-name: myQueue password: guest port: 5672 username: guest virtual-host: /
Writing a listener
Then, we need to create a listener class in order to process the messages received from the queue. The following is a simple example:
@Component public class MyListener { @RabbitListener(queues = "myQueue", containerFactory = "myFactory") public void handleMessage(List<MyMessage> messages, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException { try { // 处理消息 System.out.println("Received " + messages.size() + " messages"); for (Message message : messages) { // 处理消息 System.out.println("Received message: " + new String(message.getBody())); } channel.basicAck(messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag(), true); } finally { // 手动确认消息 channel.basicAck(deliveryTag, true); } } }
In the above code, we use the @RabbitListener annotation to specify the name of the queue to listen to, and also specify the use of the myFactory factory to create the listening container. In this listener, we simply print the received message.
Create SimpleRabbitListenerContainerFactory
Next, we need to create a SimpleRabbitListenerContainerFactory factory to be able to customize the behavior of the listening container. Here is a simple example:
@Configuration public class RabbitMQConfig { // @Bean // public SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) { // SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); // factory.setConnectionFactory(connectionFactory); // factory.setConcurrentConsumers(1); // factory.setMaxConcurrentConsumers(10); // factory.setBatchListener(true); // factory.setBatchSize(50); // return factory; // } @Bean public SimpleRabbitListenerContainerFactory myFactory( ConnectionFactory connectionFactory, PlatformTransactionManager transactionManager, MessageConverter messageConverter) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); // 并发消费者数,默认为 1 factory.setConcurrentConsumers(5); // 最大并发消费者数,默认为 1 factory.setMaxConcurrentConsumers(10); // 拒绝未确认的消息并重新将它们放回队列,默认为 true factory.setDefaultRequeueRejected(false); // 容器启动时是否自动启动,默认为 true factory.setAutoStartup(true); // 消息确认模式,默认为 AUTO factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 每个消费者在一次请求中预获取的消息数,默认为 1 factory.setPrefetchCount(5); // 从队列中接收消息的超时时间,默认为 0,表示没有超时限制 factory.setReceiveTimeout(1000); // 与容器一起使用的事务管理器。默认情况下,容器不会使用事务 factory.setTransactionManager(transactionManager); // 消息转换器,用于将接收到的消息转换为 Java 对象或将 Java 对象转换为消息 factory.setMessageConverter(messageConverter); // 用于异步消息处理的线程池。默认情况下,容器使用一个简单的 SimpleAsyncTaskExecutor factory.setTaskExecutor(new SimpleAsyncTaskExecutor()); // 在关闭容器时等待活动线程终止的时间,默认为 5000 毫秒 factory.setShutdownTimeout(10000); // 重试失败的消息之前等待的时间,默认为 5000 毫秒 factory.setRecoveryInterval(5000); // 如果消息处理器尝试监听不存在的队列,是否抛出异常。默认为 true factory.setMissingQueuesFatal(false); // 监听器容器连接工厂 factory.setConnectionFactory(connectionFactory); return factory; } }
Most of these properties are optional and can be set as needed. Depending on the application's needs, we are free to tune these properties to improve application performance and reliability.
Send Message
Finally, we can write a simple message sending code to send some messages to the queue. Here's a simple example:
@Component public class MySender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String message) { for (int i = 0; i < 100; i++) { rabbitTemplate.convertAndSend("myQueue", "message:" + i); } } }
The above is the detailed content of How SpringBoot integrates RabbitMq custom message listening container to implement message batch processing. For more information, please follow other related articles on the PHP Chinese website!

The article discusses using Maven and Gradle for Java project management, build automation, and dependency resolution, comparing their approaches and optimization strategies.

The article discusses creating and using custom Java libraries (JAR files) with proper versioning and dependency management, using tools like Maven and Gradle.

The article discusses implementing multi-level caching in Java using Caffeine and Guava Cache to enhance application performance. It covers setup, integration, and performance benefits, along with configuration and eviction policy management best pra

The article discusses using JPA for object-relational mapping with advanced features like caching and lazy loading. It covers setup, entity mapping, and best practices for optimizing performance while highlighting potential pitfalls.[159 characters]

Java's classloading involves loading, linking, and initializing classes using a hierarchical system with Bootstrap, Extension, and Application classloaders. The parent delegation model ensures core classes are loaded first, affecting custom class loa


Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

AI Hentai Generator
Generate AI Hentai for free.

Hot Article

Hot Tools

WebStorm Mac version
Useful JavaScript development tools

EditPlus Chinese cracked version
Small size, syntax highlighting, does not support code prompt function

Dreamweaver Mac version
Visual web development tools

Zend Studio 13.0.1
Powerful PHP integrated development environment

SAP NetWeaver Server Adapter for Eclipse
Integrate Eclipse with SAP NetWeaver application server.