搜尋
首頁Javajava教程Java RabbitMQ訊息佇列常見問題及解決方案分析

訊息堆積

訊息堆積的產生場景:

  • 生產者產生的訊息速度大於消費者消費的速度。解決:增加消費者的數量或速度。

  • 沒有消費者進行消費的時候。解決:死信隊列、設定訊息有效期限。相當於對我們的訊息設定有效期,在規定的時間內如果沒有消費的話,自動過期,過期的時候會執行客戶端回呼監聽的方法將訊息存放到資料庫表記錄,後期實現補償。

保證訊息不遺失

1、生產者使用訊息確認機制保證訊息百分之百能夠將訊息投遞到MQ成功。

2、MQ伺服器端應該將訊息持久化到硬碟

3、消費者使用手動ack機制確認訊息消費成功

如果MQ伺服器容量滿了怎麼辦?

使用死信佇列將訊息存到資料庫中去,後期補償消費。

死信佇列

RabbitMQ死信佇列俗稱,備胎佇列;訊息中間件因為某些原因拒收該訊息後,可以轉移到死信佇列中存放,死信佇列也可以有交換器和路由key等。

產生背景:

  • 訊息投遞到MQ中存放訊息已經過期

  • 佇列達到最大的長度(佇列容器已經滿了)生產者拒絕接收訊息

  • 消費者消費多次訊息失敗,就會轉移存放到死信佇列中

程式碼案例:

maven依賴

<dependencies>
        <!-- springboot-web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>

yml設定

server:
#  服务启动端口配置
  port: 8081
  servlet:
#    应用访问路径
    context-path: /
spring:
  #增加application.druid.yml 的配置文件
#  profiles:
#    active: rabbitmq
  rabbitmq:
    ####连接地址
    host: www.kaicostudy.com
    ####端口号
    port: 5672
    ####账号
    username: kaico
    ####密码
    password: kaico
    ### 地址
    virtual-host: /kaicoStudy
###模拟演示死信队列
kaico:
  dlx:
    exchange: kaico_order_dlx_exchange
    queue: kaico_order_dlx_queue
    routingKey: kaico.order.dlx
  ###备胎交换机
  order:
    exchange: kaico_order_exchange
    queue: kaico_order_queue
    routingKey: kaico.order

佇列設定類別

@Configuration
public class DeadLetterMQConfig {
    /**
     * 订单交换机
     */
    @Value("${kaico.order.exchange}")
    private String orderExchange;
    /**
     * 订单队列
     */
    @Value("${kaico.order.queue}")
    private String orderQueue;
    /**
     * 订单路由key
     */
    @Value("${kaico.order.routingKey}")
    private String orderRoutingKey;
    /**
     * 死信交换机
     */
    @Value("${kaico.dlx.exchange}")
    private String dlxExchange;
    /**
     * 死信队列
     */
    @Value("${kaico.dlx.queue}")
    private String dlxQueue;
    /**
     * 死信路由
     */
    @Value("${kaico.dlx.routingKey}")
    private String dlxRoutingKey;
    /**
     * 声明死信交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchange);
    }
    /**
     * 声明死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(dlxQueue);
    }
    /**
     * 声明订单业务交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(orderExchange);
    }
    /**
     * 绑定死信队列到死信交换机
     *
     * @return Binding
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with(dlxRoutingKey);
    }
    /**
     * 声明订单队列,并且绑定死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue orderQueue() {
        // 订单队列绑定我们的死信交换机
        Map<String, Object> arguments = new HashMap<>(2);
        arguments.put("x-dead-letter-exchange", dlxExchange);
        arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
        return new Queue(orderQueue, true, false, false, arguments);
    }
    /**
     * 绑定订单队列到订单交换机
     *
     * @return Binding
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with(orderRoutingKey);
    }
}

死信佇列消費者

@Component
public class OrderDlxConsumer {
    /**
     * 死信队列监听队列回调的方法
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_dlx_queue")
    public void orderDlxConsumer(String msg) {
        System.out.println("死信队列消费订单消息" + msg);
    }
}

普通佇列消費者

@Component
public class OrderConsumer {
    /**
     * 监听队列回调的方法
     *
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_queue")
    public void orderConsumer(String msg) {
        System.out.println("正常订单消费者消息msg:" + msg);
    }
}

後台佇列管理頁面如下:

Java RabbitMQ訊息佇列常見問題及解決方案分析

#部署方式:死信佇列不能夠和正常佇列存在同一個伺服器中,應該分伺服器存放。

延遲佇列

訂單30分鐘未支付,系統自動逾時關閉的實作方案。

基於任務調度實現,效率是非常低。

基於redis過期key實現,key失效時會回呼客戶端一個方法。

用戶下單的時候,產生一個令牌(有效期限)30分鐘,存放到我們redis;缺點:非常冗餘,會在表中存放一個冗餘欄位。

基於mq的延遲佇列(最佳方案)rabbitmq情況下。

原理:在我們下單的時候,往mq投遞一個訊息設定有效期為30分鐘,但該訊息失效的時候(沒有被消費的情況下),執行我們客戶端一個方法告訴我們該消息已經失效,這時候查詢這筆訂單是否已經支付。

實作邏輯:

主要使用死信佇列來實作。

Java RabbitMQ訊息佇列常見問題及解決方案分析

想要的程式碼:就是正常的消費者不消費訊息,或是沒有正常的消費者,在設定的時間後進入死信佇列中,然後死信消費者實現對應的業務邏輯。

RabbitMQ訊息冪等問題

RabbitMQ訊息自動重試機制

當消費者商業邏輯程式碼中,拋出異常自動實作重試(預設是無數次重試)

應該對RabbitMQ重試次數實現限制,例如最多重試5次,每次間隔3s;重試多次還是失敗的情況下,存放到死信隊列或存放到資料庫表中記錄後期人工補償。因為重試失敗次數之後,佇列會自動刪除這個訊息。

訊息重試原理: 在重試的過程中,使用aop攔截我們的消費監聽方法,也不會列印這個錯誤日誌。如果重試多次還是失敗,達到最大失敗次數的時候才會列印錯誤日誌。

如果消費多次還是失敗的情況下:

1、自動刪除該訊息;(訊息可能遺失)

解決方案:

如果在充實多次還是失敗的情況下,最終存放到死信佇列;

採用表日誌記,消費失敗錯誤日誌的日誌記錄,後期人工自動對該訊息實現補償。

合理的選擇重試機制

消費者取得訊息後,呼叫第三方介面(HTTP請求),但是呼叫第三方介面失敗呢?是否需要重試 ?

答:有時是因為網路異常呼叫失敗,應該需要重試幾次。

消費者取得訊息後,應該程式碼問題拋出資料異常,是否需要重試?

答:不需要重試,程式碼異常需要重新修改程式碼發布專案。

消費者開啟手動ack模式

第一步、springboot專案配置需要開啟ack模式

acknowledge-mode:manual

第二步、消費者Java程式碼

int result = orderMapper.addOrder(orderEntity);
if (result >= 0) {
    // 开启消息确认机制
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

rabbitMQ如何解決訊息冪等問題

什麼是訊息冪等性? MQ消費者如何保證冪等性?

產生的原因:是因為消費者可能會開啟自動重試,重試過程中可能會導致消費者業務邏輯程式碼重複執行。此刻訊息已經消費了,因為業務報錯導致訊息重新消費,這時會出現

解決方案:採用訊息全域id根據業務來定,根據業務id(全域唯一id)消費者可以判斷這條消息已經消費了。

消費者程式碼邏輯:

Java RabbitMQ訊息佇列常見問題及解決方案分析

RabbitMQ解決分散式事務問題

分散式事務:在分散式系統中,因為跨服務呼叫接口,存在多個不同的事務,每個事務都互不影響。就存在分散式事務的問題。

解決分散式交易核心思想:資料最終一致性。

分散式領域中名詞:

強一致性:要麼同步速度非常快或採用鎖定的機制不允許出現髒讀;

強一致性解決方案:要嘛資料庫A非常迅速的將資料同步給資料B,或資料庫A沒有同步完成之前資料庫B不能夠讀取資料。

弱一致性: 允許讀取的資料為原來的髒數據,允許讀取的結果不一致性。

最終一致性: 在我們的分散式系統中,因為資料之間同步透過網路實現通訊,短暫的資料延遲是允許的,但是最終資料必須要一致性。

基於RabbitMQ解決分散式事務的想法

基於RabbitMQ解決分散式事務的想法:(採用最終一致性的方案)

  • 確認我們的生產者訊息一定要投遞到MQ中(訊息確認機制)投遞失敗就繼續重試

  • 消費者採用手動ack的形式確認訊息實現消費注意冪等性問題,在消費失敗的情況下,mq自動幫消費者重試。

  • 保證我們的生產者第一事務先執行,如果執行失敗採用補單佇列(給生產者自己事務補充,確保生產者第一事務執行完成【資料最終一致性】)。

解決想法圖:核心是利用mq傳送訊息給其他系統將資料修改回來。

Java RabbitMQ訊息佇列常見問題及解決方案分析

以上是Java RabbitMQ訊息佇列常見問題及解決方案分析的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述
本文轉載於:亿速云。如有侵權,請聯絡admin@php.cn刪除
為什麼Java是開發跨平台桌面應用程序的流行選擇?為什麼Java是開發跨平台桌面應用程序的流行選擇?Apr 25, 2025 am 12:23 AM

javaispopularforcross-platformdesktopapplicationsduetoits“ writeonce,runany where”哲學。 1)itusesbytiesebyTecodeThatrunsonAnyJvm-備用Platform.2)librarieslikeslikeslikeswingingandjavafxhelpcreatenative-lookingenative-lookinguisis.3)

討論可能需要在Java中編寫平台特定代碼的情況。討論可能需要在Java中編寫平台特定代碼的情況。Apr 25, 2025 am 12:22 AM

在Java中編寫平台特定代碼的原因包括訪問特定操作系統功能、與特定硬件交互和優化性能。 1)使用JNA或JNI訪問Windows註冊表;2)通過JNI與Linux特定硬件驅動程序交互;3)通過JNI使用Metal優化macOS上的遊戲性能。儘管如此,編寫平台特定代碼會影響代碼的可移植性、增加複雜性、可能帶來性能開銷和安全風險。

與平台獨立性相關的Java開發的未來趨勢是什麼?與平台獨立性相關的Java開發的未來趨勢是什麼?Apr 25, 2025 am 12:12 AM

Java將通過雲原生應用、多平台部署和跨語言互操作進一步提昇平台獨立性。 1)雲原生應用將使用GraalVM和Quarkus提升啟動速度。 2)Java將擴展到嵌入式設備、移動設備和量子計算機。 3)通過GraalVM,Java將與Python、JavaScript等語言無縫集成,增強跨語言互操作性。

Java的強鍵入如何有助於平台獨立性?Java的強鍵入如何有助於平台獨立性?Apr 25, 2025 am 12:11 AM

Java的強類型系統通過類型安全、統一的類型轉換和多態性確保了平台獨立性。 1)類型安全在編譯時進行類型檢查,避免運行時錯誤;2)統一的類型轉換規則在所有平台上一致;3)多態性和接口機制使代碼在不同平台上行為一致。

說明Java本機界面(JNI)如何損害平台獨立性。說明Java本機界面(JNI)如何損害平台獨立性。Apr 25, 2025 am 12:07 AM

JNI會破壞Java的平台獨立性。 1)JNI需要特定平台的本地庫,2)本地代碼需在目標平台編譯和鏈接,3)不同版本的操作系統或JVM可能需要不同的本地庫版本,4)本地代碼可能引入安全漏洞或導致程序崩潰。

是否有任何威脅或增強Java平台獨立性的新興技術?是否有任何威脅或增強Java平台獨立性的新興技術?Apr 24, 2025 am 12:11 AM

新興技術對Java的平台獨立性既有威脅也有增強。 1)雲計算和容器化技術如Docker增強了Java的平台獨立性,但需要優化以適應不同雲環境。 2)WebAssembly通過GraalVM編譯Java代碼,擴展了其平台獨立性,但需與其他語言競爭性能。

JVM的實現是什麼,它們都提供了相同的平台獨立性?JVM的實現是什麼,它們都提供了相同的平台獨立性?Apr 24, 2025 am 12:10 AM

不同JVM實現都能提供平台獨立性,但表現略有不同。 1.OracleHotSpot和OpenJDKJVM在平台獨立性上表現相似,但OpenJDK可能需額外配置。 2.IBMJ9JVM在特定操作系統上表現優化。 3.GraalVM支持多語言,需額外配置。 4.AzulZingJVM需特定平台調整。

平台獨立性如何降低發展成本和時間?平台獨立性如何降低發展成本和時間?Apr 24, 2025 am 12:08 AM

平台獨立性通過在多種操作系統上運行同一套代碼,降低開發成本和縮短開發時間。具體表現為:1.減少開發時間,只需維護一套代碼;2.降低維護成本,統一測試流程;3.快速迭代和團隊協作,簡化部署過程。

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

SecLists

SecLists

SecLists是最終安全測試人員的伙伴。它是一個包含各種類型清單的集合,這些清單在安全評估過程中經常使用,而且都在一個地方。 SecLists透過方便地提供安全測試人員可能需要的所有列表,幫助提高安全測試的效率和生產力。清單類型包括使用者名稱、密碼、URL、模糊測試有效載荷、敏感資料模式、Web shell等等。測試人員只需將此儲存庫拉到新的測試機上,他就可以存取所需的每種類型的清單。

mPDF

mPDF

mPDF是一個PHP庫,可以從UTF-8編碼的HTML產生PDF檔案。原作者Ian Back編寫mPDF以從他的網站上「即時」輸出PDF文件,並處理不同的語言。與原始腳本如HTML2FPDF相比,它的速度較慢,並且在使用Unicode字體時產生的檔案較大,但支援CSS樣式等,並進行了大量增強。支援幾乎所有語言,包括RTL(阿拉伯語和希伯來語)和CJK(中日韓)。支援嵌套的區塊級元素(如P、DIV),

SublimeText3 Linux新版

SublimeText3 Linux新版

SublimeText3 Linux最新版

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

DVWA

DVWA

Damn Vulnerable Web App (DVWA) 是一個PHP/MySQL的Web應用程序,非常容易受到攻擊。它的主要目標是成為安全專業人員在合法環境中測試自己的技能和工具的輔助工具,幫助Web開發人員更好地理解保護網路應用程式的過程,並幫助教師/學生在課堂環境中教授/學習Web應用程式安全性。 DVWA的目標是透過簡單直接的介面練習一些最常見的Web漏洞,難度各不相同。請注意,該軟體中