Home >Java >javaTutorial >Analysis of common problems and solutions of Java RabbitMQ message queue
The generation scenario of message accumulation:
The speed of messages generated by producers is greater than the speed of consumer consumption. Solution: Increase the number or speed of consumers.
When there are no consumers consuming. Solution: Dead letter queue, set message validity period. It is equivalent to setting a validity period for our messages. If there is no consumption within the specified time, it will automatically expire. When it expires, the client callback monitoring method will be executed to store the message in the database table record, and compensation will be realized later.
1. The producer uses the message confirmation mechanism to ensure that the message can be delivered to MQ successfully.
2. The MQ server should persist the message to the hard disk
3. The consumer uses the manual ack mechanism to confirm that the message consumption is successful
What to do if the MQ server capacity is full ?
Use the dead letter queue to store messages in the database and compensate for consumption later.
RabbitMQ dead letter queue is commonly known as the spare tire queue; after the message middleware rejects the message for some reason, it can be transferred to the dead letter queue for storage, the dead letter queue There can also be switches and routing keys, etc.
Generation background:
The message delivered to MQ and stored in MQ has expired
The queue has reached the maximum length (queue container Already full) The producer refuses to receive the message
If the consumer fails to consume multiple messages, it will be transferred to the dead letter queue
Code example:
maven dependency
<dependencies> <!-- springboot-web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!--fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> </dependencies>
yml configuration
server: # 服务启动端口配置 port: 8081 servlet: # 应用访问路径 context-path: / spring: #增加application.druid.yml 的配置文件 # profiles: # active: rabbitmq rabbitmq: ####连接地址 host: www.kaicostudy.com ####端口号 port: 5672 ####账号 username: kaico ####密码 password: kaico ### 地址 virtual-host: /kaicoStudy ###模拟演示死信队列 kaico: dlx: exchange: kaico_order_dlx_exchange queue: kaico_order_dlx_queue routingKey: kaico.order.dlx ###备胎交换机 order: exchange: kaico_order_exchange queue: kaico_order_queue routingKey: kaico.order
Queue configuration class
@Configuration public class DeadLetterMQConfig { /** * 订单交换机 */ @Value("${kaico.order.exchange}") private String orderExchange; /** * 订单队列 */ @Value("${kaico.order.queue}") private String orderQueue; /** * 订单路由key */ @Value("${kaico.order.routingKey}") private String orderRoutingKey; /** * 死信交换机 */ @Value("${kaico.dlx.exchange}") private String dlxExchange; /** * 死信队列 */ @Value("${kaico.dlx.queue}") private String dlxQueue; /** * 死信路由 */ @Value("${kaico.dlx.routingKey}") private String dlxRoutingKey; /** * 声明死信交换机 * * @return DirectExchange */ @Bean public DirectExchange dlxExchange() { return new DirectExchange(dlxExchange); } /** * 声明死信队列 * * @return Queue */ @Bean public Queue dlxQueue() { return new Queue(dlxQueue); } /** * 声明订单业务交换机 * * @return DirectExchange */ @Bean public DirectExchange orderExchange() { return new DirectExchange(orderExchange); } /** * 绑定死信队列到死信交换机 * * @return Binding */ @Bean public Binding binding() { return BindingBuilder.bind(dlxQueue()) .to(dlxExchange()) .with(dlxRoutingKey); } /** * 声明订单队列,并且绑定死信队列 * * @return Queue */ @Bean public Queue orderQueue() { // 订单队列绑定我们的死信交换机 Map<String, Object> arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange", dlxExchange); arguments.put("x-dead-letter-routing-key", dlxRoutingKey); return new Queue(orderQueue, true, false, false, arguments); } /** * 绑定订单队列到订单交换机 * * @return Binding */ @Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()) .to(orderExchange()) .with(orderRoutingKey); } }
Dead letter queue consumer
@Component public class OrderDlxConsumer { /** * 死信队列监听队列回调的方法 * @param msg */ @RabbitListener(queues = "kaico_order_dlx_queue") public void orderDlxConsumer(String msg) { System.out.println("死信队列消费订单消息" + msg); } }
Normal Queue consumer
@Component public class OrderConsumer { /** * 监听队列回调的方法 * * @param msg */ @RabbitListener(queues = "kaico_order_queue") public void orderConsumer(String msg) { System.out.println("正常订单消费者消息msg:" + msg); } }
The background queue management page is as follows:
Deployment method: The dead letter queue cannot exist in the same server as the normal queue and should be separated. Server storage.
Implementation plan for the system to automatically timeout and close if the order is not paid for 30 minutes.
Based on task scheduling, the efficiency is very low.
Based on the redis expired key implementation, a method will be called back to the client when the key expires.
When a user places an order, a token (validity period) is generated for 30 minutes and stored in our redis; Disadvantages: Very redundant, a redundant field will be stored in the table.
MQ-based delay queue (best solution) rabbitmq situation.
Principle: When we place an order, we deliver a message to mq and set the validity period to 30 minutes. But when the message expires (without being consumed), we execute a method on our client to tell us what to do. The message has expired. At this time, check whether the order has been paid.
Implementation logic:
Mainly use dead letter queue to implement.
The desired code: normal consumers do not consume messages, or there are no normal consumers. After the set time, they enter the dead letter queue and then die. Consumers implement corresponding business logic.
When an exception is thrown in the consumer business logic code, retries are automatically implemented (the default is countless retries) Try)
You should implement limits on the number of RabbitMQ retries, such as a maximum of 5 retries, with an interval of 3 seconds each time; if the retry fails multiple times, store it in the dead letter queue or store it in a database table. Record labor compensation later. Because after the number of failed retries, the queue will automatically delete the message.
Message retry principle: During the retry process, use aop to intercept our consumption listening method, and this error log will not be printed. If it fails after retrying multiple times, the error log will be printed only when the maximum number of failures is reached.
If the consumption fails after multiple times:
1. Automatically delete the message; (the message may be lost)
Solution:
If If the enrichment fails multiple times, it will eventually be stored in the dead letter queue;
uses table logging to record the consumption failure error log, and later manually compensates the message automatically.
After the consumer obtains the message, it calls the third-party interface (HTTP request), but fails to call the third-party interface? Need to try again?
Answer: Sometimes the call fails due to network exception, and it may need to be retried several times.
After the consumer obtains the message, a data exception is thrown due to code problems. Does it need to be retried?
Answer: There is no need to retry. If the code is abnormal, the code release project needs to be modified again.
The first step, springboot project configuration needs to turn on ack mode
acknowledge-mode: manual
Second step, consumer Java code
int result = orderMapper.addOrder(orderEntity); if (result >= 0) { // 开启消息确认机制 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
What is message idempotence? How does MQ consumer ensure idempotence?
Reason: The consumer may turn on automatic retry, and the retry process may cause the consumer's business logic code to be executed repeatedly. At this moment, the message has been consumed. Because the business error caused the message to be consumed again, the
solution will appear: use the message global id to determine according to the business. The consumer can judge this message based on the business id (global unique id). The message has been consumed.
Consumer code logic:
##RabbitMQ solves distributed transaction problemsDistributed transactions: In a distributed system, because across There are multiple different transactions in the service call interface, and each transaction does not affect each other. There is a problem of distributed transactions. The core idea of solving distributed transactions: final consistency of data. Noun in the distributed field: Strong consistency: either the synchronization speed is very fast or the lock mechanism does not allow dirty reads; Strong consistency solution: Either database A synchronizes data to data B very quickly, or database B cannot read the data before database A synchronization is completed. Weak consistency: The data that is allowed to be read is the original dirty data, and the results that are read are allowed to be inconsistent. Eventual consistency: In our distributed system, because data is communicated synchronously through the network, short data delays are allowed, but the final data must be consistent. The idea of solving distributed transactions based on RabbitMQThe idea of solving distributed transactions based on RabbitMQ: (Adopting the final consistency solution)The above is the detailed content of Analysis of common problems and solutions of Java RabbitMQ message queue. For more information, please follow other related articles on the PHP Chinese website!