Home >Java >javaTutorial >Analysis of common problems and solutions of Java RabbitMQ message queue

Analysis of common problems and solutions of Java RabbitMQ message queue

王林
王林forward
2023-04-23 09:49:062259browse

Message accumulation

The generation scenario of message accumulation:

  • The speed of messages generated by producers is greater than the speed of consumer consumption. Solution: Increase the number or speed of consumers.

  • When there are no consumers consuming. Solution: Dead letter queue, set message validity period. It is equivalent to setting a validity period for our messages. If there is no consumption within the specified time, it will automatically expire. When it expires, the client callback monitoring method will be executed to store the message in the database table record, and compensation will be realized later.

Ensure that messages are not lost

1. The producer uses the message confirmation mechanism to ensure that the message can be delivered to MQ successfully.

2. The MQ server should persist the message to the hard disk

3. The consumer uses the manual ack mechanism to confirm that the message consumption is successful

What to do if the MQ server capacity is full ?

Use the dead letter queue to store messages in the database and compensate for consumption later.

Dead letter queue

RabbitMQ dead letter queue is commonly known as the spare tire queue; after the message middleware rejects the message for some reason, it can be transferred to the dead letter queue for storage, the dead letter queue There can also be switches and routing keys, etc.

Generation background:

  • The message delivered to MQ and stored in MQ has expired

  • The queue has reached the maximum length (queue container Already full) The producer refuses to receive the message

  • If the consumer fails to consume multiple messages, it will be transferred to the dead letter queue

Code example:

maven dependency

<dependencies>
        <!-- springboot-web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>

yml configuration

server:
#  服务启动端口配置
  port: 8081
  servlet:
#    应用访问路径
    context-path: /
spring:
  #增加application.druid.yml 的配置文件
#  profiles:
#    active: rabbitmq
  rabbitmq:
    ####连接地址
    host: www.kaicostudy.com
    ####端口号
    port: 5672
    ####账号
    username: kaico
    ####密码
    password: kaico
    ### 地址
    virtual-host: /kaicoStudy
###模拟演示死信队列
kaico:
  dlx:
    exchange: kaico_order_dlx_exchange
    queue: kaico_order_dlx_queue
    routingKey: kaico.order.dlx
  ###备胎交换机
  order:
    exchange: kaico_order_exchange
    queue: kaico_order_queue
    routingKey: kaico.order

Queue configuration class

@Configuration
public class DeadLetterMQConfig {
    /**
     * 订单交换机
     */
    @Value("${kaico.order.exchange}")
    private String orderExchange;
    /**
     * 订单队列
     */
    @Value("${kaico.order.queue}")
    private String orderQueue;
    /**
     * 订单路由key
     */
    @Value("${kaico.order.routingKey}")
    private String orderRoutingKey;
    /**
     * 死信交换机
     */
    @Value("${kaico.dlx.exchange}")
    private String dlxExchange;
    /**
     * 死信队列
     */
    @Value("${kaico.dlx.queue}")
    private String dlxQueue;
    /**
     * 死信路由
     */
    @Value("${kaico.dlx.routingKey}")
    private String dlxRoutingKey;
    /**
     * 声明死信交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchange);
    }
    /**
     * 声明死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(dlxQueue);
    }
    /**
     * 声明订单业务交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(orderExchange);
    }
    /**
     * 绑定死信队列到死信交换机
     *
     * @return Binding
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with(dlxRoutingKey);
    }
    /**
     * 声明订单队列,并且绑定死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue orderQueue() {
        // 订单队列绑定我们的死信交换机
        Map<String, Object> arguments = new HashMap<>(2);
        arguments.put("x-dead-letter-exchange", dlxExchange);
        arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
        return new Queue(orderQueue, true, false, false, arguments);
    }
    /**
     * 绑定订单队列到订单交换机
     *
     * @return Binding
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with(orderRoutingKey);
    }
}

Dead letter queue consumer

@Component
public class OrderDlxConsumer {
    /**
     * 死信队列监听队列回调的方法
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_dlx_queue")
    public void orderDlxConsumer(String msg) {
        System.out.println("死信队列消费订单消息" + msg);
    }
}

Normal Queue consumer

@Component
public class OrderConsumer {
    /**
     * 监听队列回调的方法
     *
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_queue")
    public void orderConsumer(String msg) {
        System.out.println("正常订单消费者消息msg:" + msg);
    }
}

The background queue management page is as follows:

Analysis of common problems and solutions of Java RabbitMQ message queue

Deployment method: The dead letter queue cannot exist in the same server as the normal queue and should be separated. Server storage.

Delay Queue

Implementation plan for the system to automatically timeout and close if the order is not paid for 30 minutes.

Based on task scheduling, the efficiency is very low.

Based on the redis expired key implementation, a method will be called back to the client when the key expires.

When a user places an order, a token (validity period) is generated for 30 minutes and stored in our redis; Disadvantages: Very redundant, a redundant field will be stored in the table.

MQ-based delay queue (best solution) rabbitmq situation.

Principle: When we place an order, we deliver a message to mq and set the validity period to 30 minutes. But when the message expires (without being consumed), we execute a method on our client to tell us what to do. The message has expired. At this time, check whether the order has been paid.

Implementation logic:

Mainly use dead letter queue to implement.

Analysis of common problems and solutions of Java RabbitMQ message queue

The desired code: normal consumers do not consume messages, or there are no normal consumers. After the set time, they enter the dead letter queue and then die. Consumers implement corresponding business logic.

RabbitMQ message idempotence problem

RabbitMQ message automatic retry mechanism

When an exception is thrown in the consumer business logic code, retries are automatically implemented (the default is countless retries) Try)

You should implement limits on the number of RabbitMQ retries, such as a maximum of 5 retries, with an interval of 3 seconds each time; if the retry fails multiple times, store it in the dead letter queue or store it in a database table. Record labor compensation later. Because after the number of failed retries, the queue will automatically delete the message.

Message retry principle: During the retry process, use aop to intercept our consumption listening method, and this error log will not be printed. If it fails after retrying multiple times, the error log will be printed only when the maximum number of failures is reached.

If the consumption fails after multiple times:

1. Automatically delete the message; (the message may be lost)

Solution:

If If the enrichment fails multiple times, it will eventually be stored in the dead letter queue;

uses table logging to record the consumption failure error log, and later manually compensates the message automatically.

Reasonable choice of retry mechanism

After the consumer obtains the message, it calls the third-party interface (HTTP request), but fails to call the third-party interface? Need to try again?

Answer: Sometimes the call fails due to network exception, and it may need to be retried several times.

After the consumer obtains the message, a data exception is thrown due to code problems. Does it need to be retried?

Answer: There is no need to retry. If the code is abnormal, the code release project needs to be modified again.

Consumers turn on manual ack mode

The first step, springboot project configuration needs to turn on ack mode

acknowledge-mode: manual

Second step, consumer Java code

int result = orderMapper.addOrder(orderEntity);
if (result >= 0) {
    // 开启消息确认机制
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

How rabbitMQ solves the problem of message idempotence

What is message idempotence? How does MQ consumer ensure idempotence?

Reason: The consumer may turn on automatic retry, and the retry process may cause the consumer's business logic code to be executed repeatedly. At this moment, the message has been consumed. Because the business error caused the message to be consumed again, the

solution will appear: use the message global id to determine according to the business. The consumer can judge this message based on the business id (global unique id). The message has been consumed.

Consumer code logic:

Analysis of common problems and solutions of Java RabbitMQ message queue

##RabbitMQ solves distributed transaction problems

Distributed transactions: In a distributed system, because across There are multiple different transactions in the service call interface, and each transaction does not affect each other. There is a problem of distributed transactions.

The core idea of ​​solving distributed transactions: final consistency of data.

Noun in the distributed field:

Strong consistency: either the synchronization speed is very fast or the lock mechanism does not allow dirty reads;

Strong consistency solution: Either database A synchronizes data to data B very quickly, or database B cannot read the data before database A synchronization is completed.

Weak consistency: The data that is allowed to be read is the original dirty data, and the results that are read are allowed to be inconsistent.

Eventual consistency: In our distributed system, because data is communicated synchronously through the network, short data delays are allowed, but the final data must be consistent.

The idea of ​​solving distributed transactions based on RabbitMQ

The idea of ​​solving distributed transactions based on RabbitMQ: (Adopting the final consistency solution)

  • Confirm Our producer messages must be delivered to MQ (message confirmation mechanism). If delivery fails, continue to try again.

  • Consumers use manual ack to confirm messages to achieve consumption. Pay attention to idempotence issues. , when consumption fails, mq automatically helps the consumer retry.

  • Ensure that our producer's first transaction is executed first. If the execution fails, use the supplementary queue (supplement the producer's own transaction to ensure that the producer's first transaction is executed [the data is ultimately consistent] sex】).

Solution map: The core is to use mq to send messages to other systems to modify the data back.

Analysis of common problems and solutions of Java RabbitMQ message queue

The above is the detailed content of Analysis of common problems and solutions of Java RabbitMQ message queue. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete