­

Kafka與RocketMq文件存儲機制對比

一個商業化消息隊列的性能好壞,其文件存儲機制設計是衡量一個消息隊列服務技術水平和最關鍵指標之一。

開頭問題

kafka文件結構和rocketMQ文件結構是什麼樣子?特點是什麼?

一、目錄結構

Kafka

Kafka以partition為單元分片存儲消息

Kafka部分名詞解釋如下:

  • Broker:消息中間件處理結點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka集群。
  • Topic:一類消息
  • Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。
  • Segment:partition物理上由多個segment組成
  • offset:每個partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到partition中。partition中的每個消息都有一個連續的序列號叫做offset,用於partition唯一標識一條消息.

partition(分片目錄)

為方便理解以單broker為例,假設建立一個broker建立的topic是kafka-topic-01,partition數量是3, 會形成以下目錄

#1、分區目錄文件
drwxr-x--- 2 root root 4096 Jul 26 19:35 kafka-topic-01-0
drwxr-x--- 2 root root 4096 Jul 24 20:15 kafka-topic-01-1
drwxr-x--- 2 root root 4096 Jul 24 20:15 kafka-topic-01-2
  • 名稱topic+有序序號
  • 一個partition(目錄)中的數據被切分為多個大小相等的segment(段)數據文件中
  • partition中消息只能順序寫讀

segment(分段消息)

分為三個文件

  • 索引文件.index
  • 日誌文件.log
  • 時間戳索引文件.timeindex
#2、分區目錄中的日誌數據文件和日誌索引文件
-rw-r----- 1 root root 512K Jul 24 19:51 00000000000000000000.index
-rw-r----- 1 root root 1.0G Jul 24 19:51 00000000000000000000.log
-rw-r----- 1 root root 768K Jul 24 19:51 00000000000000000000.timeindex
-rw-r----- 1 root root 512K Jul 24 20:03 00000000000022372103.index
-rw-r----- 1 root root 1.0G Jul 24 20:03 00000000000022372103.log
-rw-r----- 1 root root 768K Jul 24 20:03 00000000000022372103.timeindex
-rw-r----- 1 root root 512K Jul 24 20:15 00000000000044744987.index
-rw-r----- 1 root root 1.0G Jul 24 20:15 00000000000044744987.log
-rw-r----- 1 root root 767K Jul 24 20:15 00000000000044744987.timeindex
-rw-r----- 1 root root  10M Jul 24 20:21 00000000000067117761.index
-rw-r----- 1 root root 511M Jul 24 20:21 00000000000067117761.log
-rw-r----- 1 root root  10M Jul 24 20:21 00000000000067117761.timeindex
  • segment文件以偏移量命名,數值最大64位long類型

segment內部-index文件

  • 索引文件採用稀疏索引(即有的消息不能找到對應的索引),目的是節省存儲空間
  • 定長,佔8個位元組

消息單元的存儲結構

欄位名

說明

relativeOffset(4)

相對偏移量,相對baseOffset來說

position(4)

物理地址,日誌文件中的物理地址

如何查找消息

如offset的值是368772

1.根據offset找到所在的segment,根據二分查找,找到消息所在的log文件0000000000000368769.log和索引文件0000000000000368769.index

2.計算下差368772-368769=3,在索引文件中也是二分查找,定位到是<3,497>記錄,即對應的物理位置是497,從而找到消息

3.根據物理位置497在0000000000000368769.log文件找到消息。

segment內部-timeIndex文件

根據指定的時間戳查找偏移量資訊

  • 文件名:以時間戳命名
  • 定長,12個位元組
  • 時間戳只能遞增,追加的時間戳小於之前的時間戳,不予添加

欄位名

說明

timestamp(8)

當前日誌分段最大時間戳

relativeOffset(4)

時間戳對應的相對偏移量

segment內部-log文件

RocketMQ

rocketMQ把所有topic中的消息都commitLog中

存儲的文件主要分為:

  • commitlog: 存儲消息實體
  • consumequeue: 按Topic和隊列存儲消息的offset
  • index: index按key、tag、時間等存儲

commitlog(物理隊列)

文件地址:${user.home} \store\${commitlog}${fileName}

  • 存放該broke所有topic的消息
  • 默認1G大小
  • 以偏移量為文件名,當一個文件寫滿時則創建新文件,這樣的設計主要是方便根據消息的物理偏移量,快速定位到消息所在的物理文件
  • 一個消息存儲單元是不定長的
  • 順序寫但是隨機讀

consumeQueue(消費隊列)

文件地址:${storeRoot}\consumequeue\${topicName}\${queueId}\${fileName}

  • 文件名:跟commitlog一樣以偏移量作為文件名
  • 按topic和queueId緯度分別存儲消息commitLogOffset、size、tagHashCode
  • 一個存儲單元是20個位元組的定長的
  • 順序讀順序寫

消息單元的存儲結構

欄位名

說明

offset(8)

commitlog的偏移量

size(4)

commitlog消息大小

tagHashCode

tag的哈希值

indexFile(索引文件)

文件地址:${user.home}\store\index\${fileName}

  • 以時間作為文件名
  • 一個存儲單元是20個位元組定長的
  • 一個indexFile最多存儲2000w條消息

索引文件(Index)提供消息檢索的能力,主要在問題排查和數據統計等場景應用

如何查找消息

  1. 消費者順序讀取consumerQueue,獲取到物理offset,根據物理offset去commitlog文件中隨機讀取消息實體

二、如何保存消息消費進度

Kafka

方式一:zookeeper存儲

0.9之前老版本

消費者如果是根據javaapi來消費,也就是【kafka.javaapi.consumer.ConsumerConnector,通過配置參數【zookeeper.connect】來消費。這種情況下,消費者的offset會更新到zookeeper的【consumers/{group}/offsets/{topic}/{partition}】目錄下,例如:

[zk: localhost(CONNECTED) 0] get /kafka/consumers/zoo-consumer-group/offsets/my-topic/0
5662
cZxid = 0x20006d28a
ctime = Wed Apr 12 18:20:51 CST 2017
mZxid = 0x30132b0ed
mtime = Tue Aug 22 18:53:22 CST 2017
pZxid = 0x20006d28a
cversion = 0
dataVersion = 5758
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0

保存方式:

consumer在從broker讀取消息後,可以選擇commit,該操作會在Zookeeper中存下該consumer在該partition下讀取的消息的offset,該consumer下一次再讀該partition時會從下一條開始讀取。如未commit,下一次讀取的開始位置會跟上一次commit之後的開始位置相同

方式二:broker存儲

broker 存放 offset 是 kafka 從 0.9 版本開始

存儲位置:

consumer 默認將 offset 持久化保存在 Kafka 一個內置的 topic 中,該 topic 為__consumer_offsets。

提交offset分為:自動提交和手動提交

保存方式:

消費者正常運行,除了持久化一份消費offset到broker中,還會在記憶體中保存一份消費進度offset,所以當消費者都正常運行時__consumer_offsets使用的比較少。當消費者崩潰或者balance時,會從broker中拉取最後一次消費offset。

RocketMQ

方式一:集群模式

集群模式:topic中的一條消息只會同一個消費者組中的一個消費者消費,不會被多個消費者消費

對offset的管理分為本地模式和遠程模式。本地模式是以文本文件的形式存儲在客戶端,而遠程模式是將數據保存到broker端,對應的數據結構分別為LocalFileOffsetStore和RemoteBrokerOffsetStore。

集群模式使用的是遠程模式。

存儲位置:

ocketMQ的broker端中,offset的是以json的形式持久化到磁碟文件中,文件路徑為${user.home}/store/config/consumerOffset.json

{
    "offsetTable": {
        "topic-name@consumer-group": {
            "0": 88526, 
            "1": 88528
        }
    }
}

保存方式:

定時持久化到broker磁碟ConsumerOffset.json

consumer從broker拉取消息後,Broker更新消費進度,僅僅是更新了記憶體中的offsetTable表,並沒有涉及到ConsumerOffset.json這個文件。broker啟動時會啟動一個定時任務(默認5秒),來定時把消費offset持久化到磁碟consumerOffset.json,保存的過程是先將原來的文件存到ConsumerOffset.json.bak文件中,然後將新的內容存入ConsumerOffset.json文件

方式二:廣播模式

廣播模式:一條消息會被每個消費者消費

當消費模式為廣播模式時,offset使用本地模式存儲,因為每條消息會被所有的消費者消費,每個消費者管理自己的消費進度,各個消費者之間不存在消費進度的交集。

三、特點

Kafka

為什麼要設計成partition中多segment

  • 一個就是上面提到的如果使用單個 Partition 來管理數據,順序往 Partition 中累加寫勢必會造成單個 Partition 文件過大,讀消息是順序讀的(調用FileMessageSet的searchFor方法),文件過大,查詢效率下降
  • 另一個原因是 Kafka 消息記錄不是一直堆堆堆,默認是有日誌清除策略的。要麼是日誌超過設定的保存時間觸發清理邏輯,要麼就是 Topic 日誌文件超過閾值觸發清除邏輯,如果是一個大文件刪除是要鎖文件的這時候寫操作就不能進行。因此設置分段存儲對於清除策略來說也會變得更加簡單,只需刪除較早的日誌塊即可

清理數據功能-日誌清理

  1. 基於時間

日誌刪除任務會定時(默認5分鐘執行一次)檢查是否有保留時間超過設定閾值(默認保存7天)可刪除的segment文件。

  1. 基於日誌大小

日誌刪除任務會檢查當前日誌的大小是否超過設定的閾值retentionSize來尋找可刪除的日誌分段的文件集合deletableSegments,參考下圖所示

基於日誌大小的保留策略與基於時間的保留策略類似,其首先計算日誌文件的總大小size和retentionSize的差值diff,即計算需要刪除的日誌總大小,然後從日誌文件中的第一個日誌分段開始進行查找可刪除的日誌分段的文件集合deletableSegments。查找出deletableSegments之後就執行刪除操作

基於日誌起始偏移量

該刪除策略具體是刪除某日誌分段的下一個日誌分段的baseOffset小於等於logStartOffset的部分。

壓縮數據

Producer 端壓縮、Broker 端保持、Consumer 端解壓縮

在Kafka中,壓縮可能發生在兩個地方:生產者端和Broker端。broker端保存的也是壓縮的消息,傳輸到consumer端再進行解壓縮

在吞吐量方面:LZ4 > Snappy > zstd / GZIP

RocketMQ

RocketMQ的CommitLog文件採用混合型存儲

即所有的Topic下的消息隊列共用同一個CommitLog的日誌數據文件。感覺這樣會增加隨機讀的概率,可以學著kakfa按topic隔離。

預載入MappedFile文件

消息寫入時,每次都回去去mappedFileQueue中去拿mappedfile。而這個mappedfile是由後台運行的AllocateMappedFileService服務執行緒去創建和預分配的。這樣下次獲取時候直接返回就可以不用等待MappedFile創建分配所產生的時間延遲

文件預熱

我們拿到mmapedfile文件,可能pagecache中還是出現頁數據不存在的情況,所以rocketmq增加了預熱

有一個warmMappedFile方法,它會把當前映射的文件,每一頁遍歷多去,寫入一個 0 位元組,然後再調用mlockmadvise(MADV_WILLNEED)

mlock:可以將進程使用的部分或者全部的地址空間鎖定在物理記憶體中,防止其被交換到 swap 空間。

madvise:給作業系統建議,說這文件在不久的將來要訪問的,因此,提前讀幾頁可能是個好主意

四、讀寫方式

通過哪些I/O機制來訪問index和segment文件呢?可以分為寫和讀兩塊:

Kafka

寫(生產)消息:

  • index文件較小,可以直接用mmap進行記憶體映射
  • segment文件較大,可以採用普通的write(FileChannel.write),由於是順序寫PageCache,可以達到很高的性能

讀(消費)消息:

  • index文件仍然通過mmap讀,缺頁中斷的可能性較小
  • segment可以使用sendfile進行零拷貝的發送給消費者,達到非常高的性能

RocketMQ

寫(生產)消息:

  • CommitLog、ConsumerQueue都使用MMAP進行寫

讀(消費)消息:

  • commitLog和consumerQueue文件都是MMAP讀

五、存儲關鍵技術—Mmap、PageCache、sendfile

Mmap

普通讀文件過程

大體流程如下:

  1. 進程使用系統調用向內核發起文件讀取請求,此時會有用戶態轉為內核態的過程。
  2. 內核訪問文件系統。
  1. 如果有 cache 直接返回數據,沒有開始讀取磁碟
  2. 讀取成功將 page1 讀取到 cache 中完成第一次 copy
  1. 通知內核讀取完畢(不同IO模型實現不同)
  2. 將數據從位於內核空間的 cache 拷貝到進程空間,完成第二次拷貝。

這裡簡單說一下為啥要拷貝到進程中:進程之間是相互隔離的,而且在常規操作下進程無法訪問內核數據,所以得將 cache 拷貝到進程當中,給進程使用。

  • 耗時主要集中在內核切換、copy時長

Mmap映射

沒有數據拷貝,映射的是數據地址

mmap 把文件映射到用戶空間里的虛擬記憶體,省去了從內核緩衝區複製到用戶空間的過程,文件中的位置在虛擬記憶體中有了對應的地址,可以像操作記憶體一樣操作這個文件,相當於已經把整個文件放入記憶體。mmap 在完成了 read、write 相同效果的同時不僅省去了內核到進程的記憶體拷貝過程,而且還可以實現數據的共享操作:一個文件可以同時被多個進程、內核映射,如果映射的文件被內核或其他進程修改,那麼最終的結果也會反映到映射當中。

  • 映射的虛擬記憶體可以被多個進程讀寫
  • 少了每次內核態pageCache到進程私有記憶體的拷貝
  • 可以簡單看成直接操作pageCache

Mmap的限制

  • 映射文件的大小有限制,一般最大1.5G~2G
  • MMAP 使用的是虛擬記憶體,和 PageCache 一樣是由作業系統來控制刷盤的,雖然可以通過 force() 來手動控制,小記憶體場景不適合。

OS的PageCache機制

PageCache是OS對文件的快取,用於加速對文件的讀寫。一般來說,程式對文件進行順序讀寫的速度幾乎接近於記憶體的讀寫訪問,這裡的主要原因就是在於OS使用PageCache機制對讀寫訪問操作進行了性能優化,將一部分的記憶體用作PageCache

一、文件讀取

如果一次讀取文件時出現未命中(cache miss)PageCache的情況,OS從物理磁碟上訪問讀取文件的同時,會順序對其他相鄰塊的數據文件進行預讀取(ps:順序讀入緊隨其後的少數幾個頁面)。這樣,只要下次訪問的文件已經被載入至PageCache時,讀取操作的速度基本等於訪問記憶體

二、文件寫入

OS會先寫入至Cache內,隨後通過非同步的方式由pdflush內核執行緒將Cache內的數據刷盤至物理磁碟上對於文件的順序讀寫操作來說,讀和寫的區域都在OS的PageCache內,此時讀寫性能接近於記憶體。不是順序寫,當pageCache中發現漏頁,還是會去吧磁碟中數據拉到pageCache再寫

sendfile

FileChannel#tranferTo transferFrom實現零拷貝

kafka消費的時候使用了零拷貝的sendfile。pagecache數據不經過內核切換直接拷貝到socket buffer傳統的數據發送需要發送4次上下文切換,採用sendfile系統調用之後,數據直接在內核態交換,系統上下文切換減少 為2次。根據測試結果,可以提高60%的數據發送性能。

六、參考

//t1mek1ller.github.io/2019/11/13/kafka-rocketmq-storage/

//tech.meituan.com/2015/01/13/kafka-fs-design-theory.html

swap 空間: swap space是磁碟上的一塊區域,可以是一個分區,也可以是一個文件,或者是他們的組合。當RAM滿了後,並且需要更多記憶體空間時,使用磁碟空間代替RAM空間

Tags: