SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理
前言
RabbitMQ是一种常用的消息队列,Spring Boot对其进行了深度的整合,可以快速地实现消息的发送和接收。在RabbitMQ中,消息的发送和接收都是异步的,因此需要使用监听器来监听消息的到来。Spring Boot中提供了默认的监听器容器,但是有时候我们需要自定义监听器容器,来满足一些特殊的需求,比如批量获取数据。
在本文中,我们将使用Spring Boot来整合RabbitMQ,并自定义一个监听器容器,实现批量获取数据的功能。
前置条件:
在开始之前,您需要具备以下条件:
已经安装好RabbitMQ服务器并启动。
已经创建好要使用的队列。
已经熟悉了Spring Boot和RabbitMQ的基本知识。
环境准备:
在开始之前,我们需要准备好以下环境:
JDK 1.8或以上版本
Spring Boot 2.5.0或以上版本
RabbitMQ 3.8.0或以上版本
添加依赖
首先,在pom.xml文件中添加以下依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置文件
接下来,在application.properties文件中添加以下配置:
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ # 队列名称 spring.rabbitmq.listener.simple.queue-name=myQueue # 最大并发消费者数量 spring.rabbitmq.listener.simple.concurrency=5 # 最小数量 spring.rabbitmq.listener.simple.min-concurrency=1 # 最大数量 spring.rabbitmq.listener.simple.max-concurrency=10 # 批量处理消息的大小 spring.rabbitmq.listener.simple.batch-size=50
或
spring: rabbitmq: host: localhost listener: simple: batch-size: 50 concurrency: 5 max-concurrency: 10 min-concurrency: 1 queue-name: myQueue password: guest port: 5672 username: guest virtual-host: /
编写监听器
然后,我们需要创建一个监听器类,以便处理从队列中接收到的消息。以下是一个简单的示例:
@Component public class MyListener { @RabbitListener(queues = "myQueue", containerFactory = "myFactory") public void handleMessage(List<MyMessage> messages, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException { try { // 处理消息 System.out.println("Received " + messages.size() + " messages"); for (Message message : messages) { // 处理消息 System.out.println("Received message: " + new String(message.getBody())); } channel.basicAck(messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag(), true); } finally { // 手动确认消息 channel.basicAck(deliveryTag, true); } } }
在上面的代码中,我们使用了@RabbitListener注解来指定要监听的队列名称,同时也指定了使用myFactory工厂来创建监听容器。在这个监听器中,我们简单地打印了接收到的消息。
创建SimpleRabbitListenerContainerFactory
接下来,我们需要创建一个SimpleRabbitListenerContainerFactory工厂,以便能够自定义监听容器的行为。以下是一个简单的示例:
@Configuration public class RabbitMQConfig { // @Bean // public SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) { // SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); // factory.setConnectionFactory(connectionFactory); // factory.setConcurrentConsumers(1); // factory.setMaxConcurrentConsumers(10); // factory.setBatchListener(true); // factory.setBatchSize(50); // return factory; // } @Bean public SimpleRabbitListenerContainerFactory myFactory( ConnectionFactory connectionFactory, PlatformTransactionManager transactionManager, MessageConverter messageConverter) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); // 并发消费者数,默认为 1 factory.setConcurrentConsumers(5); // 最大并发消费者数,默认为 1 factory.setMaxConcurrentConsumers(10); // 拒绝未确认的消息并重新将它们放回队列,默认为 true factory.setDefaultRequeueRejected(false); // 容器启动时是否自动启动,默认为 true factory.setAutoStartup(true); // 消息确认模式,默认为 AUTO factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 每个消费者在一次请求中预获取的消息数,默认为 1 factory.setPrefetchCount(5); // 从队列中接收消息的超时时间,默认为 0,表示没有超时限制 factory.setReceiveTimeout(1000); // 与容器一起使用的事务管理器。默认情况下,容器不会使用事务 factory.setTransactionManager(transactionManager); // 消息转换器,用于将接收到的消息转换为 Java 对象或将 Java 对象转换为消息 factory.setMessageConverter(messageConverter); // 用于异步消息处理的线程池。默认情况下,容器使用一个简单的 SimpleAsyncTaskExecutor factory.setTaskExecutor(new SimpleAsyncTaskExecutor()); // 在关闭容器时等待活动线程终止的时间,默认为 5000 毫秒 factory.setShutdownTimeout(10000); // 重试失败的消息之前等待的时间,默认为 5000 毫秒 factory.setRecoveryInterval(5000); // 如果消息处理器尝试监听不存在的队列,是否抛出异常。默认为 true factory.setMissingQueuesFatal(false); // 监听器容器连接工厂 factory.setConnectionFactory(connectionFactory); return factory; } }
这些属性中的大多数都是可选的,可以根据需要进行设置。根据应用程序的需求,我们可以自由地调整这些属性,以提高应用程序的性能和可靠性。
发送消息
最后,我们可以编写一个简单的发送消息的代码来向队列中发送一些消息。以下是一个简单的示例:
@Component public class MySender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String message) { for (int i = 0; i < 100; i++) { rabbitTemplate.convertAndSend("myQueue", "message:" + i); } } }
以上是SpringBoot怎么整合RabbitMq自定义消息监听容器来实现消息批量处理的详细内容。更多信息请关注PHP中文网其他相关文章!

本文讨论了使用Maven和Gradle进行Java项目管理,构建自动化和依赖性解决方案,以比较其方法和优化策略。

本文使用Maven和Gradle之类的工具讨论了具有适当的版本控制和依赖关系管理的自定义Java库(JAR文件)的创建和使用。

本文讨论了使用咖啡因和Guava缓存在Java中实施多层缓存以提高应用程序性能。它涵盖设置,集成和绩效优势,以及配置和驱逐政策管理最佳PRA

本文讨论了使用JPA进行对象相关映射,并具有高级功能,例如缓存和懒惰加载。它涵盖了设置,实体映射和优化性能的最佳实践,同时突出潜在的陷阱。[159个字符]

Java的类上载涉及使用带有引导,扩展程序和应用程序类负载器的分层系统加载,链接和初始化类。父代授权模型确保首先加载核心类别,从而影响自定义类LOA


热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

VSCode Windows 64位 下载
微软推出的免费、功能强大的一款IDE编辑器

DVWA
Damn Vulnerable Web App (DVWA) 是一个PHP/MySQL的Web应用程序,非常容易受到攻击。它的主要目标是成为安全专业人员在合法环境中测试自己的技能和工具的辅助工具,帮助Web开发人员更好地理解保护Web应用程序的过程,并帮助教师/学生在课堂环境中教授/学习Web应用程序安全。DVWA的目标是通过简单直接的界面练习一些最常见的Web漏洞,难度各不相同。请注意,该软件中

SublimeText3 Linux新版
SublimeText3 Linux最新版

Dreamweaver CS6
视觉化网页开发工具

螳螂BT
Mantis是一个易于部署的基于Web的缺陷跟踪工具,用于帮助产品缺陷跟踪。它需要PHP、MySQL和一个Web服务器。请查看我们的演示和托管服务。