🏆【Alibaba中間件技術系列】「RocketMQ技術專題」系統服務底層原理以及高性能存儲設計分析

設計背景

消息中間件的本身定義來考慮,應該盡量減少對於外部第三方中間件的依賴。一般來說依賴的外部系統越多,也會使得本身的設計越複雜,採用文件系統作為消息存儲的方式。

RocketMQ存儲機制

消息中間件的存儲一般都是利用磁碟,一般是使用機械硬碟,但機械硬碟的速度比訪問記憶體慢了n個數量級,一款優秀的消息中間件必然會將硬體資源壓榨到極致,接下來看看rocketMq是如何做到高效存儲的。

RocketMQ存儲模型

CommitLog

消息主體以及元數據的存儲媒介,存儲Producer端寫入的消息主體內容。單個文件大小默認1G ,文件名長度為20位,左邊補零,剩餘為起始偏移量,比如,00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當第一個文件寫滿了,第二個文件為00000000001073741824,起始偏移量為1073741824,以此類推。消息主要是順序寫入日誌文件,當文件滿了,寫入下一個文件;

ConsumeQueue

  • 消息消費的邏輯隊列,其中包含了這個MessageQueue在CommitLog中的起始物理位置偏移量offset,消息實體內容的大小和Message Tag的哈希值。

  • 實際物理存儲來說,ConsumeQueue對應每個Topic和QueueId下面的文件,單個文件大小約5.72M,每個文件由30W條數據組成,每個文件默認大小為600萬個位元組,當一個ConsumeQueue類型的文件寫滿了,則寫入下一個文件;

IndexFile

生成的索引文件提供訪問服務,通過消息Key值查詢消息真正的實體內容。在實際的物理存儲上,文件名則是以創建時的時間戳命名的,固定的單個IndexFile文件大小約為400M,一個IndexFile可以保存2000W個索引;

MapedFileQueue

對連續物理存儲的抽象封裝類,可以通過消息存儲的物理偏移量位置快速定位該offset所在MappedFile(具體物理存儲位置的抽象)、創建、刪除MappedFile等操作;

MappedFile

文件存儲的直接記憶體映射業務抽象封裝類,通過操作該類,可以把消息位元組寫入PageCache快取區(commit),或者原子性地將消息持久化的刷盤(flush);

RocketMQ消息架構

集群有一個Broker,Topic為binlog的隊列(Consume Queue)數量為4,如下圖所示,按順序發送消息。Consumer端執行緒採用的是負載訂閱的方式進行消費。

Commit Log和Consume Queue

消息發送流程架構

發送到相關服務節點

生產到消費的轉換

總體核心流程

RocketMQ的消息整體是有序的,所以消息按順序將內容持久化在Commit Log中。Consume Queue則是用於將消息均衡地按序排列在不同的邏輯隊列,集群模式下多個消費者就可以並行消費Consume Queue的消息。

  1. MappedFile 所有的topic數據都寫到同一個文件中,文件的大小默認為1G,使用mmap與磁碟文件做映射,初始化時使用mlock將記憶體鎖定,防止pagecache被os交換到swap區域。數據是順序寫,數據寫滿後自動創建下個MappedFile順序寫入。
  2. MappedFileQueue MappedFile的隊列,存儲封裝了所有的MappedFile實例。
  3. CommitLog 封裝了寫入消息和讀取消息的實現,根據MappedFileQueue找到正在寫的MappedFile,之後將消息寫入到pagecache,最後同步到硬碟上。
  4. ConsumerQueue 一個topic可以設置多個queue,每個consumerQueue對應一個topic下的queue,相當於kafka里的partition概念。裡面存儲了msg在commitLog中的offset、size、tagsCode,固定長度是20位元組,consumer可以根據消息的offset在commitLog找到具體的消息。

詳細分析MQ發送和消費流程

消息生產和消費通過CommitLog

生產者發送消息最終寫入的是CommitLog(消息存儲的日誌數據文件),Consumer端先從ConsumeQueue(消息邏輯隊列)讀取持久化消息的起始物理位置偏移量offset、大小size和消息Tag的HashCode值,隨後再從CommitLog中進行讀取待拉取消費消息的真正實體內容部分;

IndexFile(索引文件)

為了消息查詢提供了一種通過key或時間區間來查詢消息的方法, 通過IndexFile來查找消息的方法不影響發送與消費消息的主流程。

RocketMQ的CommitLog文件採用混合型存儲

所有Topic下的消息隊列共用同一個CommitLog的日誌數據文件,並通過建立類似索引文件—ConsumeQueue的方式來區分不同Topic下面的不同MessageQueue的消息,同時為消費消息起到一定的緩衝作用。

  • 只有ReputMessageService非同步服務執行緒通過doDispatch非同步生成了ConsumeQueue隊列的元素後,Consumer端才能進行消費。

  • 只要消息寫入並刷盤至CommitLog文件後,消息就不會丟失,即使ConsumeQueue中的數據丟失,也可以通過CommitLog來恢復。

RocketMQ順序讀寫

  • 發送消息時,生產者端的消息確實是順序寫入CommitLog;

  • 消費消息時,消費者端也是順序讀取ConsumeQueue,根據其中的起始物理位置偏移量offset讀取消息是隨機讀取CommitLog。

在RocketMQ集群整體的吞吐量、並發量非常高的情況下,隨機讀取文件帶來的性能開銷影響還是比較大的

RocketMQ存儲架構的優缺點:

優點:
  1. ConsumeQueue消息邏輯隊列較為輕量級;
  2. 磁碟的訪問串列化,避免磁碟竟爭,不會因為隊列增加導致IOWAIT增高;
缺點:
  1. CommitLog來說寫入消息雖然是順序寫,但是讀卻變成了完全的隨機讀;
  2. Consumer端訂閱消費一條消息,需要先讀ConsumeQueue,再讀Commit Log,一定程度上增加了開銷;

RocketMQ存儲模型

RocketMQ文件存儲模型,根據類別和作用從概念模型上大致可以劃分為5層

  1. RocketMQ業務處理器層: Broker端對消息進行讀取和寫入的業務邏輯入口,這一層主要包含了業務邏輯相關處理操作(根據解析RemotingCommand中的RequestCode來區分具體的業務操作類型,進而執行不同的業務處理流程),比如前置的檢查和校驗步驟、構造MessageExtBrokerInner對象、decode反序列化、構造Response返回對象等;

  2. RocketMQ數據存儲組件層:該層主要是RocketMQ的存儲核心類—DefaultMessageStore,其為RocketMQ消息數據文件的訪問入口,通過該類的「putMessage()」和「getMessage()」方法完成對CommitLog消息存儲的日誌數據文件進行讀寫操作(具體的讀寫訪問操作還是依賴下一層中CommitLog對象模型提供的方法);在該組件初始化時候,還會啟動很多存儲相關的後台服務執行緒:

    • AllocateMappedFileService(MappedFile預分配服務執行緒)
    • ReputMessageService(回放存儲消息服務執行緒)
    • HAService(Broker主從同步高可用服務執行緒)
    • StoreStatsService(消息存儲統計服務執行緒)
    • IndexService(索引文件服務執行緒)等;
  3. RocketMQ存儲邏輯對象層: 該層主要包含了RocketMQ數據文件存儲直接相關的三個模型類IndexFile、ConsumerQueue和CommitLog。

    • IndexFile為索引數據文件提供訪問服務
    • ConsumerQueue為邏輯消息隊列提供訪問服務
    • CommitLog則為消息存儲的日誌數據文件提供訪問服務。

這三個模型類也是構成了RocketMQ存儲層的整體結構;

  1. 封裝的文件記憶體映射層: RocketMQ主要採用JDK NIO中的MappedByteBuffer和FileChannel兩種方式完成數據文件的讀寫。

    • 採用MappedByteBuffer這種記憶體映射磁碟文件的方式完成對大文件的讀寫,在RocketMQ中將該類封裝成MappedFile類。
    • 對於每類大文件(IndexFile/ConsumerQueue/CommitLog),在存儲時分隔成多個固定大小的文件(單個IndexFile文件大小約為400M、單個ConsumerQueue文件大小約5.72M、單個CommitLog文件大小為1G),其中每個分隔文件的文件名為前面所有文件的位元組大小數+1,即為文件的起始偏移量,從而實現了整個大文件的串聯。

每一種類的單個文件均由MappedFile類提供讀寫操作服務(其中,MappedFile類提供了順序寫/隨機讀、記憶體數據刷盤、記憶體清理等和文件相關的服務);

  1. 磁碟存儲層: 主要指的是部署RocketMQ伺服器所用的磁碟。

RocketMQ存儲技術

主要採用mmap與PageCache,其中mmap記憶體映射技術—Java中的MappedByteBuffer。

先簡單介紹下mmap

mmap一種記憶體映射文件的方法,即將一個文件或者其它對象映射到進程的地址空間,實現文件磁碟地址和進程虛擬地址空間中一段虛擬地址的一一對映關係。實現這樣的映射關係後,進程就可以採用指針的方式讀寫操作這一段記憶體,而系統會自動回寫臟頁面到對應的文件磁碟上。內核空間對這段區域的修改也直接反映用戶空間,從而可以實現不同進程間的文件共享。

mmap記憶體映射和普通標準IO操作的本質區別在於它並不需要將文件中的數據先拷貝至OS的內核IO緩衝區,而是可以直接將用戶進程私有地址空間中的一塊區域與文件對象建立映射關係,這樣程式就好像可以直接從記憶體中完成對文件讀/寫操作一樣。

只有當缺頁中斷髮生時,直接將文件從磁碟拷貝至用戶態的進程空間內,只進行了一次數據拷貝。對於容量較大的文件來說(文件大小一般需要限制在1.5~2G以下,這也是CommitLog設置成1G的原因),採用Mmap的方式其讀/寫的效率和性能都非常高。

  • RocketMq默認的文件大小為1G,即將1G的文件映射到物理記憶體上。但mmap初始化時只是將文件磁碟地址和進程虛擬地址做了個映射,並沒有真正的將整個文件都映射到記憶體中,當程式真正訪問這片記憶體時產生缺頁異常,這時候才會將文件的內容拷貝到page cache。

如果一開始只是做個映射,而到具體寫消息時才將文件的部分頁載入到pagecache,那效率將會是多麼的低下。MappedFile初始化的操作是由單獨的執行緒(AllocateMappedFileService)實現的,就是對應的生產消費模型。RocketMq在初始化MappedFile時做了記憶體預熱,事先向page cache 中寫入一些數據flush到磁碟,使整個文件都載入到page cache中。

MappedByteBuffer技術分析

MappedByteBuffer繼承自ByteBuffer,其內部維護了一個邏輯地址變數—address。在建立映射關係時,

MappedByteBuffer利用了JDK NIO的FileChannel類提供的map()方法把文件對象映射到虛擬記憶體。

源碼中map()方法的實現,可以發現最終其通過調用native方法map0()完成文件對象的映射工作,同時使用Util.newMappedByteBuffer()方法初始化MappedByteBuffer實例,但最終返回的是DirectByteBuffer的實例。

在Java程式中使用MappedByteBuffer的get()方法來獲取記憶體數據是最終通過DirectByteBuffer.get()方法實現(底層通過unsafe.getByte()方法,以「地址 + 偏移量」的方式獲取指定映射至記憶體中的數據)。

使用Mmap的限制
  • mmap映射的記憶體空間釋放的問題

由於映射的記憶體空間本身就不屬於JVM的堆記憶體區(Java Heap),因此其不受JVM GC的控制,卸載這部分記憶體空間需要通過系統調用unmap()方法來實現。

然而unmap()方法是FileChannelImpl類里實現的私有方法,無法直接顯示調用。RocketMQ中的做法是,通過Java反射的方式調用「sun.misc」包下的Cleaner類的clean()方法來釋放映射佔用的記憶體空間;

  • MappedByteBuffer記憶體映射大小限制

因為其佔用的是虛擬記憶體(非JVM的堆記憶體),大小不受JVM的-Xmx參數限制,但其大小也受到OS虛擬記憶體大小的限制。一般來說,一次只能映射1.5~2G 的文件至用戶態的虛擬記憶體空間,RocketMQ默認設置單個CommitLog日誌數據文件為1G的原因了;

  • 使用MappedByteBuffe的其他問題

會存在記憶體佔用率較高和文件關閉不確定性的問題;

OS的PageCache機制

PageCache是OS對文件的快取,用於加速對文件的讀寫。程式對文件進行順序讀寫的速度幾乎接近於記憶體的讀寫訪問,這裡的主要原因就是在於OS使用PageCache機制對讀寫訪問操作進行了性能優化,將一部分的記憶體用作PageCache。

對於數據文件的讀取

如果一次讀取文件時出現未命中PageCache的情況,OS從物理磁碟上訪問讀取文件的同時,會順序對其他相鄰塊的數據文件進行預讀取。這樣,只要下次訪問的文件已經被載入至PageCache時,讀取操作的速度基本等於訪問記憶體。

對於數據文件的寫入

OS會先寫入至Cache內,隨後通過非同步的方式由pdflush內核執行緒將Cache內的數據刷盤至物理磁碟上。

對於文件的順序讀寫操作來說,讀和寫的區域都在OS的PageCache內,此時讀寫性能接近於記憶體。

  • RocketMQ的大致做法是,將數據文件映射到OS的虛擬記憶體中(通過JDK NIO的MappedByteBuffer),寫消息的時候首先寫入PageCache,並通過非同步刷盤的方式將消息批量的做持久化(同時也支援同步刷盤);

  • 訂閱消費消息時(對CommitLog操作是隨機讀取),由於PageCache的局部性熱點原理且整體情況下還是從舊到新的有序讀,因此大部分情況下消息還是可以直接從Page Cache中讀取,不會產生太多的缺頁(Page Fault)中斷而從磁碟讀取。

PageCache機制也不是完全無缺點的,當遇到OS進行臟頁回寫,記憶體回收,記憶體swap等情況時,就會引起較大的消息讀寫延遲。

對於這些情況,RocketMQ採用了多種優化技術,比如記憶體預分配,文件預熱,mlock系統調用等,來保證在最大可能地發揮PageCache機制優點的同時,儘可能地減少其缺點帶來的消息讀寫延遲。

RocketMQ存儲優化技術

RocketMQ存儲層採用的幾項優化技術方案在一定程度上可以減少PageCache的缺點帶來的影響,主要包括記憶體預分配,文件預熱和mlock系統調用。

預先分配MappedFile

在消息寫入過程中(調用CommitLog的putMessage()方法),CommitLog會先從MappedFileQueue隊列中獲取一個 MappedFile,如果沒有就新建一個。

MappedFile的創建過程是將構建好的一個AllocateRequest請求(具體做法是,將下一個文件的路徑、下下個文件的路徑、文件大小為參數封裝為AllocateRequest對象)添加至隊列中,後台運行的AllocateMappedFileService服務執行緒(在Broker啟動時,該執行緒就會創建並運行),會不停地run,只要請求隊列里存在請求,就會去執行MappedFile映射文件的創建和預分配工作。

分配的時候有兩種策略,

一種是使用Mmap的方式來構建MappedFile實例,另外一種是從TransientStorePool堆外記憶體池中獲取相應的DirectByteBuffer來構建MappedFile。並且,在創建分配完下個MappedFile後,還會將下下個MappedFile預先創建並保存至請求隊列中等待下次獲取時直接返回。RocketMQ中預分配MappedFile的設計非常巧妙,下次獲取時候直接返回就可以不用等待MappedFile創建分配所產生的時間延遲。

文件預熱

預熱的目的主要有兩點;

  • 第一點,由於僅分配記憶體並進行mlock系統調用後並不會為程式完全鎖定這些記憶體,因為其中的分頁可能是寫時複製的。因此,就有必要對每個記憶體頁面中寫入一個假的值。其中,RocketMQ是在創建並分配MappedFile的過程中,預先寫入一些隨機值至Mmap映射出的記憶體空間里。

  • 第二,調用Mmap進行記憶體映射後,OS只是建立虛擬記憶體地址至物理地址的映射表,而實際並沒有載入任何文件至記憶體中。程式要訪問數據時OS會檢查該部分的分頁是否已經在記憶體中,如果不在,則發出一次缺頁中斷。這裡,可以想像下1G的CommitLog需要發生多少次缺頁中斷,才能使得對應的數據才能完全載入至物理記憶體中。

RocketMQ的做法是,在做Mmap記憶體映射的同時進行madvise系統調用,目的是使OS做一次記憶體映射後對應的文件數據儘可能多的預載入至記憶體中,從而達到記憶體預熱的效果。

public void warmMappedFile(FlushDiskType type, int pages) {
        long beginTime = System.currentTimeMillis();
        // mappedByteBuffer在java裡面對應了mmap的實現
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        int flush = 0;
        long time = System.currentTimeMillis();
        for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
            byteBuffer.put(i, (byte) 0);
            // force flush when flush disk type is sync
            if (type == FlushDiskType.SYNC_FLUSH) {
                // 同步刷盤機制,OS_PAGE_SIZE為4K
                if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                    flush = i;
                    mappedByteBuffer.force();
                }
            }

            // prevent gc
            if (j % 1000 == 0) {
                log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
                time = System.currentTimeMillis();
                try {
                    Thread.sleep(0);
                } catch (InterruptedException e) {
                    log.error("Interrupted", e);
                }
            }
        }

        // force flush when prepare load finished
        if (type == FlushDiskType.SYNC_FLUSH) {
            log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
                this.getFileName(), System.currentTimeMillis() - beginTime);
            mappedByteBuffer.force();
        }
        log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
            System.currentTimeMillis() - beginTime);
        // 將page cache 這片記憶體鎖定
        this.mlock();
    }

mlock 記憶體鎖定

  • OS在記憶體充足的情況下,會將文件載入到 page cache 提高文件的讀寫效率,但是當記憶體不夠用時,os會將page cache回收掉。試想如果MappedFile對應的pagecache 被os回收,那就又產生缺頁異常再次從磁碟載入到pagecache,會對系統性能產生很大的影響。

  • 將進程使用的部分或者全部的地址空間鎖定在物理記憶體中,防止其被交換到swap空間。對於RocketMQ這種的高吞吐量的分散式消息隊列來說,追求的是消息讀寫低延遲,那麼肯定希望儘可能地多使用物理記憶體,提高數據讀寫訪問的操作效率。

RocketMq在創建完MappedFile並且記憶體預熱完成後調用了c的mlock函數將這片記憶體鎖定了,具體來看下是怎麼實現的

// java 調用c
LibC INSTANCE = (LibC) Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
// 具體實現
public void mlock() {
        final long beginTime = System.currentTimeMillis();
        final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
        Pointer pointer = new Pointer(address);
        {
            int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
            log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
        }

        {
            int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
            log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
        }
}

RocketMQ刷盤機制

寫消息時是先寫入到pagecache,rocketMq提供了兩種刷盤機制,同步刷盤和非同步刷盤,同步刷盤適用於對消息可靠性比較高的場合,同步刷盤性能比較低下,這樣即使系統宕機消息也不會丟失。

同步刷盤

  • RocketMQ的Broker端才會真正地返回給Producer端一個成功的ACK響應。同步刷盤對MQ消息可靠性來說是一種不錯的保障,但是性能上會有較大影響,一般適用於金融業務應用領域。

  • RocketMQ同步刷盤的大致做法是,基於生產者消費者模型,主執行緒創建刷盤請求實例—GroupCommitRequest並在放入刷盤寫隊列後喚醒同步刷盤執行緒—GroupCommitService,來執行刷盤動作(其中用了CAS變數和CountDownLatch來保證執行緒間的同步)。RocketMQ中用讀寫雙快取隊列(requestsWrite/requestsRead)來實現讀寫分離,其帶來的好處在於內部消費生成的同步刷盤請求可以不用加鎖,提高並發度。

  • 刷盤執行緒從阻塞隊列中獲取,刷盤其實就是調用了mappedByteBuffer.force()方法,刷盤成功後通過countdownlatch喚醒刷盤等待的執行緒,原理很簡單>


public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        // 同步刷盤
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
           // 對應一個單獨的執行緒
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
                 // GroupCommitRequest 封裝了CountDownLatch,GroupCommitService刷盤完畢後喚醒等待執行緒
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                service.wakeup();
            }
        }
        // 非同步刷盤
        else {
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
            } else {
                commitLogService.wakeup();
            }
        }
    }

非同步刷盤

  • 非同步刷盤原理 發送消息執行緒寫到pagecache成功之後就返回,消息保存在page cache 中,非同步刷盤對應了一個單獨執行緒,刷盤默認一次刷4個pageSize,也就是16k的數據。非同步刷盤有可能會丟失數據,當jvm程式死掉 但機器沒有宕機,pagecache中的臟頁還是能人工刷到磁碟的,但是當機器宕機之後,數據就永遠丟失了。

  • 能夠充分利用OS的PageCache的優勢,只要消息寫入PageCache即可將成功的ACK返回給Producer端。消息刷盤採用後台非同步執行緒提交的方式進行,降低了讀寫延遲,提高了MQ的性能和吞吐量。非同步和同步刷盤的區別在於,非同步刷盤時,主執行緒並不會阻塞,在將刷盤執行緒wakeup後,就會繼續執行。

RocketMQ的堆外存儲機制

  • RocketMq提供了堆外記憶體池機制即TransientStorePool,TransientStorePool初始化時實例化5個堆外記憶體,大小和MappedFile的大小1G,然後mlock鎖定此記憶體區域。

  • 發送消息時如果開啟了堆外記憶體機制,MappedFile在實例化時從堆外記憶體池中獲取一個directBuffer實例,寫消息先寫到堆外記憶體中,然後有單獨的執行緒(CommitRealTimeService)刷到pagecache,之後再由單獨的執行緒(FlushRealTimeService)從pagecahce刷到磁碟。

開啟堆外記憶體池的好處:寫消息時先寫到堆外記憶體,純記憶體操作非常快。讀消息時是從pagecache中讀,相當於實現了讀寫分離,但是會存在延時性機制問題,以及對外記憶體宕機了會丟失,數據一致性會存在問題。

消息生產

所有發送消息的執行緒是串列執行的,所有topic的數據放一塊順序寫到pagecache中,因此效率十分的高。在寫 page cache 成功後,再由單獨的執行緒非同步構建consumerQueue和 indexFile(基於磁碟實現的hashMap,實現消息的查找),構建完成consumerQueue成功後 consumer 就能消費到最新的消息了,當然構建consumerQueue也是順序寫,每次只寫入20個位元組,佔用的空間也不大。

消息消費

每個topic可以對應多個consumerQueue,就相當於kafka裡面的分區概念,Rocketmq裡面的消費者與consumerQueue的分配演算法和kafka的相似。由於consumerQueue中只保存了消息在commitLog中的offset、msgSize、tagsCode,因此需要拿到offset去commitlog中把這條消息撈出來,這時候讀相當與隨機讀。

注意,由前面的mlock記憶體鎖定再加上消費的數據一般是最近生產的,數據還在pagecache中,對性能的影響也不大,當consumer消費很遠的數據時,pagecache中肯定是沒有快取的,這時候rocketMq建議consumer去slave上讀

總結

RocketMq所有topic共用一個commitLog,磁碟順序寫,這一點實現也是參考了kafka,讀消息時根據consumerQueue去commitLog中吧數據撈出來,雖然是隨機讀,但是最新的數據一般在pagecahce中也無關緊要。使用記憶體鎖定避免記憶體swap交換,堆外記憶體和pagecache的讀寫分離。