一張圖進階 RocketMQ – 消息存儲

前言

三此君看了好幾本書,看了很多遍源碼整理的 一張圖進階 RocketMQ 圖片,關於 RocketMQ 你只需要記住這張圖!覺得不錯的話,記得點贊關注哦。
一張圖進階 RocketMQ.jpg
【重要】視頻在 B 站同步更新,歡迎圍觀,輕輕鬆鬆漲姿勢。一張圖進階 RocketMQ-消息存儲(視頻版)
點擊查看【bilibili】

本文是「一張圖進階 RocketMQ」第 5 篇,對 RocketMQ 不了解的同學可以先看看前面 4 期:

  1. 一張圖進階 RocketMQ-整體架構
  2. 一張圖進階 RocketMQ – NameServer
  3. 一張圖進階 RocketMQ – 消息發送
  4. 一張圖進階 RocketMQ – 通信機制

前面兩期我們主要分享了 RocketMQ 是如何將消息發送出去的,現在消息已經被 Netty 送上路了,接力棒已經交給了 Broker。如果我們自己來實現 Broker 會怎麼實現呢?首先肯定得把消息存起來吧,不然宕機了,消息丟失了,那就離大譜了。

可是消息要以什麼結構存儲呢?二進制、JSON、PB?從功能上來看肯定都是可以的,那 RocketMQ 到底是怎麼搞的?

解決了存儲結構問題,那消息存到哪裡呢?數據庫,本地文件,還是對象存儲服務器?從功能的角度肯定也都是可以的。可是,哪家數據庫可以支持單機十萬級吞吐量?那我直接統統存到數據庫得了,瞎折騰些啥。難道存在本地文件就可以了?我們自己實現不可以,但是 RocketMQ 可以,那 RocketMQ 有什麼黑科技呢?

所以我們今天就來聊一聊 Broker 如何存儲消息,【首先明確我們的目標】我們需要先了解 RocketMQ 的存儲結構,也就是消息是如何組織的。了解了存儲結構,我們才能更好的理解存儲流程,不然我們不知道為什麼流程是這樣的。最後我們需要了解有哪些機制支撐 RocketMQ 單機十萬級吞吐量。

存儲架構

image.png
消息在 Broker 上的存儲結構如上圖,所有相關文件放在 ROCKETMQ_HOME 下,有哪些文件呢?存放消息本身的 CommitLog,以及消息的索引文件 ConsumeQueue 和 IndexFile:

  • CommitLog

從物理結構上來看,所有的消息都存儲在 CommitLog 裏面,其實就是所有的消息按照「消息在 CommitLog 各字段示意圖」所示,挨個按順序存儲到文件中。

單個 CommitLog 文件大小默認 1G ,文件名長度為 20 位,左邊補零,剩餘為起始偏移量。比如 00000000000000000000 代表了第一個文件,起始偏移量為 0,文件大小為 1G=1073741824;當第一個文件寫滿了,第二個文件為 00000000001073741824,起始偏移量為 1073741824,以此類推。消息主要是順序寫入日誌文件,當文件滿了,寫入下一個文件。CommitLog 順序寫,可以大大提高寫入效率。

但是問題來了,消息發送的時候我們指定了 Topic,現在所有 Topic 都順序個寫入到 CommitLog,存入的時候是安逸了(順序寫),但是獲取消息可就麻煩了。如果我要獲取某個 Topic 的消息,需要遍歷 commitlog 文件,根據 topic 過濾消息。CommitLog 這個渣男,只管自己爽。有什麼辦法可以提高消息查詢效率呢?

  • ConsumeQueue

我們再回憶一下,消息存入的時候是指定了 Topic,同時我們也說了每個 Topic 會對應多個 ConsumeQueue( queueId 標識)。關鍵就在 ConsumeQueue 上,ConsumeQueue 是指定 Topic 消息的索引文件,怎麼理解呢?從「消息在 ConsumeQueue 各字段示意圖」可知,每個條目共 20 個位元組,分別為 8 位元組的 commitlog 物理偏移量、4 位元組的消息長度、8 位元組 tag hashcode,單個文件由 30W 個條目組成,可以像數組一樣隨機訪問每一個條目,每個 ConsumeQueue 文件大小約 5.72M。ConsumeQueue 文件可以看成是基於 topic 的 commitlog 索引文件。Consumer 即可根據 ConsumeQueue 來查找待消費的消息。

因為 ConsumeQueue 里只存偏移量信息,所以尺寸是有限的,在實際情況中,大部分的 ConsumeQueue 能夠被全部讀入內存,所以這個中間結構的操作速度很快,可以認為是內存讀取的速度。此外為了保證 CommitLog 和 ConsumeQueue 的一致性,CommitLog 里存儲了 ConsumeQueues、Message Key、Tag 等所有信息,即使 ConsumeQueue 丟失,也可以通過 CommitLog 完全恢復出來。
ConsumeQueue 文件夾的組織方式如下:topic/queue/file 三層組織結構,具體存儲路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。

  • IndexFile

IndexFile 是另一種可選索引文件,提供了一種可以通過 key 或時間區間來查詢消息的方法。 IndexFile 索引文件其底層實現為 hash 索引,類似於 Java 1.7 HashMap,計算 Key 的 hashcode,hashcode 取余得到 hash 槽,拉鏈法解決哈希衝突。Index 文件的存儲位置是:$HOME \store\index${fileName},文件名 fileName 是以創建時的時間戳命名的,固定的單個 IndexFile 文件大小約為 400M,一個 IndexFile 可以保存 2000W 個索引。

所以,RocketMQ 消息存儲架構主要有 CommitLog,ConsumeQueue,IndexFile 構成。我們發送一條消息,會先格式化成「消息在 CommitLog 各字段示意圖」中的樣子,順序寫入 CommitLog 中,然後 Broker 會按照 」消息在 ConsumeQueue 各字段示意圖「所示構建一條索引記錄,存入該消息所屬 Topic 的 ConsumeQueue 索引文件中。如果有 IndexFile,還會構建 IndexFile。

現在我們已經知道了 RocketMQ 消息的存儲結構,接下來我們的就要了解 RocketMQ 是如何構建 CommitLog、ConsumeQueue 和 IndexFile,以及 RocketMQ 如何保證性能,支撐單機十萬級吞吐量的?這是本文的主要目標,一定要抓住主要目標,不要走丟咯。

啟動流程

了解了 RocketMQ 消息在磁盤中是怎麼存儲的,我們就可以來看看具體的存儲流程了。首先,還是先來看看 Broker 的啟動流程。初始化過程都是這個鳥樣,只看初始化過程完全不知所云,但是不看初始化過程,直接看具體執行流程也是摸不着頭腦,一堆組件不知道從哪裡來的,所以我們還是先耐着性子大致看看。但這並不是我們關注的重點,注意幾個關鍵點即可。
image.png

  • 初始化啟動環境。部署好 RocketMQ 後,執行/bin/mqbroker 腳本,主要用於設置 RocketMQ 目錄環境變量,例如 ROCKETMQ_HOME 。然後調用 ./bin/runbroker.sh 進入 RocketMQ 的啟動入口,主要設置了 JVM 啟動參數,比如 JAVA_HOME、Xms、Xmx。執行 main 函數。
  • 初始化 BrokerController。該初始化主要包含 RocketMQ 啟動命令行參數解析、NettyRemotingServer 初始化、Broker 各個模塊配置參數解析、Broker 各個模塊初始化、進程關機 Hook 初始化等過程。
  • 啟動 RocketMQ 的各個組件。但是這些組件並不是每一個都是核心組件,部分組件會在後面的流程中使用,這裡混個眼熟,如果後面流程沒有提及的大家可以暫且跳過,我們的目標是把握 RocketMQ 的核心內容,而不是每個細節。
    • MessageStore:存儲層服務,比如 CommitLog、ConsumeQueue 存儲管理,消息刷盤,索引構建等。
    • RemotingServer:普通通道請求處理服務。一般的請求都是在這裡被處理的。
    • FastRemotingServer:VIP 通道請求處理服務。如果普通通道比較忙,那麼可以使用 VIP 通道,一般作為客戶端降級使用。
    • BrokerOuterAPI:Broker 訪問對外接口的封裝對象。
    • PullRequestHoldService:Pull 長輪詢服務。
    • ClientHousekeepingService:清理心跳超時的生產者、消費者、過濾服務器。
    • FilterServerManager:過濾服務器管理。

存儲流程

image.png
在前面 RocketMQ 存儲結構中我們了解了 RocketMQ 將所有消息順序寫入 CommitLog,然後構建 ConsumeQueue/IndexFile 索引文件,所以這個小結我們主要的目標就是看看這些文件是如何構建的。

  • Broker 啟動流程中很關鍵的一點是啟動了 NettyRemotingServer,在 RocketMQ 通信機制(視頻) 中我們介紹過 Broker(NettyRemotingServer) 初始化會監聽端口等待客戶端連接,當客戶端發送請求的時,NettyRemotingServer WorkerGroup 處理可讀事件,調用 NettyServerHandler.channelRead0() 處理數據。
    接着調用鏈到 processRequestCommand 方法,這個方法主要是根據請求中的 RequestCode,從本地緩存 processorTable 中獲取相應的 Processor 來執行後續邏輯。處理器是什麼?處理器的緩存從哪裡來?

    Processor 就是用來處理特定請求的執行者,例如,生產者存入消息使用 SendMessageProcessor,查詢消息使用 QueryMessageProcessor,拉取消息使用 PullMessageProcessor。在 Broker 啟動流程中有一步是註冊 Processor,以 RequestCode 為 Key ,Processor 為值,添加到 processorTable 緩存中。接着 RocketMQ 消息發送(視頻) 流程來看,當生產者的請求達到 Broker,Broker 獲取的 Processor 應為 SendMessageProcessor。封裝一個 Runable 對象,run 方法內調用 SendMessageProcessor.processRequest ,提交到線程池,繼續後面的處理。

  • SendMessageProcessor.processRequest 調用 sendMessage 方法,主要包含消息的校驗及重試邏輯處理,然後調用存儲模塊 DefaultMessageStore 存儲消息。
    消息校驗:校驗 Broker 是否配置可寫,校驗 Topic 名字是否為默認值,獲取或創建 topicConfig,判斷 queueId 是否超過限制。
    重試消息處理:消費者消費失敗後會將消息發回給 Broker,這裡我們暫且認為就是生產者發送的請求,先看下面的流程。

  • DefaultMessageStore.putMessage 只是做了很多的校驗,簡單看看即可。包括:如果當前 Broker 停止工作則拒絕消息寫入、Broker 為 SLAVE 角色則拒絕消息寫入、當前 RocketMQ 不支持寫入則拒絕消息寫入、主題長度超過 256 個字符則拒絕消息寫入、消息屬性長度超過 65536 個字符則拒絕消息寫入、PageCache 忙則報錯。然後調用 CommitLog.putMessage 存入消息。

  • 看到這裡應該稍微熟悉一些了,終於到我們期待已久的 CommitLog 出場了。主要是延遲消息處理,然後獲取可以寫入的 CommitLog 進行寫入。
    延遲消息處理:如果消息的延遲級別大於 0,將消息的原主題名稱與原消息隊列 ID 存入消息屬性中,用延遲消息主題 SCHEDULE_TOPIC、消息隊列 ID 更新原先消息的主題與隊列,這是並發消息消費重試關鍵的一步。但不是這個本節的主要目標,後文會進一步分析。
    關鍵點在如何獲取可以寫入的 CommitLog。存儲結構小節裏面有提到每個 CommitLog 默認大小 1G,寫完一個文件,以偏移量命名創建下一個文件。每個 1G 大小 CommitLog 的在代碼層面對應的是 MappedFile,而多個 MappedFiled 組成 MappedFileQueue。邏輯上的 CommitLog 通過持有 MappedFileQueue 管理多個 MappedFile。所以,獲取可以寫入的 CommitLog 也就是獲取 MappedFileQueue 最後一個 MappedFile,為什麼是最後一個,因為前面的已經寫完了呀。來看看 RocketMQ 邏輯與物理存儲的對應關係應該能夠更直觀的理解。image.png

  • 獲取到最後一個 MappedFile 後,調用 MappedFile.appendMessage 將消息追加到該文件中。可是儘管是順序寫入,但是連小學生都知道寫磁盤還是很慢,難道想這樣支撐 RocketMQ 單機十萬吞吐量?too young too simple!從邏輯存儲結構和物理存儲結構的映射關係來看,MappedFile 持有物理 CommitLog 的 fileChannel (Java NIO 文件讀寫的通道),通過 fileChannel 可以訪問物理 CommitLog 文件,但是 RocketMQ 並沒有直接使用 fileChannel,而是映射到一個 MappedByteBuffer,我們的目的就是把消息寫入這個 ByteBuffer 中,進而寫入 MappedFile 對應的 CommitLog 文件。為什麼需要這樣做,還有哪些細節,會在」文件內存映射「小結中為大家解答。

  • 繼續看流程,得到 MappedFile 對應的 ByteBuffer,我們需要將消息序列化,寫入 ByteBuffer 中。

    1. 構建消息 id, createMessageId
    2. 獲取該消息在消息隊列的偏移量,CommitLog 中保存了當前所有消息隊列的當前待寫入偏移量。
    3. 判斷是否是事務消息:這裡主要處理 Prepared 類型和 Rollback 類型的消息,設置消息 queueOffset 為 0
    4. 計算消息總大小,calMsgLength。
    5. 判斷文件的剩餘空間,是否足夠寫入當條消息,如果不可以,則將文件末尾寫入剩餘空間大小+固定魔數;然後返回一個 END_OF_FILE 的結果
    6. 如果空間足夠,這將這條消息寫入之前得到的 MappedFile 的 ByteBuffer 中。
    7. 將各字段按照」消息在 CommitLog 各字段示意圖「存入 Bytebuffer,然後返回 PUT_OK 結果

總結

以上就是今天 RocketMQ 消息存儲的主要內容,消息只是寫入到 CommitLog 對應的 ByteBuffer中,下一期就是我們重要的零拷貝即將登場。我們簡單總結一下今天的內容:

  • 要理解消息的存儲流程需要先知道消息的存儲結構:在物理上消息挨個順序寫入 CommitLog,為了提升消息查詢效率需要構建消息的索引文件 ConsumeQueue/IndexFile;
  • Broker 啟動時進行參數解析,並初始化了 NettyRemotingServer,啟動存儲服務用於消息存儲及索引構建等;
  • Broker 收到消息存儲請求,經過層層校驗,獲取 CommitLog 對應的 MappedFile,將消息寫入MappedFile 對應的內存映射ByteBuffer;

以上就是今天全部的內容,如果覺得本期的內容對你有用的話記得點贊、關注、轉發、收藏,這將是對我最大的支持。

如果你需要 RocketMQ 相關的所有資料,可以評論區留言,或者關注公眾號:三此君。回復:mq,即可。

消息已經寫入 ByteBuffer,寫入 ByteBuffer 就可以了嗎?那收到消息直接丟棄豈不是更好。消息要落在磁盤上才不會丟失,所以下一期我們要分享的就是消息的刷盤及索引構建,PageCache 及零拷貝也將閃亮登場。感謝觀看,下期不見不散。
image.png

參考文獻

  • RocketMQ 官方文檔
  • RocketMQ 源碼
  • 丁威, 周繼鋒. RocketMQ技術內幕:RocketMQ架構設計與實現原理. 機械工業出版社, 2019-01.
  • 李偉. RocketMQ分佈式消息中間件:核心原理與最佳實踐. 電子工業出版社, 2020-08.
  • 楊開元. RocketMQ實戰與原理解析. 機械工業出版社, 2018-06.