­

Javaer 進階必看的 RocketMQ ,就這篇了

每個時代,都不會虧待會學習的人。

大家好,我是 yes。

繼上一篇 頭條終面:寫個消息中間件 ,我提到實現消息中間件的一些關鍵點,今天就和大家一起深入生產級別消息中間件 – RocketMQ 的內核實現,來看看真正落地能支撐萬億級消息容量、低延遲的消息隊列到底是如何設計的。

這篇文章我會先介紹整體的架構設計,然後再深入各核心模組的詳細設計、核心流程的剖析。

還會提及使用的一些注意點和最佳實踐。

對於消息隊列的用處和一些概念不太清楚的同學強烈建議先看消息隊列面試連環問,這篇文章介紹了消息隊列的使用場景、基本概念和常見面試題。

話不多說,上車。

RocketMQ 整體架構設計

整體的架構設計主要分為四大部分,分別是:Producer、Consumer、Broker、NameServer。

為了更貼合實際,我畫的都是集群部署,像 Broker 我還畫了主從。

  • Producer:就是消息生產者,可以集群部署。它會先和 NameServer 集群中的隨機一台建立長連接,得知當前要發送的 Topic 存在哪台 Broker Master上,然後再與其建立長連接,支援多種負載平衡模式發送消息。

  • Consumer:消息消費者,也可以集群部署。它也會先和 NameServer 集群中的隨機一台建立長連接,得知當前要消息的 Topic 存在哪台 Broker Master、Slave上,然後它們建立長連接,支援集群消費和廣播消費消息。

  • Broker:主要負責消息的存儲、查詢消費,支援主從部署,一個 Master 可以對應多個 Slave,Master 支援讀寫,Slave 只支援讀。Broker 會向集群中的每一台 NameServer 註冊自己的路由資訊。

  • NameServer:是一個很簡單的 Topic 路由註冊中心,支援 Broker 的動態註冊和發現,保存 Topic 和 Borker 之間的關係。通常也是集群部署,但是各 NameServer 之間不會互相通訊, 各 NameServer 都有完整的路由資訊,即無狀態。

我再用一段話來概括它們之間的交互:

先啟動 NameServer 集群,各 NameServer 之間無任何數據交互,Broker 啟動之後會向所有 NameServer 定期(每 30s)發送心跳包,包括:IP、Port、TopicInfo,NameServer 會定期掃描 Broker 存活列表,如果超過 120s 沒有心跳則移除此 Broker 相關資訊,代表下線。

這樣每個 NameServer 就知道集群所有 Broker 的相關資訊,此時 Producer 上線從 NameServer 就可以得知它要發送的某 Topic 消息在哪個 Broker 上,和對應的 Broker (Master 角色的)建立長連接,發送消息。

Consumer 上線也可以從 NameServer 得知它所要接收的 Topic 是哪個 Broker ,和對應的 Master、Slave 建立連接,接收消息。

簡單的工作流程如上所述,相信大家對整體數據流轉已經有點印象了,我們再來看看每個部分的詳細情況。

NameServer

它的特點就是輕量級,無狀態。角色類似於 Zookeeper 的情況,從上面描述知道其主要的兩個功能就是:Broker 管理、路由資訊管理。

總體而言比較簡單,我再貼一些欄位,讓大家有更直觀的印象知道它存儲了些什麼。

Producer

Producer 無非就是消息生產者,那首先它得知道消息要發往哪個 Broker ,於是每 30s 會從某台 NameServer 獲取 Topic 和 Broker 的映射關係存在本地記憶體中,如果發現新的 Broker 就會和其建立長連接,每 30s 會發送心跳至 Broker 維護連接。

並且會輪詢當前可以發送的 Broker 來發送消息,達到負載均衡的目的,在同步發送情況下如果發送失敗會默認重投兩次(retryTimesWhenSendFailed = 2),並且不會選擇上次失敗的 broker,會向其他 broker 投遞。

非同步發送失敗的情況下也會重試,默認也是兩次 (retryTimesWhenSendAsyncFailed = 2),但是僅在同一個 Broker 上重試。

Producer 啟動流程

然後我們再來看看 Producer 的啟動流程看看都幹了些啥。

大致啟動流程圖中已經表明的很清晰的,但是有些細節可能還不清楚,比如重平衡啊,TBW102 啥玩意啊,有哪些定時任務啊,別急都會提到的。

有人可能會問這生產者為什麼要啟拉取服務、重平衡?

因為 Producer 和 Consumer 都需要用 MQClientInstance,而同一個 clientId 是共用一個 MQClientInstance 的, clientId 是通過本機 IP 和 instanceName(默認值 default)拼起來的,所以多個 Producer 、Consumer 實際用的是一個MQClientInstance。

至於有哪些定時任務,請看下圖:

Producer 發消息流程

我們再來看看發消息的流程,大致也不是很複雜,無非就是找到要發送消息的 Topic 在哪個 Broker 上,然後發送消息。

現在就知道 TBW102 是啥用的,就是接受自動創建主題的 Broker 啟動會把這個默認主題登記到 NameServer,這樣當 Producer 發送新 Topic 的消息時候就得知哪個 Broker 可以自動創建主題,然後發往那個 Broker。

而 Broker 接受到這個消息的時候發現沒找到對應的主題,但是它接受創建新主題,這樣就會創建對應的 Topic 路由資訊。

自動創建主題的弊端

自動創建主題那麼有可能該主題的消息都只會發往一台 Broker,起不到負載均衡的作用。

因為創建新 Topic 的請求到達 Broker 之後,Broker 創建對應的路由資訊,但是心跳是每 30s 發送一次,所以說 NameServer 最長需要 30s 才能得知這個新 Topic 的路由資訊。

假設此時發送方還在連續快速的發送消息,那 NameServer 上其實還沒有關於這個 Topic 的路由資訊,所以有機會讓別的允許自動創建的 Broker 也創建對應的 Topic 路由資訊,這樣集群里的 Broker 就能接受這個 Topic 的資訊,達到負載均衡的目的,但也有個別 Broker 可能,沒收到。

如果發送方這一次發了之後 30s 內一個都不發,之前的那個 Broker 隨著心跳把這個路由資訊更新到 NameServer 了,那麼之後發送該 Topic 消息的 Producer 從 NameServer 只能得知該 Topic 消息只能發往之前的那台 Broker ,這就不均衡了,如果這個新主題消息很多,那台 Broker 負載就很高了。

所以不建議線上開啟允許自動創建主題,即 autoCreateTopicEnable 參數。

發送消息故障延遲機制

有一個參數是 sendLatencyFaultEnable,默認不開啟。這個參數的作用是對於之前發送超時的 Broker 進行一段時間的退避。

發送消息會記錄此時發送消息的時間,如果超過一定時間,那麼此 Broker 就在一段時間內不允許發送。

比如發送時間超過 15000ms 則在 600000 ms 內無法向該 Broker 發送消息。

這個機制其實很關鍵,發送超時大概率表明此 Broker 負載高,所以先避讓一會兒,讓它緩一緩,這也是實現消息發送高可用的關鍵。

小結一下

Producer 每 30s 會向 NameSrv 拉取路由資訊更新本地路由表,有新的 Broker 就和其建立長連接,每隔 30s 發送心跳給 Broker 。

不要在生產環境開啟 autoCreateTopicEnable。

Producer 會通過重試和延遲機制提升消息發送的高可用。

Broker

Broker 就比較複雜一些了,但是非常重要。大致分為以下五大模組,我們來看一下官網的圖。

  • Remoting 遠程模組,處理客戶請求。
  • Client Manager 管理客戶端,維護訂閱的主題。
  • Store Service 提供消息存儲查詢服務。
  • HA Serivce,主從同步高可用。
  • Index Serivce,通過指定key 建立索引,便於查詢。

有幾個模組沒啥可說的就不分析了,先看看存儲的。

Broker 的存儲

RocketMQ 存儲用的是本地文件存儲系統,效率高也可靠。

主要涉及到三種類型的文件,分別是 CommitLog、ConsumeQueue、IndexFile。

CommitLog

RocketMQ 的所有主題的消息都存在 CommitLog 中,單個 CommitLog 默認 1G,並且文件名以起始偏移量命名,固定 20 位,不足則前面補 0,比如 00000000000000000000 代表了第一個文件,第二個文件名就是 00000000001073741824,表明起始偏移量為 1073741824,以這樣的方式命名用偏移量就能找到對應的文件。

所有消息都是順序寫入的,超過文件大小則開啟下一個文件。

ConsumeQueue

ConsumeQueue 消息消費隊列,可以認為是 CommitLog 中消息的索引,因為 CommitLog 是糅合了所有主題的消息,所以通過索引才能更加高效的查找消息。

ConsumeQueue 存儲的條目是固定大小,只會存儲 8 位元組的 commitlog 物理偏移量,4 位元組的消息長度和 8 位元組 Tag 的哈希值,固定 20 位元組。

在實際存儲中,ConsumeQueue 對應的是一個Topic 下的某個 Queue,每個文件約 5.72M,由 30w 條數據組成。

消費者是先從 ConsumeQueue 來得到消息真實的物理地址,然後再去 CommitLog 獲取消息。

IndexFile

IndexFile 就是索引文件,是額外提供查找消息的手段,不影響主流程。

通過 Key 或者時間區間來查詢對應的消息,文件名以創建時間戳命名,固定的單個 IndexFile 文件大小約為400M,一個 IndexFile 存儲 2000W個索引。

我們再來看看以上三種文件的內容是如何生成的:

消息到了先存儲到 Commitlog,然後會有一個 ReputMessageService 執行緒接近實時地將消息轉發給消息消費隊列文件與索引文件,也就是說是非同步生成的。

消息刷盤機制

RocketMQ 提供消息同步刷盤和非同步刷盤兩個選擇,關於刷盤我們都知道效率比較低,單純存入記憶體中的話效率是最高的,但是可靠性不高,影響消息可靠性的情況大致有以下幾種:

  1. Broker 被暴力關閉,比如 kill -9
  2. Broker 掛了
  3. 作業系統掛了
  4. 機器斷電
  5. 機器壞了,開不了機
  6. 磁碟壞了

如果都是 1-4 的情況,同步刷盤肯定沒問題,非同步的話就有可能丟失部分消息,5 和 6就得依靠副本機制了,如果同步雙寫肯定是穩的,但是性能太差,如果非同步則有可能丟失部分消息。

所以需要看場景來使用同步、非同步刷盤和副本雙寫機制。

頁快取與記憶體映射

Commitlog 是混合存儲的,所以所有消息的寫入就是順序寫入,對文件的順序寫入和記憶體的寫入速度基本上沒什麼差別。

並且 RocketMQ 的文件都利用了記憶體映射即 Mmap,將程式虛擬頁面直接映射到頁快取上,無需有內核態再往用戶態的拷貝,來看一下我之前文章畫的圖。

頁快取其實就是作業系統對文件的快取,用來加速文件的讀寫,也就是說對文件的寫入先寫到頁快取中,作業系統會不定期刷盤(時間不可控),對文件的讀會先載入到頁快取中,並且根據局部性原理還會預讀臨近塊的內容。

其實也是因為使用記憶體映射機制,所以 RocketMQ 的文件存儲都使用定長結構來存儲,方便一次將整個文件映射至記憶體中。

文件預分配和文件預熱

而記憶體映射也只是做了映射,只有當真正讀取頁面的時候產生缺頁中斷,才會將數據真正載入到記憶體中,所以 RocketMQ 做了一些優化,防止運行時的性能抖動。

文件預分配

CommitLog 的大小默認是1G,當超過大小限制的時候需要準備新的文件,而 RocketMQ 就起了一個後台執行緒 AllocateMappedFileService,不斷的處理 AllocateRequest,AllocateRequest 其實就是預分配的請求,會提前準備好下一個文件的分配,防止在消息寫入的過程中分配文件,產生抖動。

文件預熱

有一個 warmMappedFile 方法,它會把當前映射的文件,每一頁遍歷多去,寫入一個0位元組,然後再調用mlock 和 madvise(MADV_WILLNEED)。

mlock:可以將進程使用的部分或者全部的地址空間鎖定在物理記憶體中,防止其被交換到 swap 空間。

madvise:給作業系統建議,說這文件在不久的將來要訪問的,因此,提前讀幾頁可能是個好主意。

小結一下

CommitLog 採用混合型存儲,也就是所有 Topic 都存在一起,順序追加寫入,文件名用起始偏移量命名。

消息先寫入 CommitLog 再通過後台執行緒分發到 ConsumerQueue 和 IndexFile 中。

消費者先讀取 ConsumerQueue 得到真正消息的物理地址,然後訪問 CommitLog 得到真正的消息。

利用了 mmap 機制減少一次拷貝,利用文件預分配和文件預熱提高性能。

提供同步和非同步刷盤,根據場景選擇合適的機制。

Broker 的 HA

從 Broker 會和主 Broker 建立長連接,然後獲取主 Broker commitlog 最大偏移量,開始向主 Broker 拉取消息,主 Broker 會返回一定數量的消息,循環進行,達到主從數據同步。

消費者消費消息會先請求主 Broker ,如果主 Broker 覺得現在壓力有點大,則會返回從 Broker 拉取消息的建議,然後消費者就去從伺服器拉取消息。

Consumer

消費有兩種模式,分別是廣播模式和集群模式。

廣播模式:一個分組下的每個消費者都會消費完整的Topic 消息。

集群模式:一個分組下的消費者瓜分消費Topic 消息。

一般我們用的都是集群模式。

而消費者消費消息又分為推和拉模式,詳細看我這篇文章消息隊列推拉模式,分別從源碼級別分析了 RokcetMQ 和 Kafka 的消息推拉,以及推拉模式的優缺點。

Consumer 端的負載均衡機制

Consumer 會定期的獲取 Topic 下的隊列數,然後再去查找訂閱了該 Topic 的同一消費組的所有消費者資訊,默認的分配策略是類似分頁排序分配。

將隊列排好序,然後消費者排好序,比如隊列有 9 個,消費者有 3 個,那消費者-1 消費隊列 0、1、2 的消息,消費者-2 消費隊列 3、4、5,以此類推。

所以如果負載太大,那麼就加隊列,加消費者,通過負載均衡機制就可以感知到重平衡,均勻負載。

Consumer 消息消費的重試

難免會遇到消息消費失敗的情況,所以需要提供消費失敗的重試,而一般的消費失敗要麼就是消息結構有誤,要麼就是一些暫時無法處理的狀態,所以立即重試不太合適。

RocketMQ 會給每個消費組都設置一個重試隊列,Topic 是 %RETRY%+consumerGroup,並且設定了很多重試級別來延遲重試的時間。

為了利用 RocketMQ 的延時隊列功能,重試的消息會先保存在 Topic 名稱為「SCHEDULE_TOPIC_XXXX」的延遲隊列,在消息的擴展欄位裡面會存儲原來所屬的 Topic 資訊。

delay 一段時間後再恢復到重試隊列中,然後 Consumer 就會消費這個重試隊列主題,得到之前的消息。

如果超過一定的重試次數都消費失敗,則會移入到死信隊列,即 Topic %DLQ%" + ConsumerGroup 中,存儲死信隊列即認為消費成功,因為實在沒轍了,暫時放過。

然後我們可以通過人工來處理死信隊列的這些消息。

消息的全局順序和局部順序

全局順序就是消除一切並發,一個 Topic 一個隊列,Producer 和 Consuemr 的並發都為一。

局部順序其實就是指某個隊列順序,多隊列之間還是能並行的。

可以通過 MessageQueueSelector 指定 Producer 某個業務只發這一個隊列,然後 Comsuer 通過MessageListenerOrderly 接受消息,其實就是加鎖消費。

在 Broker 會有一個 mqLockTable ,順序消息在創建拉取消息任務的時候需要在 Broker 鎖定該消息隊列,之後加鎖成功的才能消費。

而嚴格的順序消息其實很難,假設現在都好好的,如果有個 Broker 宕機了,然後發生了重平衡,隊列對應的消費者實例就變了,就會有可能會出現亂序的情況,如果要保持嚴格順序,那此時就只能讓整個集群不可用了。

一些注意點

1、訂閱消息是以 ConsumerGroup 為單位存儲的,所以ConsumerGroup 中的每個 Consumer 需要有相同的訂閱。

因為訂閱消息是隨著心跳上傳的,如果一個 ConsumerGroup 中 Consumer 訂閱資訊不一樣,那麼就會出現互相覆蓋的情況。

比如消費者 A 訂閱 Topic a,消費者 B 訂閱 Topic b,此時消費者 A 去 Broker 拿消息,然後 B 的心跳包發出了,Broker 更新了,然後接到 A 的請求,一臉懵逼,沒這訂閱關係啊。

2、RocketMQ 主從讀寫分離

從只能讀,不能寫,並且只有當前客戶端讀的 offset 和 當前 Broker 已接受的最大 offset 超過限制的物理記憶體大小時候才會去從讀,所以正常情況下從分擔不了流量

3、單單加機器提升不了消費速度,隊列的數量也需要跟上。

4、之前提到的,不要允許自動創建主題

RocketMQ 的最佳實踐

這些最佳實踐部分參考自官網。

Tags的使用

建議一個應用一個 Topic,利用 tages 來標記不同業務,因為 tages 設置比較靈活,且一個應用一個 Topic 很清晰,能直觀的辨別。

Keys的使用

如果有消息業務上的唯一標識,請填寫到 keys 欄位中,方便日後的定位查找。

提高 Consumer 的消費能力

1、提高消費並行度:增加隊列數和消費者數量,提高單個消費者的並行消費執行緒,參數 consumeThreadMax。

2、批處理消費,設置 consumeMessageBatchMaxSize 參數,這樣一次能拿到多條消息,然後比如一個 update語句之前要執行十次,現在一次就執行完。

3、跳過非核心的消息,當負載很重的時候,為了保住那些核心的消息,設置那些非核心的消息,例如此時消息堆積 1W 條了之後,就直接返回消費成功,跳過非核心消息。

NameServer 的定址

請使用 HTTP 靜態伺服器定址(默認),這樣 NameServer 就能動態發現。

JVM選項

以下抄自官網:

如果不關心 RocketMQ Broker的啟動時間,通過「預觸摸」 Java 堆以確保在 JVM 初始化期間每個頁面都將被分配。

那些不關心啟動時間的人可以啟用它:​ -XX:+AlwaysPreTouch
禁用偏置鎖定可能會減少JVM暫停,​ -XX:-UseBiasedLocking
至於垃圾回收,建議使用帶JDK 1.8的G1收集器。

-XX:+UseG1GC -XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25
-XX:InitiatingHeapOccupancyPercent=30

另外不要把-XX:MaxGCPauseMillis的值設置太小,否則JVM將使用一個小的年輕代來實現這個目標,這將導致非常頻繁的minor GC,所以建議使用rolling GC日誌文件:

-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=5
-XX:GCLogFileSize=30m

Linux內核參數

以下抄自官網:

  • vm.extra_free_kbytes,告訴VM在後台回收(kswapd)啟動的閾值與直接回收(通過分配進程)的閾值之間保留額外的可用記憶體。RocketMQ使用此參數來避免記憶體分配中的長延遲。(與具體內核版本相關)
  • vm.min_free_kbytes,如果將其設置為低於1024KB,將會巧妙的將系統破壞,並且系統在高負載下容易出現死鎖。
  • vm.max_map_count,限制一個進程可能具有的最大記憶體映射區域數。RocketMQ將使用mmap載入CommitLog和ConsumeQueue,因此建議將為此參數設置較大的值。(agressiveness –> aggressiveness)
  • vm.swappiness,定義內核交換記憶體頁面的積極程度。較高的值會增加攻擊性,較低的值會減少交換量。建議將值設置為10來避免交換延遲。
  • File descriptor limits,RocketMQ需要為文件(CommitLog和ConsumeQueue)和網路連接打開文件描述符。我們建議設置文件描述符的值為655350。
  • Disk scheduler,RocketMQ建議使用I/O截止時間調度器,它試圖為請求提供有保證的延遲。

最後

其實還有很多沒講,比如流量控制、消息的過濾、定時消息的實現,包括底層通訊 1+N+M1+M2 的 Reactor 多執行緒設計等等。

主要是內容太多了,而且也不太影響主流程,所以還是剝離出來之後寫吧,大致的一些實現還是講了的。

包括元資訊的交互、消息的發送、存儲、消費等等。

關於事務消息的那一塊我之前文章也分析過了,所以這個就不再貼了。

可以看到要實現一個生產級別的消息隊列還是有很多很多東西需要考慮的,不過大致的架構和涉及到的模組差不多就這些了。

至於具體的細節深入,還是得靠大家自行研究了,我就起個拋磚引玉的作用。

最後個人能力有限,如果哪裡有紕漏請抓緊聯繫鞭撻我,可看網頁公告聯繫我。


我是 yes,從一點點到億點點,我們下篇見