源碼分析 RocketMQ DLedger(多副本) 之日誌追加流程

  • 2019 年 10 月 4 日
  • 筆記

上一篇我們詳細分析了 源碼分析RocketMQ多副本之Leader選主,本文將詳細分析日誌複製的實現。

有了前篇 源碼分析 RocketMQ DLedger 多副本存儲實現,本文將直接從 Leader 處理客戶端請求入口開始,其入口為:DLedgerServer 的 handleAppend 方法開始講起。

1、日誌複製基本流程


在正式分析 RocketMQ DLedger 多副本複製之前,我們首先來了解客戶端發送日誌的請求協議欄位,其類圖如下所示:

我們先一一介紹各個欄位的含義:

  • String group 該集群所屬組名。
  • String remoteId 請求目的節點ID。
  • String localId 節點ID。
  • int code 請求響應欄位,表示返迴響應碼。
  • String leaderId = null 集群中的Leader Id。
  • long term 集群當前的選舉輪次。
  • byte[] body 待發送的數據。

日誌的請求處理處理入口為 DLedgerServer 的 handleAppend 方法。

DLedgerServer#handleAppend

PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());  reConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());  PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);  

Step1:首先驗證請求的合理性:

  • 如果請求的節點ID不是當前處理節點,則拋出異常。
  • 如果請求的集群不是當前節點所在的集群,則拋出異常。
  • 如果當前節點不是主節點,則拋出異常。

DLedgerServer#handleAppend

long currTerm = memberState.currTerm();  if (dLedgerEntryPusher.isPendingFull(currTerm)) {  // @1      AppendEntryResponse appendEntryResponse = new AppendEntryResponse();      appendEntryResponse.setGroup(memberState.getGroup());      appendEntryResponse.setCode(DLedgerResponseCode.LEADER_PENDING_FULL.getCode());      appendEntryResponse.setTerm(currTerm);      appendEntryResponse.setLeaderId(memberState.getSelfId());      return AppendFuture.newCompletedFuture(-, appendEntryResponse);  } else {   // @2      DLedgerEntry dLedgerEntry = new DLedgerEntry();      dLedgerEntry.setBody(request.getBody());      DLedgerEntry resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);      return dLedgerEntryPusher.waitAck(resEntry);  }  

Step2:如果預處理隊列已經滿了,則拒絕客戶端請求,返回 LEADER_PENDING_FULL 錯誤碼;如果未滿,將請求封裝成 DledgerEntry,則調用 dLedgerStore 方法追加日誌,並且通過使用 dLedgerEntryPusher 的 waitAck 方法同步等待副本節點的複製響應,並最終將結果返回給調用方法。

  • 程式碼@1:如果 dLedgerEntryPusher 的 push 隊列已滿,則返回追加一次,其錯誤碼為 LEADER_PENDING_FULL。
  • 程式碼@2:追加消息到 Leader 伺服器,並向從節點廣播,在指定時間內如果未收到從節點的確認,則認為追加失敗。

接下來就按照上述三個要點進行展開:

  • 判斷 Push 隊列是否已滿
  • Leader 節點存儲消息
  • 主節點等待從節點複製 ACK

1.1 如何判斷 Push 隊列是否已滿

DLedgerEntryPusher#isPendingFull

public boolean isPendingFull(long currTerm) {      checkTermForPendingMap(currTerm, "isPendingFull");     // @1      return pendingAppendResponsesByTerm.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum(); // @2  }  

主要分兩個步驟: 程式碼@1:檢查當前投票輪次是否在 PendingMap 中,如果不在,則初始化,其結構為:Map< Long/* 投票輪次*/, ConcurrentMap>>。

程式碼@2:檢測當前等待從節點返回結果的個數是否超過其最大請求數量,可通過maxPendingRequests Num 配置,該值默認為:10000。

上述邏輯比較簡單,但疑問隨著而來,ConcurrentMap> 中的數據是從何而來的呢?我們不妨接著往下看。

1.2 Leader 節點存儲數據

Leader 節點的數據存儲主要由 DLedgerStore 的 appendAsLeader 方法實現。DLedger 分別實現了基於記憶體、基於文件的存儲實現,本文重點關注基於文件的存儲實現,其實現類為:DLedgerMmapFileStore。

下面重點來分析一下數據存儲流程,其入口為DLedgerMmapFileStore 的 appendAsLeader 方法。

DLedgerMmapFileStore#appendAsLeader

PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);  PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL);  

Step1:首先判斷是否可以追加數據,其判斷依據主要是如下兩點:

  • 當前節點的狀態是否是 Leader,如果不是,則拋出異常。
  • 當前磁碟是否已滿,其判斷依據是 DLedger 的根目錄或數據文件目錄的使用率超過了允許使用的最大值,默認值為85%。
ByteBuffer dataBuffer = localEntryBuffer.get();  ByteBuffer indexBuffer = localIndexBuffer.get();  

Step2:從本地執行緒變數獲取一個數據與索引 buffer。其中用於存儲數據的 ByteBuffer,其容量固定為 4M ,索引的 ByteBuffer 為兩個索引條目的長度,固定為64個位元組。

DLedgerEntryCoder.encode(entry, dataBuffer);  public static void encode(DLedgerEntry entry, ByteBuffer byteBuffer) {      byteBuffer.clear();      int size = entry.computSizeInBytes();      //always put magic on the first position      byteBuffer.putInt(entry.getMagic());      byteBuffer.putInt(size);      byteBuffer.putLong(entry.getIndex());      byteBuffer.putLong(entry.getTerm());      byteBuffer.putLong(entry.getPos());      byteBuffer.putInt(entry.getChannel());      byteBuffer.putInt(entry.getChainCrc());      byteBuffer.putInt(entry.getBodyCrc());      byteBuffer.putInt(entry.getBody().length);      byteBuffer.put(entry.getBody());      byteBuffer.flip();  }  

Step3:將 DLedgerEntry,即將數據寫入到 ByteBuffer中,從這裡看出,每一次寫入會調用 ByteBuffer 的 clear 方法,將數據清空,從這裡可以看出,每一次數據追加,只能存儲4M的數據。

DLedgerMmapFileStore#appendAsLeader

synchronized (memberState) {      PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER, null);      // ... 省略程式碼  }  

Step4:鎖定狀態機,並再一次檢測節點的狀態是否是 Leader 節點。

DLedgerMmapFileStore#appendAsLeader

long nextIndex = ledgerEndIndex + ;  entry.setIndex(nextIndex);  entry.setTerm(memberState.currTerm());  entry.setMagic(CURRENT_MAGIC);  DLedgerEntryCoder.setIndexTerm(dataBuffer, nextIndex, memberState.currTerm(), CURRENT_MAGIC);  

Step5:為當前日誌條目設置序號,即 entryIndex 與 entryTerm (投票輪次)。並將魔數、entryIndex、entryTerm 等寫入到 bytebuffer 中。

DLedgerMmapFileStore#appendAsLeader

long prePos = dataFileList.preAppend(dataBuffer.remaining());  entry.setPos(prePos);  PreConditions.check(prePos != -, DLedgerResponseCode.DISK_ERROR, null);  DLedgerEntryCoder.setPos(dataBuffer, prePos);  

Step6:計算新的消息的起始偏移量,關於 dataFileList 的 preAppend 後續詳細介紹其實現,然後將該偏移量寫入日誌的 bytebuffer 中。

DLedgerMmapFileStore#appendAsLeader

for (AppendHook writeHook : appendHooks) {      writeHook.doHook(entry, dataBuffer.slice(), DLedgerEntry.BODY_OFFSET);  }  

Step7:執行鉤子函數。

DLedgerMmapFileStore#appendAsLeader

long dataPos = dataFileList.append(dataBuffer.array(), , dataBuffer.remaining());  PreConditions.check(dataPos != -, DLedgerResponseCode.DISK_ERROR, null);  PreConditions.check(dataPos == prePos, DLedgerResponseCode.DISK_ERROR, null);  

Step8:將數據追加到 pagecache 中。該方法稍後詳細介紹。

DLedgerMmapFileStore#appendAsLeader

DLedgerEntryCoder.encodeIndex(dataPos, entrySize, CURRENT_MAGIC, nextIndex, memberState.currTerm(), indexBuffer);  long indexPos = indexFileList.append(indexBuffer.array(), , indexBuffer.remaining(), false);  PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);  

Step9:構建條目索引並將索引數據追加到 pagecache。

DLedgerMmapFileStore#appendAsLeader

ledgerEndIndex++;  ledgerEndTerm = memberState.currTerm();  if (ledgerBeginIndex == -) {      ledgerBeginIndex = ledgerEndIndex;  }  updateLedgerEndIndexAndTerm();  

Step10:ledgerEndeIndex 加一(下一個條目)的序號。並設置 leader 節點的狀態機的 ledgerEndIndex 與 ledgerEndTerm。

Leader 節點數據追加就介紹到這裡,稍後會重點介紹與存儲相關方法的實現細節。

1.3 主節點等待從節點複製 ACK

其實現入口為 dLedgerEntryPusher 的 waitAck 方法。

DLedgerEntryPusher#waitAck

public CompletableFuture<AppendEntryResponse> waitAck(DLedgerEntry entry) {      updatePeerWaterMark(entry.getTerm(), memberState.getSelfId(), entry.getIndex());    // @1      if (memberState.getPeerMap().size() == ) {                                                                  // @2          AppendEntryResponse response = new AppendEntryResponse();          response.setGroup(memberState.getGroup());          response.setLeaderId(memberState.getSelfId());          response.setIndex(entry.getIndex());          response.setTerm(entry.getTerm());          response.setPos(entry.getPos());          return AppendFuture.newCompletedFuture(entry.getPos(), response);      } else {          checkTermForPendingMap(entry.getTerm(), "waitAck");          AppendFuture<AppendEntryResponse> future = new AppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs()); // @3          future.setPos(entry.getPos());          CompletableFuture<AppendEntryResponse> old = pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), future);     // @4          if (old != null) {              logger.warn("[MONITOR] get old wait at index={}", entry.getIndex());          }          wakeUpDispatchers();                                       // @5          return future;      }  }  

程式碼@1:更新當前節點的 push 水位線。

程式碼@2:如果集群的節點個數為1,無需轉發,直接返回成功結果。

程式碼@3:構建 append 響應 Future 並設置超時時間,默認值為:2500 ms,可以通過 maxWaitAckTimeMs 配置改變其默認值。

程式碼@4:將構建的 Future 放入等待結果集合中。

程式碼@5:喚醒 Entry 轉發執行緒,即將主節點中的數據 push 到各個從節點。

接下來分別對上述幾個關鍵點進行解讀。

1.3.1 updatePeerWaterMark 方法

DLedgerEntryPusher#updatePeerWaterMark

private void updatePeerWaterMark(long term, String peerId, long index) {    // 程式碼@1      synchronized (peerWaterMarksByTerm) {         checkTermForWaterMark(term, "updatePeerWaterMark");                     // 程式碼@2          if (peerWaterMarksByTerm.get(term).get(peerId) < index) {                   // 程式碼@3              peerWaterMarksByTerm.get(term).put(peerId, index);          }      }  }  

程式碼@1:先來簡單介紹該方法的兩個參數:

  • long term 當前的投票輪次。
  • String peerId 當前節點的ID。
  • long index 當前追加數據的序號。

程式碼@2:初始化 peerWaterMarksByTerm 數據結構,其結果為 < Long /** term */, Map< String /** peerId */, Long /** entry index*/>。

程式碼@3:如果 peerWaterMarksByTerm 存儲的 index 小於當前數據的 index,則更新。

1.3.2 wakeUpDispatchers 詳解

DLedgerEntryPusher#updatePeerWaterMark

public void wakeUpDispatchers() {      for (EntryDispatcher dispatcher : dispatcherMap.values()) {          dispatcher.wakeup();      }  }  

該方法主要就是遍歷轉發器並喚醒。本方法的核心關鍵就是 EntryDispatcher,在詳細介紹它之前我們先來看一下該集合的初始化。

DLedgerEntryPusher 構造方法

for (String peer : memberState.getPeerMap().keySet()) {      if (!peer.equals(memberState.getSelfId())) {          dispatcherMap.put(peer, new EntryDispatcher(peer, logger));      }  }  

原來在構建 DLedgerEntryPusher 時會為每一個從節點創建一個 EntryDispatcher 對象。

顯然,日誌的複製由 DLedgerEntryPusher 來實現。由於篇幅的原因,該部分內容將在下篇文章中繼續。

上面在講解 Leader 追加日誌時並沒有詳細分析存儲相關的實現,為了知識體系的完備,接下來我們來分析一下其核心實現。

2、日誌存儲實現詳情


主要對 MmapFileList 的 preAppend 與 append 方法進行詳細講解。

存儲部分的設計請查閱筆者的部落格:源碼分析RocketMQ DLedger 多副本存儲實現,MmapFileList 對標 RocketMQ 的MappedFileQueue。

2.1 MmapFileList 的 preAppend 詳解

該方法最終會調用兩個參數的preAppend方法,故我們直接來看兩個參數的 preAppend 方法。

MmapFileList#preAppend

public long preAppend(int len, boolean useBlank) {                // @1      MmapFile mappedFile = getLastMappedFile();                   // @2 start      if (null == mappedFile || mappedFile.isFull()) {          mappedFile = getLastMappedFile();      }      if (null == mappedFile) {          logger.error("Create mapped file for {}", storePath);          return -;      }                                                                                            // @2 end      int blank = useBlank ? MIN_BLANK_LEN : ;      if (len + blank > mappedFile.getFileSize() - mappedFile.getWrotePosition()) {   // @3          if (blank < MIN_BLANK_LEN) {              logger.error("Blank {} should ge {}", blank, MIN_BLANK_LEN);              return -;          } else {              ByteBuffer byteBuffer = ByteBuffer.allocate(mappedFile.getFileSize() - mappedFile.getWrotePosition());     // @4              byteBuffer.putInt(BLANK_MAGIC_CODE);                                                                                                      // @5              byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition());                                               // @6              if (mappedFile.appendMessage(byteBuffer.array())) {                                                                                     // @7                  //need to set the wrote position                  mappedFile.setWrotePosition(mappedFile.getFileSize());              } else {                  logger.error("Append blank error for {}", storePath);                  return -;              }              mappedFile = getLastMappedFile();              if (null == mappedFile) {                  logger.error("Create mapped file for {}", storePath);                  return -;              }          }      }      return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();// @8  }  

程式碼@1:首先介紹其參數的含義:

  • int len 需要申請的長度。
  • boolean useBlank 是否需要填充,默認為true。

程式碼@2:獲取最後一個文件,即獲取當前正在寫的文件。

程式碼@3:如果需要申請的資源超過了當前文件可寫位元組時,需要處理的邏輯。程式碼@4-@7都是其處理邏輯。

程式碼@4:申請一個當前文件剩餘位元組的大小的bytebuffer。

程式碼@5:先寫入魔數。

程式碼@6:寫入位元組長度,等於當前文件剩餘的總大小。

程式碼@7:寫入空位元組,程式碼@4-@7的用意就是寫一條空Entry,填入魔數與 size,方便解析。

程式碼@8:如果當前文件足以容納待寫入的日誌,則直接返回其物理偏移量。

經過上述程式碼解讀,我們很容易得出該方法的作用,就是返回待寫入日誌的起始物理偏移量。

2.2 MmapFileList 的 append 詳解

最終會調用4個參數的 append 方法,其程式碼如下: MmapFileList#append

public long append(byte[] data, int pos, int len, boolean useBlank) {  // @1      if (preAppend(len, useBlank) == -) {          return -;      }      MmapFile mappedFile = getLastMappedFile();                               // @2      long currPosition = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();   // @3      if (!mappedFile.appendMessage(data, pos, len)) {            // @4          logger.error("Append error for {}", storePath);          return -;      }      return currPosition;  }  

程式碼@1:首先介紹一下各個參數:

  • byte[] data 待寫入的數據,即待追加的日誌。
  • int pos 從 data 位元組數組哪個位置開始讀取。
  • int len 待寫入的位元組數量。
  • boolean useBlank 是否使用填充,默認為 true。

程式碼@2:獲取最後一個文件,即當前可寫的文件。

程式碼@3:獲取當前寫入指針。

程式碼@4:追加消息。

最後我們再來看一下 appendMessage,具體的消息追加實現邏輯。

DefaultMmapFile#appendMessage

public boolean appendMessage(final byte[] data, final int offset, final int length) {      int currentPos = this.wrotePosition.get();        if ((currentPos + length) <= this.fileSize) {          ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); // @1          byteBuffer.position(currentPos);          byteBuffer.put(data, offset, length);          this.wrotePosition.addAndGet(length);          return true;      }      return false;  }  

該方法主要突出一下寫入的方式是 mappedByteBuffer,是通過 FileChannel 的 map方法創建,即我們常說的 PageCache,即消息追加首先是寫入到 pageCache 中。

本文詳細介紹了 Leader 節點處理客戶端消息追加請求的前面兩個步驟,即 判斷 Push 隊列是否已滿 與 Leader 節點存儲消息。考慮到篇幅的問題,各個節點的數據同步將在下一篇文章中詳細介紹。

在進入下一篇的文章學習之前,我們不妨思考一下如下問題:

  1. 如果主節點追加成功(寫入到 PageCache),但同步到從節點過程失敗或此時主節點宕機,集群中的數據如何保證一致性?

推薦閱讀源碼分析 RocketMQ DLedger 多副本系列文章:

1、RocketMQ 多副本前置篇:初探raft協議

2、源碼分析RocketMQ多副本之Leader選主

3、源碼分析 RocketMQ DLedger 多副本存儲實現