【RocketMQ源碼分析】深入消息存儲(2)
- 2021 年 4 月 9 日
- 筆記
前文回顧
CommitLog篇 ——【RocketMQ源碼分析】深入消息存儲(1)
MappedFile篇 ——【RocketMQ源碼分析】深入消息存儲(3)
前文說完了一條消息如何被持久化到本地磁碟CommitLog,本篇就要談談如何從CommitLog來構建我們消息消費的核心隊列
結構ConsumeQueue
了。
之前已經說過,CommitLog文件是消息的大雜燴,所有消息具體都被放到了這個大文件中,而ConsumeQueue則是一個邏輯上的隊列,也是消息消費的核心,它存在Topic與Queue這兩個概念,也就是消費者在消費時需要關心的東西。
除了ConsumeQueue目錄下存在Topic與QueueId組成的兩級目錄,它實際存儲消息的文件也是與CommitLog相似,是一個文件名代表offset的文件。
在ConsumeQueue中,每一個單元結構如下圖:
CommitLog Offset : 8 Byte
Size : 4 Byte
Message Tag Hashcode : 8 Byte
第一個8Byte的CommitLog Offset代表該消息在CommitLog文件中的偏移位置。
第二個4Byte的Size代表消息的大小。
憑藉CommitLog Offset與Size就可以在CommitLog中定位一條消息。
而第三個欄位Message Tag Hashcode則是用來快速過濾消息。
可以發現,ConsumeQueue中每個消息佔據20Byte,也就是說如果MessageStoreConfig的mappedFilesizeConsumeQueue設置為100Byte,那麼每個ConsumeQueue文件只能存儲5條消息。
了解完ConsumeQueue,接下來就需要知道ConsumeQueue是在什麼時候構建的,在上一篇CommitLog存儲消息時,我們沒有看到有寫入ConsumeQueue,那是因為ConsumeQueue是由一個非同步執行緒去構建的。
非同步構建流程如下,DefaultMessageStore的內部類ReputMessageService繼承自ServiceThread,其run方法中每1毫秒調用一次doReput方法。
reputFromOffset是初始開始同步的偏移位置,默認為0。
在doReput方法中進行了ConsumeQueue的構建。
首先,是一個只有在異常情況下才會終止的for循環,doNext只有返回值錯誤的情況下才會設為false,可以理解為一個死循環。
如果當前reput的偏移量大於等於已確認的,就不需要構建。
如果符合條件,就從CommitLog中讀取對應位置的數據,如果返回null,就終止當前for死循環,不為null則去構建ConsumeQueue。
可以看看result返回了什麼。
如果result不為null,獲取起始偏移位置,並且設置為doReput的reputFromOffset偏移量。
之後進入checkMessageAndReturnSize方法,該方法就是從result變數的buffer中讀取消息的具體數據了,一堆buffer.get操作,此處不列出了,感興趣可以看CommitLog#checkMessageAndReturnSize方法。
方法最後包裝了一個DispatchRequest對象返回。
可以看看此處Debug拿到的DispatchRequest,主要有Topic名稱,在CommitLog中的偏移位置,以及消息大小,如果你還記得上一篇CommitLog在最後判斷CommitLog文件能不能存下這條消息時,可以看到當時就是一個121位元組加8位元組魔術大小的消息,在此處成功讀取到了。
讀取成功之後,就要構建ConsumeQueue了。
進入doDispatch方法,可以看到要構建的目標是有一個list,是CommitLogDispatcher的子類做了實現。
可以看到下圖,實現有ConsumeQueue和Index兩個,也就是說ConsumeQueue和Index文件的非同步構建都是在此處,兩個類都位於DefaultMessageStore中。
ConsumeQueue是邏輯消息隊列,Index則是消息索引文件,存儲了消息的存儲時間,哈希,偏移量等資訊,本篇主要看ConsumeQueue,所以走到CommitLogDispatcherBuildConsumeQueue的實現即可。
首先獲取了消息類型,進入putMessagePositionInfo方法。
putMessagePositionInfo中分為兩步,首先拿到ConsumeQueue對象,然後放入請求。
第一步的findConsumeQueue,可以根據topic和queueId來定位一個ConsumeQueue,正如我們之前看到了目錄結構,這個獲取的流程是先查找map,這個沒有就新建。
步驟如下圖,首先從table中拿到ConsumeQueue,如果沒有拿到,就新建一個key是指定topic的結構,添加進table。
consumeQueueTable結構如下,是一個ConcurrentMap<String, ConcurrentMap<Integer,ConsumeQueue>>
。
拿到consumeQueueTable內置的map之後,就可以根據QueueId拿到目標ConsumeQueue了。
如果有,直接返回,如果沒有,就說明需要初始化該ConsumeQueue了,初始化完成之後放入consumeQueueTable,並且返回該ConsumeQueue。
拿到ConsumeQueue就可以對請求進行構建了,進入putMessagePositionInfoWrapper方法。
首先獲取了當前Store狀態是否可寫,如果可寫,會在for循環內嘗試maxRetries次構建,該值是30次。
在for循環內,首先拿到了tag,然後在isExtWriteEnable處判斷配置是否啟用了寫入擴展資訊,如果開啟,會在一個ConsumeQueueExt的結構中寫入存儲時間、Tag、bitMap等資訊,此處先跳過。
此時調用了putMessagePositionInfo方法寫入,如果返回result成功的話,直接return,否則休眠一秒鐘,然後繼續for循環,根據maxRetries的值,可以有30次機會。
進入putMessagePositionInfo方法,可以看到向ConsumeQueue的byteBufferIndex中put了數據,該buffer是何時初始化的呢?
ConsumeQueue的構造函數中,就對byteBufferIndex進行了初始化
public static final int CQ_STORE_UNIT_SIZE = 20;
可以看到buffer的大小只有20位元組,符合我們之前說的每個消息在ConsumeQueue中的大小。
而putMessagePositionInfo中首先也對byteBufferIndex進行了flip操作,也就是讀寫模式的切換,此處切換到了寫模式,並且將目標數據放入buffer中。
之後就是熟悉的MappedFile操作,可以參考上一篇CommitLog中講述的MappedFile。
最後appendMessage將消息寫入文件channel中,等待刷盤時持久化。
以上就是ConsumeQueue的構建流程了。