搜尋
首頁JavaJava基礎終於來了...RocketMQ掃盲篇

終於來了...RocketMQ掃盲篇

Oct 20, 2020 pm 05:16 PM
rocketmq

java基礎教學欄位今天詳細介紹RocketMQ知識。

終於來了...RocketMQ掃盲篇

又是好久沒有寫部落格了,雖然可以找出無數個沒有寫的部落格的理由,但是說到底,還是一個字「懶」。今天我終於吃了一顆治療懶癌的藥丸,決定寫一篇部落格。介紹什麼好呢,思來想去,還是介紹下RocketMQ吧,畢竟寫了30多篇博客,還沒好好寫過關於MQ的博客呢。本篇部落格比較基礎,不涉及源碼分析,只是掃盲。

MQ有什麼用

解耦

我覺得從某個角度來說,微服務促進了MQ的蓬勃發展,本來一個系統有N多個模組,所有模組都強烈耦合在一起,現在微服務了,一個模組就是一個系統,系統之間肯定需要交互,交互有三種常見的方法,一種是RPC,一種是HTTP,一種就是MQ了。

非同步

原本一個業務分成N步,要一步一步處理,才能把最終的結果回傳給用戶,現在有了MQ,先把最關鍵的部分處理完畢,然後發送訊息到MQ,直接回傳給用戶OK,至於後面的步驟在後台慢慢處理吧,真乃提升使用者體驗的神器。

削峰

某個介面的請求量突然飆升,勢必會對應用程式伺服器、資料庫伺服器造成很大的壓力,現在有了MQ,來多少請求都不在怕的,後台慢慢處理嗆。

RocketMQ簡介

RocketMQ是用Java寫的,是阿里開源的訊息中間件,吸收了Kafka很多優點。 Kafka也是比較熱門的訊息中間件,不過Kafka是用Scala寫的,不利於Java程式設計師去閱讀原始碼,也不利於Java程式設計師做一些客製化的開發。接觸過Kafka的小夥伴都知道,要用好Kafka實屬不易,相對來說,RocketMQ簡單多了,而且RocketMQ有阿里加持,經歷了N次雙11的考驗,比較適合國內互聯網公司,所以國內使用RocketMQ的公司很多。

RocketMQ四大元件

終於來了...RocketMQ掃盲篇圖片來自gitee.com/mirrors/roc…

可以看到RocketMQ主要有四個元件:

#NameServer

  • 無狀態服務,註冊中心,可叢集部署,但NameServer節點之間沒有任何資料互動。
  • Borker會以定時把Topic路由資訊回報給所有的NameServer。 Producer、Consumer會隨機選擇一個NameServer定時Topic更新路由資訊。
  • Topic路由資訊在NameServer叢集中採用最終一致性。
  • 保證AP。

Borker

  • RocketMQ的服務端,用於儲存訊息、分發訊息。
  • Borker會定時把自己擁有的所有的Topic路由資訊回報給NameServer。
  • Borker有兩個角色:Master、Follower,Master承擔讀取(消費訊息)寫入(生產訊息)操作,如果Master比較忙,或者不可用,Follower可以承擔讀取操作。 BorkerId=0,代表是Matser,BorkerId!=0,代表是Follower,需要注意的有兩點: 其一,目前為止,BorkerId=1的Follower才可以承擔讀取操作; 其二,只有較高版本的RocketMQ支援當Master節點掛掉,Follower自動升級到Master。

Producer

生產者,每隔一段時間就會向NameServer發起Topic的路由資訊查詢。

Consumer

消費者,每隔一段時間就會向NameServer發起Topic的路由資訊查詢。

為什麼註冊中心不選用Zookeeper

其實,在低版本的RocketMQ中,確實是選用Zookeeper作為註冊中心的,但是後面改成了現在的NameServer,猜想主要原因是:

  • RocketMQ已經是一個中間件了,不想再依賴其他中間件。
  • Zookeeper比較重,有很多功能RocketMQ是用不到的,不如寫一個輕量級的註冊中心。
  • Zookeeper是CP,一旦觸發領導選舉,那麼註冊中心就不可用了,而RocketMQ的註冊中心,不需要強一致性,只要保證最終一致性。

RocketMQ訊息領域模型

Message

  • #傳送的訊息。
  • 訊息必須有Topic。
  • 訊息可以有多個Tag和多個Key,可以看做訊息的附加屬性。

Topic

  • 一類訊息的集合。
  • 每個訊息必須有一個Topic。
  • 訊息的第一級類型。

Tag

  • 一個訊息除了有Topic之外,還可以有Tag,用來細分同一個Topic下的不同種類的訊息。
  • Tag不是必須的。
  • 訊息的第二級類型。

Group

分為ProducerGroup,ConsumerGroup,我們更多的是關注ConsumerGroup,ConsumerGroup包含多個Consumer。

在集群消費模式下,一個ConsumerGroup下的Consumer共同消費一個Topic,且每個Consumer會被分配到N個隊列,但是一個隊列只會被一個Consumer消費,不同的ConsumerGroup可以消費同一個Topic,一則訊息會被訂閱此Topic的所有ConsumerGroup消耗。

Queue

  • 一個Topic預設包含四個Queue。
  • 在叢集消費模式下,同一個ConsumerGroup中的Consumer可以消費多個Queue的訊息,但是一個Queue只能被一個Consumer消耗。
  • Queue中的消息是有序的。
  • 分為讀Queue和寫Queue,一般來說,讀Queue的數量和寫Queue的數量是一致的,否則很容易出問題。

消費模式

消費模式有兩種:Clustering(群集消費)和Broadcasting(廣播消費)。

和其他MQ不同,其他MQ是在發送訊息的時候,指定是叢集消費還是廣播消費,RocketMQ是在消費者端設定是叢集消費還是廣播消費。

Clustering(集群消費)

預設情況下是集群消費模式,該模式下,ConsumerGroup所有的Consumer共同消費一個Topic的消息,每個Consumer負責消費N個隊列的消息(N也可能為1,甚至是0,沒有分配到隊列),但是一個隊列只會被一個Consumer消費。如果某個Consumer掛掉,ConsumerGroup下的其他Consumer會接替掛掉的Consumer繼續消費。

叢集消費模式下,消費進度維護在Borker端,儲存路徑為${ROCKET_HOME}/store/config/ consumerOffset.json,如下圖所示:終於來了...RocketMQ掃盲篇使用topicName@consumerGroupName為Key,消費進度為Value,Value的形式是queueId:offset ,說明如果有多個ConsumerGroup,每個ConsumerGroup的消費進度是不同的,需要分開來儲存。

Broadcasting(廣播消費)

廣播消費訊息會發給ConsumerGroup中所有的Consumer。

廣播消費模式下,消費進度維護在Consumer端。

消費隊列負載演算法與重平衡機制

消費隊列負載演算法

我們知道了在叢集消費模式下,ConsumerGroup下所有的Consumer共同消費一個Topic的消息,每個Consumer負責消費N個隊列的訊息,那麼具體是如何分配的呢?這就牽涉到消費隊列負載演算法了。

RocketMQ提供了眾多的消費隊列負載演算法,其中最常用的是兩種演算法,即AllocateMessageQueueAveragely、AllocateMessageQueueAveragelyByCircle。下面我們來看下這兩個演算法的差別。

假設,現在一個Topic有16個佇列,用q0~q15表示,有3個Consumer,用c0-c2表示。

以AllocateMessageQueueAveragely消費佇列負載演算法的結果如下:

  • c0:q0 q1 q2 q3 q4 q5
  • #c1:q6 q7 q8 q9 q10

################################################################## ##c2:q11 q12 q13 q14 q15#########用AllocateMessageQueueAveragelyByCircle消費隊列負載算法的結果如下:#########c0:q0 q3 q6 q9 q12 q15##### #c1:q1 q4 q7 q10 q13######c2:q2 q5 q8 q11 q14#########ConsumerGroup下所有的Consumer共同消費一個Topic的訊息,每個Consumer負責消費N個隊列的消息,但是一個隊列不能同時被N個Consumer消費,這意味著什麼? ######聰明的你一定可以想到,如果一個Topic只有4個隊列,而有5個Consumer,那麼有一個Consumer將不能分配到任何隊列,所以在RocketMQ中,Topic下隊列的個數直接決定了Consumer的最大個數,也就說明,不能光靠增加Consumer來提高消費速度。 ###

重平衡

雖然建議在創建Topic的時候,就應該充分考慮隊列的個數,但是實際情況往往是不盡人意的,即使隊列數沒有改變,Consumer的數量也一定會改變,像是Consumer的上下線,像是某個Consumer掛了,像是新增了Consumer。隊列的擴容、縮容,Consumer的擴容、縮容都會導致重平衡,也就是為Consumer重新分配消費的隊列。

在RocketMQ中,Consumer會定時查詢Topic的佇列的個數,Consumer的個數,如果發生了改變,就會觸發重平衡。

重平衡是RocketMQ內部實現的,程式設計師無需關心。

Pull OR Push?

一般來說,MQ有兩種方法來取得訊息:

  • Pull:Consumer主動拉取訊息,好處是Consumer可以控制拉取訊息的頻率,條數,Consumer知道自己的消費能力,所以在Consumer端不容易造成消息堆積,但是實時性不是太好,效率相對較低。
  • Push:Broker主動發送訊息,好處是即時性、效率比較高,但是Broker無法知道Consumer端的消費能力,如果發給Consumer的訊息過多,會造成Consumer端的訊息堆積;如果發給Consumer的資料太少,又會造成Consumer端的空閒。

不管是Pull,還是Push,Consumer總是會與Broker產生交互,交互的方式一般有短連接、長連接、輪詢三種方式。

看起來,RocketMQ支援既支援Pull,也支援Push,但實際上Push也是用Pull實現的,那麼Consumer又是怎麼與Broker產生互動的呢?

這就是RocketMQ設計的巧妙的地方了,既不是短連接,也不是長連接,也不是輪詢,而是採用的長輪詢。

長輪詢

Consumer發起拉取訊息的請求,分為兩種情況:

  • 有訊息:Consumer拿到訊息後,連線中斷。
  • 沒有訊息:Borker Hold(保持)住連線一定時間,每隔5秒,檢查下是否有訊息,如果有訊息,給Consumer,連線中斷。

事務訊息

RocketMQ支援事務訊息,Producer把事務訊息傳送給Broker後,Broker會把訊息儲存在系統Topic:RMQ_SYS_TRANS_HALF_TOPIC#,這樣Consumer就無法消費到這則訊息了。

Broker會有一個定時任務,消費RMQ_SYS_TRANS_HALF_TOPIC的訊息,向Producer發起回查,回查的狀態有三種:提交、回滾、未知。

  • 如果回查的狀態是提交,回滾,會觸發訊息的提交和回滾;
  • 如果是未知,會等待下一次回查,RocketMQ可以設定一條訊息的回查間隔與回查次數,超過一定的回查次數,訊息會自動回滾。

延遲訊息

延遲訊息是指息發到Broker後,不能立刻被Consumer消費,需要等待一定的時間才可以被消費到,RocketMQ只支援特定的延遲時間:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

消費形式

RocketMQ支援兩種消費形式:並發消費、順序消費。 如果是順序消費,需要保證排序的訊息在同一個佇列。如何選擇隊列發送呢,RocketMQ發送訊息的方法有好幾個重載,其中有一個重載方法來支援隊列的選擇。

同步刷盤、非同步刷盤

Producer把訊息送到Borker中,Borker是需要把訊息持久化的,RocketMQ支援兩種持久化策略:

  • #同步刷盤:Borker把訊息持久後才回給Producer,好處是訊息可靠性高,但效率較慢。
  • 非同步刷盤:Broker把訊息寫入PageCache中,就回傳ACK給Producer。好處是效率極高,但如果伺服器掛了,訊息可能會遺失,如果只是RocketMQ服務掛了,不會造成訊息遺失。

同步複製、非同步複製

為了MQ的可靠性、可用性,在生產環境,一般會部署Follower節點,Follower節點會複製Master的數據,RocketMQ支援兩種持複製策略:

  • 同步複製:Master、Follower都把訊息寫入成功,才回傳ACK給Producer,可靠性較高,但效率較慢。
  • 非同步複製:只要Master寫入成功,就回傳ACK給Producer,效率較高,但可能會遺失訊息。

"寫入"是寫入PageCache,還是寫入硬碟,要看Follower Broker的設定。

再談談Producer

RocketMQ提供了三種傳送訊息的方法:

  • oneway:fire and forget,單向訊息,指訊息發送出去後,就不管了,這個方法是沒有回傳值的。
  • 同步:訊息發送出去後,同步等待Borker的回應。
  • 非同步:訊息發送出去後,立即返回,收到Boker的回應後,會執行函調方法。

在實際開發中,一般選用同步方法,如果要提高RocketMQ的效能,一般都是修改Borker端的參數,特別是刷盤策略和複製策略。

傳送訊息重試

訊息傳送時,如果使用了MessageQueueSelector,那麼訊息傳送的重試機制將會失效。

發送訊息回應可能為以下四種:

public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}复制代码

除了第一種,其他情況都是有問題的,為了保證訊息不會遺失,需要設定Producer參數:RetryAnotherBrokerWhenNotStoreOK 為true。

故障規避機制

如果訊息發送失敗了,重試的時候,還是發送給這個Borker,那麼大概率發送還是失敗的,RockteMQ設計精巧之處在於,重試的時候,會自動避開這個Borker,而選擇其他Borker,但是目前為止,異步發送沒有那麼智能,只會在一個Borker上重試,所以強烈建議選擇同步發送方式。

RocketMQ提供了兩種故障規避機制。用參數SendLatencyFaultEnable來控制。

  • false:預設值,只有在重試的時候,才會啟用故障規避機制,例如發送訊息給BorkerA失敗了,重試的時候,會選擇BorkerB,但是下次發送訊息,還是會選擇發送給BorkerA。
  • true:開啟延遲退避機制,一旦訊息發送給BorkerA失敗,就會悲觀的認為在一段時間內,BorkerA不可用,在將來的一段時間內,不會再向BorkerA發送訊息。

延遲退避機制看起來很好用,但是一般來說Borker端繁忙,導致Borker不可用或網路不可用只是一瞬間的事情,馬上就可以恢復,如果開啟了延遲退避機制,本來可用的Borker在一段時間內卻被規避了,其他Borker更加繁忙,那可能情況更糟。

再談談Consumer

Consumer執行緒注意事項

Consumer有兩個參數,可以消耗的平行度,也就是ConsumeThreadMin ConsumeThreadMax,看起來給人的感覺是,如果Consumer端堆積訊息比較少,消費執行緒數為ConsumeThreadMin;如果Consumer端堆積訊息比較多,就自動開啟新的執行緒來消費,直到消費線程數為ConsumeThreadMax。但不是這樣,Consumer內部持有一個執行緒池,選用的是無界隊列,也就是ConsumeThreadMax參數是無效的,所以在實際開發中,ConsumeThreadMin ConsumeThreadMax往往設定成一樣。

ConsumeFromWhere

如果查詢不到消費進度的時候,Consumer從哪裡開始消費,RocketMQ支援從最新消息、最早訊息、指定時間戳這三種方式進行消費。

消費訊息重試

RocketMQ會為每個ConsumerGroup都設定一個Topic名稱為%RETRY% consumerGroup的重試佇列,用來儲存需要給ConsumerGroup重試的訊息,但重試需要一定的延時時間,RocketMQ對於重試訊息的處理是先儲存至Topic名稱為SCHEDULE_TOPIC_XXXX的延遲佇列中,後台定時任務依照對應的時間進行Delay後重新儲存至%RETRY% consumerGroup的重試佇列中。

訊息堆積、消費能力不夠,怎麼辦

  • #提高消費進度,這是最好的方法。
  • 增加佇列,增加Consumer。
  • 原先的Consumer作為搬磚工,依照一定的規則把訊息「搬」到多個新的Topic,再開幾個ConsumerGroup去消費不同的Topic。
  • 新開一個ConsumerGroup去消費,也就是兩個ConsumerGroup同時消費一個Topic,但是需要注意offset的判斷,比如一個ConsumerGroup消費offset為奇數的消息,一個ConsumerGroup消費offset為偶數的消息。

本來以為寫掃盲文,應該會寫的很順,但是還是想太多了,因為是掃盲文,面向的是沒有怎麼接觸過RocketMQ的小伙伴,但是RocketMQ有沒有那麼簡單,不可能用一篇博客,就讓沒有怎麼接觸過RocketMQ的小伙伴順利入門,所以在寫博客的時候,一直在想,這個東西重要嗎,需要仔細描述嗎;這個東西可以忽視,可以不介紹嗎等等,大家可以看到本文基本上都是在介紹各種概念,幾乎沒有牽涉到API的層面,因為一旦牽涉到API,那麼估計寫兩個星期也寫不完。

End

相關免費學習推薦:#java基礎教學

以上是終於來了...RocketMQ掃盲篇的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述
本文轉載於:juejin。如有侵權,請聯絡admin@php.cn刪除

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

AI Hentai Generator

AI Hentai Generator

免費產生 AI 無盡。

熱門文章

R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
1 個月前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳圖形設置
1 個月前By尊渡假赌尊渡假赌尊渡假赌
威爾R.E.P.O.有交叉遊戲嗎?
1 個月前By尊渡假赌尊渡假赌尊渡假赌

熱工具

PhpStorm Mac 版本

PhpStorm Mac 版本

最新(2018.2.1 )專業的PHP整合開發工具

SAP NetWeaver Server Adapter for Eclipse

SAP NetWeaver Server Adapter for Eclipse

將Eclipse與SAP NetWeaver應用伺服器整合。

SublimeText3 英文版

SublimeText3 英文版

推薦:為Win版本,支援程式碼提示!

Atom編輯器mac版下載

Atom編輯器mac版下載

最受歡迎的的開源編輯器

Dreamweaver Mac版

Dreamweaver Mac版

視覺化網頁開發工具