首頁  >  文章  >  Java  >  Java RabbitMQ訊息佇列常見問題及解決方案分析

Java RabbitMQ訊息佇列常見問題及解決方案分析

王林
王林轉載
2023-04-23 09:49:062137瀏覽

訊息堆積

訊息堆積的產生場景:

  • 生產者產生的訊息速度大於消費者消費的速度。解決:增加消費者的數量或速度。

  • 沒有消費者進行消費的時候。解決:死信隊列、設定訊息有效期限。相當於對我們的訊息設定有效期,在規定的時間內如果沒有消費的話,自動過期,過期的時候會執行客戶端回呼監聽的方法將訊息存放到資料庫表記錄,後期實現補償。

保證訊息不遺失

1、生產者使用訊息確認機制保證訊息百分之百能夠將訊息投遞到MQ成功。

2、MQ伺服器端應該將訊息持久化到硬碟

3、消費者使用手動ack機制確認訊息消費成功

如果MQ伺服器容量滿了怎麼辦?

使用死信佇列將訊息存到資料庫中去,後期補償消費。

死信佇列

RabbitMQ死信佇列俗稱,備胎佇列;訊息中間件因為某些原因拒收該訊息後,可以轉移到死信佇列中存放,死信佇列也可以有交換器和路由key等。

產生背景:

  • 訊息投遞到MQ中存放訊息已經過期

  • 佇列達到最大的長度(佇列容器已經滿了)生產者拒絕接收訊息

  • 消費者消費多次訊息失敗,就會轉移存放到死信佇列中

程式碼案例:

maven依賴

<dependencies>
        <!-- springboot-web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>

yml設定

server:
#  服务启动端口配置
  port: 8081
  servlet:
#    应用访问路径
    context-path: /
spring:
  #增加application.druid.yml 的配置文件
#  profiles:
#    active: rabbitmq
  rabbitmq:
    ####连接地址
    host: www.kaicostudy.com
    ####端口号
    port: 5672
    ####账号
    username: kaico
    ####密码
    password: kaico
    ### 地址
    virtual-host: /kaicoStudy
###模拟演示死信队列
kaico:
  dlx:
    exchange: kaico_order_dlx_exchange
    queue: kaico_order_dlx_queue
    routingKey: kaico.order.dlx
  ###备胎交换机
  order:
    exchange: kaico_order_exchange
    queue: kaico_order_queue
    routingKey: kaico.order

佇列設定類別

@Configuration
public class DeadLetterMQConfig {
    /**
     * 订单交换机
     */
    @Value("${kaico.order.exchange}")
    private String orderExchange;
    /**
     * 订单队列
     */
    @Value("${kaico.order.queue}")
    private String orderQueue;
    /**
     * 订单路由key
     */
    @Value("${kaico.order.routingKey}")
    private String orderRoutingKey;
    /**
     * 死信交换机
     */
    @Value("${kaico.dlx.exchange}")
    private String dlxExchange;
    /**
     * 死信队列
     */
    @Value("${kaico.dlx.queue}")
    private String dlxQueue;
    /**
     * 死信路由
     */
    @Value("${kaico.dlx.routingKey}")
    private String dlxRoutingKey;
    /**
     * 声明死信交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchange);
    }
    /**
     * 声明死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(dlxQueue);
    }
    /**
     * 声明订单业务交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(orderExchange);
    }
    /**
     * 绑定死信队列到死信交换机
     *
     * @return Binding
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with(dlxRoutingKey);
    }
    /**
     * 声明订单队列,并且绑定死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue orderQueue() {
        // 订单队列绑定我们的死信交换机
        Map<String, Object> arguments = new HashMap<>(2);
        arguments.put("x-dead-letter-exchange", dlxExchange);
        arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
        return new Queue(orderQueue, true, false, false, arguments);
    }
    /**
     * 绑定订单队列到订单交换机
     *
     * @return Binding
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with(orderRoutingKey);
    }
}

死信佇列消費者

@Component
public class OrderDlxConsumer {
    /**
     * 死信队列监听队列回调的方法
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_dlx_queue")
    public void orderDlxConsumer(String msg) {
        System.out.println("死信队列消费订单消息" + msg);
    }
}

普通佇列消費者

@Component
public class OrderConsumer {
    /**
     * 监听队列回调的方法
     *
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_queue")
    public void orderConsumer(String msg) {
        System.out.println("正常订单消费者消息msg:" + msg);
    }
}

後台佇列管理頁面如下:

Java RabbitMQ訊息佇列常見問題及解決方案分析

#部署方式:死信佇列不能夠和正常佇列存在同一個伺服器中,應該分伺服器存放。

延遲佇列

訂單30分鐘未支付,系統自動逾時關閉的實作方案。

基於任務調度實現,效率是非常低。

基於redis過期key實現,key失效時會回呼客戶端一個方法。

用戶下單的時候,產生一個令牌(有效期限)30分鐘,存放到我們redis;缺點:非常冗餘,會在表中存放一個冗餘欄位。

基於mq的延遲佇列(最佳方案)rabbitmq情況下。

原理:在我們下單的時候,往mq投遞一個訊息設定有效期為30分鐘,但該訊息失效的時候(沒有被消費的情況下),執行我們客戶端一個方法告訴我們該消息已經失效,這時候查詢這筆訂單是否已經支付。

實作邏輯:

主要使用死信佇列來實作。

Java RabbitMQ訊息佇列常見問題及解決方案分析

想要的程式碼:就是正常的消費者不消費訊息,或是沒有正常的消費者,在設定的時間後進入死信佇列中,然後死信消費者實現對應的業務邏輯。

RabbitMQ訊息冪等問題

RabbitMQ訊息自動重試機制

當消費者商業邏輯程式碼中,拋出異常自動實作重試(預設是無數次重試)

應該對RabbitMQ重試次數實現限制,例如最多重試5次,每次間隔3s;重試多次還是失敗的情況下,存放到死信隊列或存放到資料庫表中記錄後期人工補償。因為重試失敗次數之後,佇列會自動刪除這個訊息。

訊息重試原理: 在重試的過程中,使用aop攔截我們的消費監聽方法,也不會列印這個錯誤日誌。如果重試多次還是失敗,達到最大失敗次數的時候才會列印錯誤日誌。

如果消費多次還是失敗的情況下:

1、自動刪除該訊息;(訊息可能遺失)

解決方案:

如果在充實多次還是失敗的情況下,最終存放到死信佇列;

採用表日誌記,消費失敗錯誤日誌的日誌記錄,後期人工自動對該訊息實現補償。

合理的選擇重試機制

消費者取得訊息後,呼叫第三方介面(HTTP請求),但是呼叫第三方介面失敗呢?是否需要重試 ?

答:有時是因為網路異常呼叫失敗,應該需要重試幾次。

消費者取得訊息後,應該程式碼問題拋出資料異常,是否需要重試?

答:不需要重試,程式碼異常需要重新修改程式碼發布專案。

消費者開啟手動ack模式

第一步、springboot專案配置需要開啟ack模式

acknowledge-mode:manual

第二步、消費者Java程式碼

int result = orderMapper.addOrder(orderEntity);
if (result >= 0) {
    // 开启消息确认机制
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

rabbitMQ如何解決訊息冪等問題

什麼是訊息冪等性? MQ消費者如何保證冪等性?

產生的原因:是因為消費者可能會開啟自動重試,重試過程中可能會導致消費者業務邏輯程式碼重複執行。此刻訊息已經消費了,因為業務報錯導致訊息重新消費,這時會出現

解決方案:採用訊息全域id根據業務來定,根據業務id(全域唯一id)消費者可以判斷這條消息已經消費了。

消費者程式碼邏輯:

Java RabbitMQ訊息佇列常見問題及解決方案分析

RabbitMQ解決分散式事務問題

分散式事務:在分散式系統中,因為跨服務呼叫接口,存在多個不同的事務,每個事務都互不影響。就存在分散式事務的問題。

解決分散式交易核心思想:資料最終一致性。

分散式領域中名詞:

強一致性:要麼同步速度非常快或採用鎖定的機制不允許出現髒讀;

強一致性解決方案:要嘛資料庫A非常迅速的將資料同步給資料B,或資料庫A沒有同步完成之前資料庫B不能夠讀取資料。

弱一致性: 允許讀取的資料為原來的髒數據,允許讀取的結果不一致性。

最終一致性: 在我們的分散式系統中,因為資料之間同步透過網路實現通訊,短暫的資料延遲是允許的,但是最終資料必須要一致性。

基於RabbitMQ解決分散式事務的想法

基於RabbitMQ解決分散式事務的想法:(採用最終一致性的方案)

  • 確認我們的生產者訊息一定要投遞到MQ中(訊息確認機制)投遞失敗就繼續重試

  • 消費者採用手動ack的形式確認訊息實現消費注意冪等性問題,在消費失敗的情況下,mq自動幫消費者重試。

  • 保證我們的生產者第一事務先執行,如果執行失敗採用補單佇列(給生產者自己事務補充,確保生產者第一事務執行完成【資料最終一致性】)。

解決想法圖:核心是利用mq傳送訊息給其他系統將資料修改回來。

Java RabbitMQ訊息佇列常見問題及解決方案分析

以上是Java RabbitMQ訊息佇列常見問題及解決方案分析的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除