Source Code Analysis of Disk Flushing in the RocketMQ Broker

  • October 3, 2019
  • Notes

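At the end of my previous post, 【Source Code Analysis of Message Storage in the RocketMQ Broker】, I briefly touched on CommitLog flushing. This post builds directly on that one.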

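The Broker flushes its CommitLog with a dedicated service thread. That thread keeps writing buffered content to disk, into the CommitLog files. There are two main modes: asynchronous flush and synchronous flush.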

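Asynchronous flushing can take one of two paths:
① buffer in mappedByteBuffer -> write to disk (synchronous flushing also uses this path)
② buffer in writeBuffer -> write into fileChannel -> write to disk (used when the memory byte buffer mentioned in the previous post is enabled)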

 

CommitLog has two flush modes:

```java
public enum FlushDiskType {
    SYNC_FLUSH,
    ASYNC_FLUSH
}
```

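These are synchronous and asynchronous flushing:
- GroupCommitService implements synchronous flushing.
- FlushRealTimeService implements asynchronous flushing.

Asynchronous flushing is the default.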

In asynchronous mode, enabling the memory byte buffer (transientStorePoolEnable) starts CommitRealTimeService in addition to FlushRealTimeService.
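The sketch below summarizes which background threads end up running in each configuration. `FlushServiceSelector.servicesFor` is a hypothetical helper, not a RocketMQ API. As far as I can tell from the 4.x source, the real choice is split across two places:
- CommitLog's constructor picks GroupCommitService or FlushRealTimeService.
- start() launches CommitRealTimeService only when isTransientStorePoolEnable() is true.

That check also requires ASYNC_FLUSH on a non-slave broker. The broker role is ignored here.

```java
import java.util.ArrayList;
import java.util.List;

// Mirrors the two modes of RocketMQ's FlushDiskType enum.
enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

// Hypothetical helper: names the flush-related service threads that would run.
class FlushServiceSelector {
    static List<String> servicesFor(FlushDiskType type, boolean transientStorePoolEnable) {
        List<String> services = new ArrayList<>();
        if (type == FlushDiskType.SYNC_FLUSH) {
            // Synchronous flush: producers wait on GroupCommitService.
            // The memory byte buffer is assumed to be ignored in this mode.
            services.add("GroupCommitService");
        } else {
            // Asynchronous flush (the default): a background flusher.
            services.add("FlushRealTimeService");
            if (transientStorePoolEnable) {
                // writeBuffer -> fileChannel commits run in their own thread.
                services.add("CommitRealTimeService");
            }
        }
        return services;
    }
}
```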

 

Synchronous flush:

The run method of the GroupCommitService thread:

```java
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.waitForRunning(10);
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    // Under normal circumstances shutdown, wait for the arrival of the
    // request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn("GroupCommitService Exception, ", e);
    }

    synchronized (this) {
        this.swapRequests();
    }

    this.doCommit();

    CommitLog.log.info(this.getServiceName() + " service end");
}
```

The doCommit call inside the loop keeps flushing to disk.

The doCommit method:

```java
private void doCommit() {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                for (int i = 0; i < 2 && !flushOK; i++) {
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

                    if (!flushOK) {
                        CommitLog.this.mappedFileQueue.flush(0);
                    }
                }

                req.wakeupCustomer(flushOK);
            }

            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }

            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}
```

GroupCommitService maintains two Lists:

```java
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
```

Each GroupCommitRequest wraps an offset:

```java
private final long nextOffset;
```

 

This brings us back to the handleDiskFlush method mentioned at the end of the previous post:

```java
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}
```

This method is called after the Broker has received a message from the Producer and finished writing it into the ByteBuffer.

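In SYNC_FLUSH mode, when the message asks to wait for the store result (isWaitStoreMsgOK), the method does three things:
1. It computes nextOffset from the WroteOffset and WroteBytes in the AppendMessageResult.
2. It wraps nextOffset in a GroupCommitRequest.
3. It adds that request to the requestsWrite List through GroupCommitService's putRequest method.

The putRequest method: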

```java
public synchronized void putRequest(final GroupCommitRequest request) {
    synchronized (this.requestsWrite) {
        this.requestsWrite.add(request);
    }
    if (hasNotified.compareAndSet(false, true)) {
        waitPoint.countDown(); // notify
    }
}
```

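After adding the request to the List, putRequest uses a CAS to flip the atomic boolean hasNotified from false to true. It then calls waitPoint.countDown() to wake the flush thread. This becomes important shortly.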

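Because this is synchronous flushing, the caller then calls GroupCommitRequest's waitForFlush method. It waits, up to the timeout (syncFlushTimeout), for the flush that covers this record to finish. If the flush does not finish in time, the status is set to FLUSH_DISK_TIMEOUT.
Asynchronous flushing only wakes the flush task through wakeup and does not wait. That is the difference between the two.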

Back in doCommit, notice that it works on the requestsRead List, yet the request was just added to requestsWrite.
The waitForRunning method called from run explains why:

```java
protected void waitForRunning(long interval) {
    if (hasNotified.compareAndSet(true, false)) {
        this.onWaitEnd();
        return;
    }

    //entry to wait
    waitPoint.reset();

    try {
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        hasNotified.set(false);
        this.onWaitEnd();
    }
}
```

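waitForRunning first tries a CAS that flips hasNotified from true to false.
- If the CAS succeeds, a wakeup is already pending, so it calls onWaitEnd and returns immediately.
- If the CAS fails, it resets waitPoint and blocks in await until putRequest wakes it.

In other words, once the Producer's message has been buffered and handleDiskFlush has run, the flush thread wakes up to work. The flush thread also wakes on its own when the interval timeout expires. onWaitEnd runs in every case.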
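The sketch below reproduces the hasNotified/waitPoint handshake with JDK classes only. It is not RocketMQ's ServiceThread. RocketMQ uses its own resettable CountDownLatch2, whereas this sketch assumes that creating a fresh java.util.concurrent.CountDownLatch is equivalent to waitPoint.reset(). The class `WaitNotifySketch` and its waitEnds counter are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the wakeup()/waitForRunning() handshake.
class WaitNotifySketch {
    private final AtomicBoolean hasNotified = new AtomicBoolean(false);
    private volatile CountDownLatch waitPoint = new CountDownLatch(1);
    private int waitEnds = 0; // how many times onWaitEnd ran (only for the demo)

    // Producer side (putRequest / wakeup): record the notification and release a waiter.
    void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown();
        }
    }

    // Flush-thread side: return at once if a wakeup is pending, otherwise wait up to intervalMs.
    void waitForRunning(long intervalMs) {
        if (hasNotified.compareAndSet(true, false)) {
            onWaitEnd();
            return;
        }
        waitPoint = new CountDownLatch(1); // assumption: stands in for waitPoint.reset()
        try {
            waitPoint.await(intervalMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            hasNotified.set(false);
            onWaitEnd();
        }
    }

    // GroupCommitService overrides this hook to swap requestsWrite and requestsRead.
    protected void onWaitEnd() {
        waitEnds++;
    }

    int waitEnds() {
        return waitEnds;
    }
}
```

As far as I can tell, the sketch and the original share one behavior. A wakeup that slips in between the failed CAS and the reset is only picked up when the timeout expires.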

Now look at onWaitEnd:

```java
protected void onWaitEnd() {
    this.swapRequests();
}

private void swapRequests() {
    List<GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}
```

So onWaitEnd simply swaps the two Lists.

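This is a really neat trick. If you know the JVM, doesn't it look a lot like the copying collector used in the young generation? It is also the classic double-buffering pattern.

The cycle works like this:
1. While the flush thread is blocked, requests accumulate in requestsWrite.
2. When the flush thread wakes up, it first swaps requestsWrite and requestsRead, so it now reads those requests from requestsRead.
3. requestsWrite becomes an empty List, and new requests are added to it.

The cycle then repeats.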

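When requestsRead is not empty, doCommit ends by calling requestsRead.clear(). This confirms the description above: the List handed back to the write side after the next swap is always empty.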
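The swap can be reproduced in a few lines. This is a sketch, not RocketMQ's code. `DoubleBufferedRequests` is a hypothetical generic class, and for simplicity it guards both put and swap with the same monitor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of GroupCommitService's requestsWrite/requestsRead double buffering.
class DoubleBufferedRequests<T> {
    private volatile List<T> requestsWrite = new ArrayList<>();
    private volatile List<T> requestsRead = new ArrayList<>();

    // Producer side, like putRequest: append to the write list.
    synchronized void put(T request) {
        requestsWrite.add(request);
    }

    // Flush side, like swapRequests: new puts now go to the (empty) former read list.
    synchronized void swap() {
        List<T> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    // Flush side, like doCommit: process the read list, then clear it for reuse.
    List<T> drainRead() {
        List<T> batch = new ArrayList<>(requestsRead);
        requestsRead.clear();
        return batch;
    }

    synchronized int pendingWrites() {
        return requestsWrite.size();
    }
}
```

Between swaps, only the flush thread touches the read side, so draining it needs no lock. Producers only contend for the short append.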

 

Let's look more closely at how the flush itself is done:

```java
for (GroupCommitRequest req : this.requestsRead) {
    // There may be a message in the next file, so a maximum of
    // two times the flush
    boolean flushOK = false;
    for (int i = 0; i < 2 && !flushOK; i++) {
        flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

        if (!flushOK) {
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }

    req.wakeupCustomer(flushOK);
}
```

Iterating over requestsRead yields the nextOffset wrapped in each GroupCommitRequest.

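flushedWhere records the offset reached by the last completed flush.
- If flushedWhere is already greater than or equal to nextOffset, everything before nextOffset is on disk and no flush is needed.
- Otherwise mappedFileQueue.flush is called.

The check-and-flush runs at most twice. As the code comment says, the message may have been written into the next file, and a single flush only covers one MappedFile. Finally, wakeupCustomer hands the result back to the waiting thread.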
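The toy model below shows why a second pass can be needed. `FlushWhereModel` is hypothetical. Its flush() imitates MappedFileQueue.flush(0) by flushing only the file that contains flushedWhere, so one call advances flushedWhere at most to the end of that file. Unlike the loop above, the model re-checks flushedWhere after each flush, which makes the effect of every call visible in the result.

```java
// Hypothetical model of flushedWhere advancing across fixed-size CommitLog files.
class FlushWhereModel {
    private final long fileSize;  // size of each CommitLog file
    private final long wrote;     // end offset of the last appended message
    private long flushedWhere;    // offset reached by the last flush
    private int flushCalls;

    FlushWhereModel(long fileSize, long wrote, long flushedWhere) {
        this.fileSize = fileSize;
        this.wrote = wrote;
        this.flushedWhere = flushedWhere;
    }

    // Like MappedFileQueue.flush(0): only the file containing flushedWhere is flushed.
    void flush() {
        flushCalls++;
        long fileStart = (flushedWhere / fileSize) * fileSize;
        long fileEnd = fileStart + fileSize;
        flushedWhere = Math.min(wrote, fileEnd);
    }

    // Flush at most twice until flushedWhere covers nextOffset.
    boolean flushUntil(long nextOffset) {
        boolean ok = flushedWhere >= nextOffset;
        for (int i = 0; i < 2 && !ok; i++) {
            flush();
            ok = flushedWhere >= nextOffset;
        }
        return ok;
    }

    long flushedWhere() { return flushedWhere; }

    int flushCalls() { return flushCalls; }
}
```

Take 1000-byte files, flushedWhere at 900, and a request whose message ends at 1200 in the second file. The first call stops at 1000, the end of file 0. The second call reaches 1200.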

MappedFileQueue's flush method:

```java
public boolean flush(final int flushLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        int offset = mappedFile.flush(flushLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }

    return result;
}
```

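It first takes flushedWhere, the offset reached by the last flush, and passes it to findMappedFileByOffset. That finds the MappedFile, which is the memory mapping of a CommitLog file.
I have covered MappedFile and its operations many times in earlier posts, so I won't repeat that here.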

Once the MappedFile has been found, its flush method is called.

MappedFile's flush method:

```java
public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = getReadPosition();

            try {
                //We only append data to fileChannel or mappedByteBuffer, never both.
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }

            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}
```

It starts with isAbleToFlush:

```java
private boolean isAbleToFlush(final int flushLeastPages) {
    int flush = this.flushedPosition.get();
    int write = getReadPosition();

    if (this.isFull()) {
        return true;
    }

    if (flushLeastPages > 0) {
        return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
    }

    return write > flush;
}
```

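Two positions matter here:
- flush is the position reached by the last completed flush.
- write is the position just after the latest written message content.

When flushLeastPages is greater than 0, the line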

```java
return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
```

checks whether enough pages have accumulated. OS_PAGE_SIZE is 1024 * 4 bytes, so one page is 4 KB.

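The two modes use different values:
- In synchronous flushing, flushLeastPages is 0. There is no page requirement, so any new buffered content gets flushed.
- In asynchronous flushing, flushLeastPages defaults to 4. The buffer is written to the file only once about 4 (pages) * 4 KB (page size) = 16 KB has accumulated.

Strictly speaking, the check counts page boundaries crossed since the last flush rather than bytes, and a full file is always flushed.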
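To see the page arithmetic on concrete numbers, here is a standalone copy of isAbleToFlush's logic. The hypothetical `FlushThreshold` class takes the positions and the "file is full" flag as parameters instead of reading them from a MappedFile.

```java
// Standalone copy of MappedFile.isAbleToFlush's arithmetic; positions are passed in.
class FlushThreshold {
    static final int OS_PAGE_SIZE = 1024 * 4; // 4 KB, as in MappedFile

    static boolean isAbleToFlush(int flush, int write, int flushLeastPages, boolean full) {
        if (full) {
            return true; // a full file is always flushed
        }
        if (flushLeastPages > 0) {
            // Counts page boundaries crossed since the last flush, not raw bytes.
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
        }
        return write > flush; // flushLeastPages == 0: flush whenever anything is new
    }
}
```

Two cases show why "16 KB" is only approximate:
- With flush = 4095 and write = 16384, only 12,289 new bytes are pending, yet the check passes because four page boundaries have been crossed.
- With flush = 4096 and write = 20479, 16,383 bytes are pending but only three boundaries have been crossed, so the flush waits.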

 

Back in MappedFile's flush, once isAbleToFlush has approved the flush:

```java
int value = getReadPosition();
try {
    //We only append data to fileChannel or mappedByteBuffer, never both.
    if (writeBuffer != null || this.fileChannel.position() != 0) {
        this.fileChannel.force(false);
    } else {
        this.mappedByteBuffer.force();
    }
} catch (Throwable e) {
    log.error("Error occurred when force data to disk.", e);
}

this.flushedPosition.set(value);
```

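getReadPosition first returns the position just after the latest written content.
Synchronous flushing never uses the memory byte buffer. writeBuffer is therefore null, and nothing has been written through fileChannel. So mappedByteBuffer.force() is called, which uses JDK NIO to write the data held in mappedByteBuffer to the CommitLog file.
Finally, flushedPosition is updated.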

Back in MappedFileQueue's flush, once MappedFile's flush returns, flushedWhere is updated to the file's starting offset plus the flushed position.

At this point the buffered data has been persisted, and the synchronous flush is complete.

 

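Asynchronous flush:

① FlushRealTimeService, held in CommitLog's flushCommitLogService field. That field's declared type is the abstract class FlushCommitLogService, so the rest of this post calls it FlushCommitLogService for short: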

```java
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

        int flushPhysicQueueThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

        boolean printFlushProgress = false;

        // Print flush progress
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            flushPhysicQueueLeastPages = 0;
            printFlushProgress = (printTimes++ % 10) == 0;
        }

        try {
            if (flushCommitLogTimed) {
                Thread.sleep(interval);
            } else {
                this.waitForRunning(interval);
            }

            if (printFlushProgress) {
                this.printFlushProgress();
            }

            long begin = System.currentTimeMillis();
            CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            long past = System.currentTimeMillis() - begin;
            if (past > 500) {
                log.info("Flush data to disk costs {} ms", past);
            }
        } catch (Throwable e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            this.printFlushProgress();
        }
    }

    // Normal shutdown, to ensure that all the flush before exit
    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        result = CommitLog.this.mappedFileQueue.flush(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }

    this.printFlushProgress();

    CommitLog.log.info(this.getServiceName() + " service end");
}
```

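flushCommitLogTimed: whether to flush on a fixed timer (sleep) instead of waiting to be woken up
interval: flush interval, 500 ms by default
flushPhysicQueueLeastPages: minimum number of dirty pages per flush, 4 by default
flushPhysicQueueThoroughInterval: interval between thorough flushes, 10 s by default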

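First, the thread checks whether a thorough flush is due. It adds flushPhysicQueueThoroughInterval to lastFlushTimestamp (the time of the last thorough flush) and compares the sum with the current time. If the sum has been reached, it sets flushPhysicQueueLeastPages to 0 and updates lastFlushTimestamp.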

Next comes flushCommitLogTimed:
when it is true, the thread sleeps for interval (500 ms);
when it is false, it calls waitForRunning with interval (500 ms) as the timeout, and the wakeup call in handleDiskFlush can wake it early.

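Finally, as in synchronous flushing, mappedFileQueue.flush is called.
The difference is flushPhysicQueueLeastPages: a value of 0 means a thorough flush, otherwise the flush is subject to the 4-page (16 KB) threshold.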

② CommitRealTimeService
This mode works together with FlushCommitLogService.

CommitRealTimeService's run method:

```java
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

        int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

        int commitDataThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

        long begin = System.currentTimeMillis();
        if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
            this.lastCommitTimestamp = begin;
            commitDataLeastPages = 0;
        }

        try {
            boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
            long end = System.currentTimeMillis();
            if (!result) {
                this.lastCommitTimestamp = end; // result = false means some data committed.
                //now wake up flush thread.
                flushCommitLogService.wakeup();
            }

            if (end - begin > 500) {
                log.info("Commit data to file costs {} ms", end - begin);
            }
            this.waitForRunning(interval);
        } catch (Throwable e) {
            CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
        }
    }

    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        result = CommitLog.this.mappedFileQueue.commit(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }
    CommitLog.log.info(this.getServiceName() + " service end");
}
```

The logic resembles FlushCommitLogService; only the parameters differ slightly:

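interval: commit interval, 200 ms by default
commitDataLeastPages: minimum number of dirty pages per commit, 4 by default
commitDataThoroughInterval: interval between thorough commits, 200 ms by default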

Apart from that, it mirrors FlushCommitLogService, except that it calls mappedFileQueue's commit method:

```java
public boolean commit(final int commitLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
    if (mappedFile != null) {
        int offset = mappedFile.commit(commitLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.committedWhere;
        this.committedWhere = where;
    }

    return result;
}
```

This closely mirrors MappedFileQueue's flush method, except that the MappedFile is located through committedWhere.

Then MappedFile's commit method is called:

```java
public int commit(final int commitLeastPages) {
    if (writeBuffer == null) {
        //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            commit0(commitLeastPages);
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        }
    }

    // All dirty data has been committed to FileChannel.
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }

    return this.committedPosition.get();
}
```

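Again this mirrors MappedFile's flush: after isAbleToCommit checks the page requirement, commit0 is called. If writeBuffer is null, there is nothing to commit, and wrotePosition is returned directly.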

MappedFile's commit0 method:

```java
protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    if (writePos - this.committedPosition.get() > 0) {
        try {
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}
```

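As explained in 【Source Code Analysis of Message Storage in the RocketMQ Broker】, this mode first buffers messages in writeBuffer rather than in mappedByteBuffer.
commit0 takes the range of writeBuffer from lastCommittedPosition (the last commit position) to writePos (the end of the buffered messages). It writes that range to the same position in fileChannel, without forcing it to disk yet.
After the write to fileChannel, committedPosition is updated.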

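Back in commit, once the data has been written to fileChannel, the method checks whether committedPosition has reached fileSize. In other words, it checks whether all of writeBuffer's content has been committed.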

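If so, writeBuffer is handed back for reuse through transientStorePool's returnBuffer method.
TransientStorePool is essentially a pool of direct ByteBuffers kept in a double-ended queue (Deque). It is created by DefaultMessageStore and lends its buffers to the CommitLog's MappedFiles.
TransientStorePool: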

```java
public class TransientStorePool {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private final int poolSize;
    private final int fileSize;
    private final Deque<ByteBuffer> availableBuffers;
    private final MessageStoreConfig storeConfig;

    public TransientStorePool(final MessageStoreConfig storeConfig) {
        this.storeConfig = storeConfig;
        this.poolSize = storeConfig.getTransientStorePoolSize();
        this.fileSize = storeConfig.getMapedFileSizeCommitLog();
        this.availableBuffers = new ConcurrentLinkedDeque<>();
    }
    // ......
}
```

The returnBuffer method:

```java
public void returnBuffer(ByteBuffer byteBuffer) {
    byteBuffer.position(0);
    byteBuffer.limit(fileSize);
    this.availableBuffers.offerFirst(byteBuffer);
}
```

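This shows the byteBuffer really is recycled. Its position and limit are reset, and it is pushed back onto the head of the deque so it can be reused.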
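A simplified pool makes the borrow/return cycle visible. `SimpleTransientStorePool` is a hypothetical reduction of TransientStorePool:
- returnBuffer copies the steps shown above.
- borrowBuffer is assumed, as far as I can tell from the source, to take from the head of the deque with pollFirst.

The real pool also locks its buffers in memory and logs a warning when few remain. That is omitted here.

```java
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical, simplified TransientStorePool: a deque of pre-allocated direct buffers.
class SimpleTransientStorePool {
    private final int fileSize;
    private final Deque<ByteBuffer> availableBuffers = new ConcurrentLinkedDeque<>();

    SimpleTransientStorePool(int poolSize, int fileSize) {
        this.fileSize = fileSize;
        for (int i = 0; i < poolSize; i++) {
            availableBuffers.offer(ByteBuffer.allocateDirect(fileSize));
        }
    }

    // A new MappedFile takes a buffer from the head of the deque (null if exhausted).
    ByteBuffer borrowBuffer() {
        return availableBuffers.pollFirst();
    }

    // Same steps as returnBuffer above: reset position/limit, push back to the head.
    void returnBuffer(ByteBuffer byteBuffer) {
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
        availableBuffers.offerFirst(byteBuffer);
    }

    int availableBufferNums() {
        return availableBuffers.size();
    }
}
```

returnBuffer uses offerFirst and borrowing takes from the head. So the most recently returned buffer is the next one handed out.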

 

Back to MappedFileQueue's commit method:

```java
public boolean commit(final int commitLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
    if (mappedFile != null) {
        int offset = mappedFile.commit(commitLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.committedWhere;
        this.committedWhere = where;
    }

    return result;
}
```

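After mappedFile.commit, comparing where with committedWhere tells whether anything was actually written to fileChannel. result is false only when data really was written!
committedWhere is then updated, and result is returned.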

 

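Back in CommitRealTimeService's run method, result is checked after commit.
Only when data has actually been written to fileChannel does the loop take two actions:
- It calls flushCommitLogService.wakeup(), which wakes FlushCommitLogService's flush thread.
- It updates lastCommitTimestamp.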

The only difference from the FlushCommitLogService analysis above is in MappedFile's flush method:

```java
if (writeBuffer != null || this.fileChannel.position() != 0) {
    this.fileChannel.force(false);
} else {
    this.mappedByteBuffer.force();
}
```

Earlier, without the memory byte buffer, the content of mappedByteBuffer was written to disk.
Now it is finally fileChannel's turn.

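Look at the condition: either writeBuffer is not null, or fileChannel's position is not 0.
writeBuffer becomes null once the file has been fully committed and the buffer has been returned to TransientStorePool. By then, commit0 has already moved fileChannel's position away from 0, so the second condition keeps the flush on the fileChannel path.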

So with the memory byte buffer enabled, data is buffered twice before it is written to disk: first in writeBuffer, then in fileChannel.

 

That completes the whole process of message persistence and disk flushing in the Broker.