RocketMQ源碼詳解 | Broker篇 · 其三:CommitLog、索引、消費隊列
概述
上一章中,已經介紹了 Broker 的文件系統的各個層次與部分細節,本章將繼續了解在邏輯存儲層的三個文件 CommitLog、IndexFile、ConsumerQueue 的一些細節。文章最後,還會對比下 RocketMQ 和 Kafka 的持久化結構與設計的合理性。
CommitLog
現在,先從 CommitLog 的幾個指針開始複習

在上一章 《RocketMQ源碼詳解 | Broker篇 · 其二:文件系統》 中,我們已經了解了 CommitLog 的緩存和刷盤的策略,現在來簡單梳理一下。
上文介紹道, CommitLog 在開啟 transientStorePool
時,會有一塊 writeBuffer,這塊 ByteBuffer
是分配的一塊堆外內存,也就是圖上的灰色部分。我們在上圖中看見的 wrote 指針,指向了當前已寫入 writeBuffer 但是沒有 commit 的位置。
這塊灰色部分只存在 Java 進程中,也就是說程序崩了則會丟失。且如果關閉 transientStorePool
選擇,該指針將不會存在。
然後當我們定期將 writeBuffer 刷入 FileChannel
後,就變成了圖中的紅色塊。其中的 commited 指針代表在這之前的消息都刷入了 page cache。
這部分的消息由於存放在 page cache 中,且 page cache 是操作系統內核中的一塊內存,所以程序崩了不會丟失,但在宕機後依舊會丟失。
不過 CommitLog 會根據具體的刷盤策略來異步或同步的進行刷盤,也就是說,在 flushed 指針之前的數據,已經完全的磁盤裡了。
且這塊數據除非介質被破壞,否則一般不會丟失。
而在等待所有的 CommitLogDispatcher
處理完成後,reputed 指針就會前進。而這個 dispatch 做的事,就是我們之前在消息提交時沒有發現的兩件事:構建 IndexFile 和 ConsumerQueue。
需要注意的是,圖中雖然畫為 reputed 指針在 flushed 指針後面。但實際上 reputed 指針最快可以和 wrote 指針同步,
CommitLogDispatcher 實現類有:
CommitLogDispatcherBuildConsumeQueue
CommitLogDispatcherBuildIndex
CommitLogDispatcherCalcBitMap
這些類會在後文進行介紹
由上章我們知道,CommitLog 的文件結構如下:

它的長度默認是固定為 1G,文件名為開頭的 offset,其中消息是不定長的,在尾部發現新的消息寫不下的時候,會新開文件。且在舊的文件寫入 當前文件的總長和魔數。
IndexFile
RocketMQ 通過建立 IndexFile 以提供一種能夠通過 時間範圍 或 Key 值 來查詢 Message 的方法。
IndexFile 的文件結構如下:

IndexFile 可以分為三部分
-
Header
頭部記錄了記錄消息的開始(最小)時間,結束(最大)時間,開始(最小)偏移量,結束(最大)偏移量,和槽的個數與節點個數
-
Slot Table
table 的槽記錄了指向當前槽中尾節點的指針
-
Index Linked List
記錄了所有節點的索引信息
由結構可以看出,IndexFile 是標準的 hash 索引,如果了解過 hash 索引的話,根據上文馬上就能猜到到 IndexFile 的運行機制了。
接下來進入源碼部分
CommitLogDispatcher
ReputMessageService
是在 DefaultMessageStore
類下啟動的一個服務,也就是存儲組件層。這個服務會每隔一秒就執行一次 deReput
private void doReput() {
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
// 重放隊列落後太多導致 未重放的 commitLog 過期
log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
/*
* 當系統重啟的時候,會根據 duplicationEnable 來決定是否從頭開始處理
* 消息還是只處理新來的消息。在其打開的情況下,還需要設置 CommitLog.confirmOffset
* 才能從頭開始處理消息,因為默認情況下系統啟動以後 CommitLog.confirmOffset
* 和ReputMessageService.reputFromOffset是相等的
*/
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
// 切片以當前 reputed 指針為起點的 ByteBuffer,長度為到當前 wrote 指針
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// 構建分發用的請求
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
// 進行分發,將請求分發到所有的處理器上
DefaultMessageStore.this.doDispatch(dispatchRequest);
// 對長輪詢的處理
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
&& DefaultMessageStore.this.messageArrivingListener != null) {
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
// 完成,可以將重放指針的偏移量向前推進
this.reputFromOffset += size;
readSize += size;
// 更新度量信息
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).addAndGet(dispatchRequest.getMsgSize());
}
} else if (size == 0) {
// 當前文件已讀完,跳到下一個文件
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {
/* pass */
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}
構建了分發數據後,就交給了每一個 CommitLogDispatcher
處理,而在 Index 中,則是調用了 IndexService
的 buildIndex
方法。
我們主要關心的是 hash 索引的各種操作,所以接下來先看 put 方法
IndexFile#putKey
首先得到 Slot Table 中,Key 所在的槽號
int keyHash = indexKeyHashMethod(key);
// 得到哈希槽號
int slotPos = keyHash % this.hashSlotNum;
// 得到槽的物理偏移量
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
然後獲取目標槽的頭節點的地址,和當前時間與在 Header 中的 beginTime 的差值
// 獲取桶的頭節點
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
在 Index Linked List 中的空桶上添加節點
// 添加索引節點
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
可以看到這裡在時間戳上存放的是與開始時間的偏移值,是一個很好的節省空間的方法。然後將原來的頭節點的地址存儲
最後更新元信息
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
if (invalidIndex == slotValue) {
this.indexHeader.incHashSlotCount();
}
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
知道了 put 的原理以後,那 get 也不在話下了
IndexFile#selectPhyOffset
首先還是獲取 Slot Table 中 Key 的頭節點的位置
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
然後在 Index Linked List 中遍歷這條鏈表,直到尾節點
if (phyOffsets.size() >= maxNum) {
break;
}
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
不過和一般的 Hash 表不同的是,由於時間是有序的,所以在發現遍歷到目標節點開始時間的前面的時候,就不會繼續遍歷了。
且由於相同 Key 是不會覆蓋的,所以會把所有和 key 的 hash 相同的 CommitLog 偏移量返回。
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}
if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
nextIndexToRead = prevIndexRead;
至於如何在多個 IndexFile 中查找的方法也很簡單,只需要在 Header 中根據時間戳來判斷是否需要訪問即可。
ConsumerQueue

ConsumerQueue 的文件結構較為簡單,其由 30W 個的上圖中的結構體組成。
通過 CommitLogDispatcherBuildConsumeQueue
分發的消息會在找到對應 Queue 後直接在 MappedFile 文件追加寫入這個結構。
而在查找過程也比較簡單,這裡的查找分為按消息邏輯偏移量查找和按時間戳查找。
對於按消息邏輯偏移量查找,可以通過計算下標來進行隨機讀取。
而按時間戳查找則是一個比較有趣的部分,它使用了二分法來加速查找:
public long getOffsetInQueueByTime(final long timestamp) {
// 通過時間戳獲取剛好在這之前的 MappedFile
MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
if (mappedFile != null) {
long offset = 0;
// 低位為 消息隊列最小偏移量 與 該文件最小偏移量 中的最小值
int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
int high = 0;
int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
long leftIndexValue = -1L, rightIndexValue = -1L;
long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
if (null != sbr) {
ByteBuffer byteBuffer = sbr.getByteBuffer();
high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
try {
while (high >= low) {
// ? 奇怪的寫法,先除以 CQ_STORE_UNIT_SIZE 再乘以 CQ_STORE_UNIT_SIZE
midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
byteBuffer.position(midOffset);
// 獲取找到桶在 CommitLog 中的偏移量
long phyOffset = byteBuffer.getLong();
// 獲取該消息大小
int size = byteBuffer.getInt();
if (phyOffset < minPhysicOffset) {
low = midOffset + CQ_STORE_UNIT_SIZE;
leftOffset = midOffset;
continue;
}
long storeTime =
this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
// 根據持久化時間進行二分查找
if (storeTime < 0) {
return 0;
} else if (storeTime == timestamp) {
targetOffset = midOffset;
break;
} else if (storeTime > timestamp) {
high = midOffset - CQ_STORE_UNIT_SIZE;
rightOffset = midOffset;
rightIndexValue = storeTime;
} else {
low = midOffset + CQ_STORE_UNIT_SIZE;
leftOffset = midOffset;
leftIndexValue = storeTime;
}
}
if (targetOffset != -1) {
offset = targetOffset;
} else {
if (leftIndexValue == -1) {
offset = rightOffset;
} else if (rightIndexValue == -1) {
offset = leftOffset;
} else {
offset =
Math.abs(timestamp - leftIndexValue) >
Math.abs(timestamp - rightIndexValue) ? rightOffset : leftOffset;
}
}
return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
} finally {
sbr.release();
}
}
}
return 0;
}
但是我們發現,消息需要消費的時候,只靠 ComsumerQueue 是不夠的,因為在這個結構中並沒有記錄每一個消費者組的消費進度。
這是因為 Broker 端是將消費進度維護在內存的一個 Map 中,同時會定時的將該 Map 轉為 json 格式持久化到磁盤。
Kafka 與 RocketMQ 的對比
最後來對比下 Kafka 和 RocketMQ 的持久化方式。
在 Kafka 中,文件類型主要有:
-
log 文件
消息的存儲文件
-
index 文件
位置索引。通過邏輯偏移量尋找到在 log 文件中的物理偏移量
-
timeindex 文件
時間戳索引。可以通過時間戳尋找到在 log 文件中的物理偏移量
從索引上,Kafka 和 RocketMQ 都可以根據位置和時間戳來尋找消息。
但是在存儲方法上,Kafka 是直接將每一個 Topic 的分區在物理上通過不同的文件來進行管理,而 RocketMQ 則選擇了邏輯的將 Topic 和 Queue 進行劃分,寫入的位置則是一個單獨的文件。
直覺上看,Kafka 的方案由於在物理上進行了劃分,而 RocketMQ 還需要維護 Consumer 文件,而兩者都是順序寫入,那毫無疑問前者更能減少額外維護的工作。
但實際上,RocketMQ 的底層設計方式是優於 Kafka 的。以下為兩者在多個 Topic 的情況下的 TPS 的測量
產品 | Topic數量 | 發送端並發數 | 發送端RT | 發送端TPS | 消費端TPS |
---|---|---|---|---|---|
RocketMQ | 64 | 800 | 8 | 9w | 8.6w |
128 | 800 | 9 | 7.8w | 7.7w | |
256 | 800 | 10 | 7.5w | 7.5w | |
Kafka | 64 | 800 | 5 | 13.6w | 13.6w |
128 | 256 | 23 | 8500 | 8500 | |
256 | 256 | 133 | 2215 | 2352 |
可以看出,在 Topic 較少的情況下,Kafka 是可以擊敗 RocketMQ 的,但一旦 Topic 增加,Kafka 的 TPS 將會斷崖式的下降。
原因在於, Kafka 的內存里的順序寫在多 Topic 多 Queue 下被轉化為實際上的隨機寫。
我們都知道,RocketMQ 和 Kafka 都使用了 Page Cache 來加速文件的訪問,同時如果在生產後立刻消費的話,消息都是在 Page Cache 中就被發送到網卡緩衝區中(這被稱為零拷貝)。但是,內存是有限的, Page Cache 的大小也是有限的,但內存中的頁過多,便會觸發”換出”。
Topic 和分區多的情況下,打開的文件句柄也變多,被 mmap 映射到內存中的文件也會變多,因此在寫入時,多個文件的頁輪流被”換入”和”換出”,當然就比不過直接順序寫入到內存中的速度了。
而對於讀取,兩者都需要將所需要的頁換入到內存中,故都是隨機讀,區別不大。