源碼分析 RocketMQ DLedger(多副本) 之日誌複製-下篇

  • 2019 年 10 月 7 日
  • 筆記

溫馨提示:由於微信單篇文章的字數限制,RocketMQ DLedger 日誌複製分為兩篇文章介紹。本篇緊接著上文源碼分析 RocketMQ DLedger(多副本) 之日誌複製-上篇

3、EntryHandler 詳解


EntryHandler 同樣是一個執行緒,當節點狀態為從節點時激活。

3.1 核心類圖

其核心屬性如下:

  • long lastCheckFastForwardTimeMs 上一次檢查主伺服器是否有 push 消息的時間戳。
  • ConcurrentMap>> writeRequestMap append 請求處理隊列。
  • BlockingQueue>> compareOrTruncateRequests COMMIT、COMPARE、TRUNCATE 相關請求

3.2 handlePush

從上文得知,主節點會主動向從節點傳播日誌,從節點會通過網路接受到請求數據進行處理,其調用鏈如圖所示:

最終會調用 EntryHandler 的 handlePush 方法。

EntryHandler#handlePush

public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception {      //The timeout should smaller than the remoting layer's request timeout      CompletableFuture<PushEntryResponse> future = new TimeoutFuture<>();      // @1      switch (request.getType()) {          case APPEND:                                                                                                          // @2              PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);              long index = request.getEntry().getIndex();              Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> old = writeRequestMap.putIfAbsent(index, new Pair<>(request, future));              if (old != null) {                  logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", index, old.getKey().baseInfo(), request.baseInfo());                  future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode()));              }              break;          case COMMIT:                                                                                                           // @3              compareOrTruncateRequests.put(new Pair<>(request, future));              break;          case COMPARE:          case TRUNCATE:                                                                                                     // @4              PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);              writeRequestMap.clear();              compareOrTruncateRequests.put(new Pair<>(request, future));              break;          default:              logger.error("[BUG]Unknown type {} from {}", request.getType(), request.baseInfo());              future.complete(buildResponse(request, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));              break;      }      return future;  }  

從幾點處理主節點的 push 請求,其實現關鍵點如下。

程式碼@1:首先構建一個響應結果Future,默認超時時間 1s。

程式碼@2:如果是 APPEND 請求,放入到 writeRequestMap 集合中,如果已存在該數據結構,說明主節點重複推送,構建返回結果,其狀態碼為 REPEATED_PUSH。放入到 writeRequestMap 中,由 doWork 方法定時去處理待寫入的請求。

程式碼@3:如果是提交請求, 將請求存入 compareOrTruncateRequests 請求處理中,由 doWork 方法非同步處理。

程式碼@4:如果是 COMPARE 或 TRUNCATE 請求,將待寫入隊列 writeRequestMap 清空,並將請求放入 compareOrTruncateRequests 請求隊列中,由 doWork 方法非同步處理。

接下來,我們重點來分析 doWork 方法的實現。

3.3 doWork 方法詳解

EntryHandler#doWork

public void doWork() {      try {          if (!memberState.isFollower()) {     // @1              waitForRunning();              return;          }          if (compareOrTruncateRequests.peek() != null) {    // @2              Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = compareOrTruncateRequests.poll();              PreConditions.check(pair != null, DLedgerResponseCode.UNKNOWN);              switch (pair.getKey().getType()) {                  case TRUNCATE:                      handleDoTruncate(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());                      break;                  case COMPARE:                      handleDoCompare(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());                      break;                  case COMMIT:                      handleDoCommit(pair.getKey().getCommitIndex(), pair.getKey(), pair.getValue());                      break;                  default:                      break;              }          } else { // @3              long nextIndex = dLedgerStore.getLedgerEndIndex() + ;              Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.remove(nextIndex);              if (pair == null) {                  checkAbnormalFuture(dLedgerStore.getLedgerEndIndex());                  waitForRunning();                  return;              }              PushEntryRequest request = pair.getKey();              handleDoAppend(nextIndex, request, pair.getValue());          }      } catch (Throwable t) {          DLedgerEntryPusher.logger.error("Error in {}", getName(), t);          DLedgerUtils.sleep();      }  }  

程式碼@1:如果當前節點的狀態不是從節點,則跳出。

程式碼@2:如果 compareOrTruncateRequests 隊列不為空,說明有COMMIT、COMPARE、TRUNCATE 等請求,這類請求優先處理。值得注意的是這裡使用是 peek、poll 等非阻塞方法,然後根據請求的類型,調用對應的方法。稍後詳細介紹。

程式碼@3:如果只有 append 類請求,則根據當前節點最大的消息序號,嘗試從 writeRequestMap 容器中,獲取下一個消息複製請求(ledgerEndIndex + 1) 為 key 去查找。如果不為空,則執行 doAppend 請求,如果為空,則調用 checkAbnormalFuture 來處理異常情況。

接下來我們來重點分析各個處理細節。

3.3.1 handleDoCommit

處理提交請求,其處理比較簡單,就是調用 DLedgerStore 的 updateCommittedIndex 更新其已提交偏移量,故我們還是具體看一下DLedgerStore 的 updateCommittedIndex 方法。

DLedgerMmapFileStore#updateCommittedIndex

public void updateCommittedIndex(long term, long newCommittedIndex) {   // @1      if (newCommittedIndex == -              || ledgerEndIndex == -              || term < memberState.currTerm()              || newCommittedIndex == this.committedIndex) {                               // @2              return;      }      if (newCommittedIndex < this.committedIndex              || newCommittedIndex < this.ledgerBeginIndex) {                             // @3          logger.warn("[MONITOR]Skip update committed index for new={} < old={} or new={} < beginIndex={}", newCommittedIndex, this.committedIndex, newCommittedIndex, this.ledgerBeginIndex);          return;      }      long endIndex = ledgerEndIndex;      if (newCommittedIndex > endIndex) {                                                       // @4              //If the node fall behind too much, the committedIndex will be larger than enIndex.          newCommittedIndex = endIndex;      }      DLedgerEntry dLedgerEntry = get(newCommittedIndex);                        // @5      PreConditions.check(dLedgerEntry != null, DLedgerResponseCode.DISK_ERROR);      this.committedIndex = newCommittedIndex;      this.committedPos = dLedgerEntry.getPos() + dLedgerEntry.getSize();     // @6  }  

程式碼@1:首先介紹一下方法的參數:

  • long term 主節點當前的投票輪次。
  • long newCommittedIndex: 主節點發送日誌複製請求時的已提交日誌序號。

程式碼@2:如果待更新提交序號為 -1 或 投票輪次小於從節點的投票輪次或主節點投票輪次等於從節點的已提交序號,則直接忽略本次提交動作。

程式碼@3:如果主節點的已提交日誌序號小於從節點的已提交日誌序號或待提交序號小於當前節點的最小有效日誌序號,則輸出警告日誌[MONITOR],並忽略本次提交動作。

程式碼@4:如果從節點落後主節點太多,則重置 提交索引為從節點當前最大有效日誌序號。

程式碼@5:嘗試根據待提交序號從從節點查找數據,如果數據不存在,則拋出 DISK_ERROR 錯誤。

程式碼@6:更新 commitedIndex、committedPos 兩個指針,DledgerStore會定時將已提交指針刷入 checkpoint 文件,達到持久化 commitedIndex 指針的目的。

3.3.2 handleDoCompare

處理主節點發送過來的 COMPARE 請求,其實現也比較簡單,最終調用 buildResponse 方法構造響應結果。

EntryHandler#buildResponse

private PushEntryResponse buildResponse(PushEntryRequest request, int code) {      PushEntryResponse response = new PushEntryResponse();      response.setGroup(request.getGroup());      response.setCode(code);      response.setTerm(request.getTerm());      if (request.getType() != PushEntryRequest.Type.COMMIT) {          response.setIndex(request.getEntry().getIndex());      }      response.setBeginIndex(dLedgerStore.getLedgerBeginIndex());      response.setEndIndex(dLedgerStore.getLedgerEndIndex());      return response;  }  

主要也是返回當前從幾點的 ledgerBeginIndex、ledgerEndIndex 以及投票輪次,供主節點進行判斷比較。

3.3.3 handleDoTruncate

handleDoTruncate 方法實現比較簡單,刪除從節點上 truncateIndex 日誌序號之後的所有日誌,具體調用dLedgerStore 的 truncate 方法,由於其存儲與 RocketMQ 的存儲設計基本類似故本文就不在詳細介紹,簡單介紹其實現要點:根據日誌序號,去定位到日誌文件,如果命中具體的文件,則修改相應的讀寫指針、刷盤指針等,並將所在在物理文件之後的所有文件刪除。大家如有興趣,可以查閱筆者的《RocketMQ技術內幕》第4章:RocketMQ 存儲相關內容。

3.3.4 handleDoAppend

private void handleDoAppend(long writeIndex, PushEntryRequest request,      CompletableFuture<PushEntryResponse> future) {      try {          PreConditions.check(writeIndex == request.getEntry().getIndex(), DLedgerResponseCode.INCONSISTENT_STATE);          DLedgerEntry entry = dLedgerStore.appendAsFollower(request.getEntry(), request.getTerm(), request.getLeaderId());          PreConditions.check(entry.getIndex() == writeIndex, DLedgerResponseCode.INCONSISTENT_STATE);          future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));          dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());      } catch (Throwable t) {          logger.error("[HandleDoWrite] writeIndex={}", writeIndex, t);          future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));      }  }  

其實現也比較簡單,調用DLedgerStore 的 appendAsFollower 方法進行日誌的追加,與appendAsLeader 在日誌存儲部分相同,只是從節點無需再轉發日誌。

3.3.5 checkAbnormalFuture

該方法是本節的重點,doWork 的從伺服器存儲的最大有效日誌序號(ledgerEndIndex) + 1 序號,嘗試從待寫請求中獲取不到對應的請求時調用,這種情況也很常見,例如主節點並么有將最新的數據 PUSH 給從節點。接下來我們詳細來看看該方法的實現細節。

EntryHandler#checkAbnormalFuture

if (DLedgerUtils.elapsed(lastCheckFastForwardTimeMs) < ) {      return;  }  lastCheckFastForwardTimeMs  = System.currentTimeMillis();  if (writeRequestMap.isEmpty()) {      return;  }  

Step1:如果上一次檢查的時間距現在不到1s,則跳出;如果當前沒有積壓的append請求,同樣跳出,因為可以同樣明確的判斷出主節點還未推送日誌。

EntryHandler#checkAbnormalFuture

for (Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair : writeRequestMap.values()) {      long index = pair.getKey().getEntry().getIndex();             // @1      //Fall behind      if (index <= endIndex) {                                                   // @2          try {              DLedgerEntry local = dLedgerStore.get(index);              PreConditions.check(pair.getKey().getEntry().equals(local), DLedgerResponseCode.INCONSISTENT_STATE);              pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.SUCCESS.getCode()));              logger.warn("[PushFallBehind]The leader pushed an entry index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", index, endIndex);          } catch (Throwable t) {              logger.error("[PushFallBehind]The leader pushed an entry index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", index, endIndex, t);              pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));          }          writeRequestMap.remove(index);          continue;      }      //Just OK      if (index ==  endIndex + ) {    // @3          //The next entry is coming, just return          return;      }      //Fast forward      TimeoutFuture<PushEntryResponse> future  = (TimeoutFuture<PushEntryResponse>) pair.getValue();    // @4      if (!future.isTimeOut()) {          continue;      }      if (index < minFastForwardIndex) {                                                                                                                // @5          minFastForwardIndex = index;      }  }  

Step2:遍歷當前待寫入的日誌追加請求(主伺服器推送過來的日誌複製請求),找到需要快速快進的的索引。其關鍵實現點如下:

  • 程式碼@1:首先獲取待寫入日誌的序號。
  • 程式碼@2:如果待寫入的日誌序號小於從節點已追加的日誌(endIndex),並且日誌的確已存儲在從節點,則返回成功,並輸出警告日誌【PushFallBehind】,繼續監測下一條待寫入日誌。
  • 程式碼@3:如果待寫入 index 等於 endIndex + 1,則結束循環,因為下一條日誌消息已經在待寫入隊列中,即將寫入。
  • 程式碼@4:如果待寫入 index 大於 endIndex + 1,並且未超時,則直接檢查下一條待寫入日誌。
  • 程式碼@5:如果待寫入 index 大於 endIndex + 1,並且已經超時,則記錄該索引,使用 minFastForwardIndex 存儲。

EntryHandler#checkAbnormalFuture

if (minFastForwardIndex == Long.MAX_VALUE) {      return;  }  Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.get(minFastForwardIndex);  if (pair == null) {      return;  }  

Step3:如果未找到需要快速失敗的日誌序號或 writeRequestMap 中未找到其請求,則直接結束檢測。

EntryHandler#checkAbnormalFuture

logger.warn("[PushFastForward] ledgerEndIndex={} entryIndex={}", endIndex, minFastForwardIndex);  pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));  

Step4:則向主節點報告從節點已經與主節點發生了數據不一致,從節點並沒有寫入序號 minFastForwardIndex 的日誌。如果主節點收到此種響應,將會停止日誌轉發,轉而向各個從節點發送 COMPARE 請求,從而使數據恢復一致。

行為至此,已經詳細介紹了主伺服器向從伺服器發送請求,從服務做出響應,那接下來就來看一下,服務端收到響應結果後的處理,我們要知道主節點會向它所有的從節點傳播日誌,主節點需要在指定時間內收到超過集群一半節點的確認,才能認為日誌寫入成功,那我們接下來看一下其實現過程。

4、QuorumAckChecker


日誌複製投票器,一個日誌寫請求只有得到集群內的的大多數節點的響應,日誌才會被提交。

4.1 類圖

其核心屬性如下:

  • long lastPrintWatermarkTimeMs 上次列印水位線的時間戳,單位為毫秒。
  • long lastCheckLeakTimeMs 上次檢測泄漏的時間戳,單位為毫秒。
  • long lastQuorumIndex 已投票仲裁的日誌序號。

4.2 doWork 詳解

QuorumAckChecker#doWork

if (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) > ) {      logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}",              memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm));      lastPrintWatermarkTimeMs = System.currentTimeMillis();  }  

Step1:如果離上一次列印 watermak 的時間超過3s,則列印一下當前的 term、ledgerBegin、ledgerEnd、committed、peerWaterMarksByTerm 這些數據日誌。

QuorumAckChecker#doWork

if (!memberState.isLeader()) {   // @2      waitForRunning();      return;  }  

Step2:如果當前節點不是主節點,直接返回,不作為。

QuorumAckChecker#doWork

if (pendingAppendResponsesByTerm.size() > ) {   // @1      for (Long term : pendingAppendResponsesByTerm.keySet()) {          if (term == currTerm) {              continue;          }          for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : pendingAppendResponsesByTerm.get(term).entrySet()) {              AppendEntryResponse response = new AppendEntryResponse();              response.setGroup(memberState.getGroup());              response.setIndex(futureEntry.getKey());              response.setCode(DLedgerResponseCode.TERM_CHANGED.getCode());              response.setLeaderId(memberState.getLeaderId());              logger.info("[TermChange] Will clear the pending response index={} for term changed from {} to {}", futureEntry.getKey(), term, currTerm);              futureEntry.getValue().complete(response);          }          pendingAppendResponsesByTerm.remove(term);      }  }  if (peerWaterMarksByTerm.size() > ) {      for (Long term : peerWaterMarksByTerm.keySet()) {          if (term == currTerm) {              continue;          }          logger.info("[TermChange] Will clear the watermarks for term changed from {} to {}", term, currTerm);          peerWaterMarksByTerm.remove(term);      }  }  

Step3:清理pendingAppendResponsesByTerm、peerWaterMarksByTerm 中本次投票輪次的數據,避免一些不必要的記憶體使用。

Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm);  long quorumIndex = -;  for (Long index : peerWaterMarks.values()) {  // @1      int num = ;      for (Long another : peerWaterMarks.values()) {  // @2          if (another >= index) {              num++;          }      }      if (memberState.isQuorum(num) && index > quorumIndex) {  // @3          quorumIndex = index;      }  }  dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);  // @4  

Step4:根據各個從節點回饋的進度,進行仲裁,確定已提交序號。為了加深對這段程式碼的理解,再來啰嗦一下 peerWaterMarks 的作用,存儲的是各個從節點當前已成功追加的日誌序號。例如一個三節點的 DLedger 集群,peerWaterMarks 數據存儲大概如下:

{  「dledger_group_01_0」 : ,  "dledger_group_01_1" : ,  }  

其中 dledger_group_01_0 為從節點1的ID,當前已複製的序號為 100,而 dledger_group_01_1 為節點2的ID,當前已複製的序號為 101。再加上主節點,如何確定可提交序號呢?

  • 程式碼@1:首先遍歷 peerWaterMarks 的 value 集合,即上述示例中的 {100, 101},用臨時變數 index 來表示待投票的日誌序號,需要集群內超過半數的節點的已複製序號超過該值,則該日誌能被確認提交。
  • 程式碼@2:遍歷 peerWaterMarks 中的所有已提交序號,與當前值進行比較,如果節點的已提交序號大於等於待投票的日誌序號(index),num 加一,表示投贊成票。
  • 程式碼@3:對 index 進行仲裁,如果超過半數 並且 index 大於 quorumIndex,更新 quorumIndex 的值為 index。quorumIndex 經過遍歷的,得出當前最大的可提交日誌序號。
  • 程式碼@4:更新 committedIndex 索引,方便 DLedgerStore 定時將 committedIndex 寫入 checkpoint 中。
ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses = pendingAppendResponsesByTerm.get(currTerm);  boolean needCheck = false;  int ackNum = ;  if (quorumIndex >= ) {      for (Long i = quorumIndex; i >= ; i--) {  // @1          try {              CompletableFuture<AppendEntryResponse> future = responses.remove(i);   // @2              if (future == null) {                                                                                              // @3                  needCheck = lastQuorumIndex != - && lastQuorumIndex != quorumIndex && i != lastQuorumIndex;                  break;              } else if (!future.isDone()) {                                                                                // @4                  AppendEntryResponse response = new AppendEntryResponse();                  response.setGroup(memberState.getGroup());                  response.setTerm(currTerm);                  response.setIndex(i);                  response.setLeaderId(memberState.getSelfId());                  response.setPos(((AppendFuture) future).getPos());                  future.complete(response);              }              ackNum++;                                                                                                      // @5          } catch (Throwable t) {              logger.error("Error in ack to index={} term={}", i, currTerm, t);          }      }  }  

Step5:處理 quorumIndex 之前的掛起請求,需要發送響應到客戶端,其實現步驟:

  • 程式碼@1:從 quorumIndex 開始處理,沒處理一條,該序號減一,直到大於0或主動退出,請看後面的退出邏輯。
  • 程式碼@2:responses 中移除該日誌條目的掛起請求。
  • 程式碼@3:如果未找到掛起請求,說明前面掛起的請求已經全部處理完畢,準備退出,退出之前再 設置 needCheck 的值,其依據如下(三個條件必須同時滿足):
    • 最後一次仲裁的日誌序號不等於-1
    • 並且最後一次不等於本次新仲裁的日誌序號
    • 最後一次仲裁的日誌序號不等於最後一次仲裁的日誌。正常情況一下,條件一、條件二通常為true,但這一條大概率會返回false。
  • 程式碼@4:向客戶端返回結果。
  • 程式碼@5:ackNum,表示本次確認的數量。
if (ackNum == ) {      for (long i = quorumIndex + ; i < Integer.MAX_VALUE; i++) {          TimeoutFuture<AppendEntryResponse> future = responses.get(i);          if (future == null) {              break;          } else if (future.isTimeOut()) {              AppendEntryResponse response = new AppendEntryResponse();              response.setGroup(memberState.getGroup());              response.setCode(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode());              response.setTerm(currTerm);              response.setIndex(i);              response.setLeaderId(memberState.getSelfId());              future.complete(response);          } else {              break;          }      }      waitForRunning();  }  

Step6:如果本次確認的個數為0,則嘗試去判斷超過該仲裁序號的請求,是否已經超時,如果已超時,則返回超時響應結果。

if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) >  || needCheck) {      updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());      for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : responses.entrySet()) {          if (futureEntry.getKey() < quorumIndex) {              AppendEntryResponse response = new AppendEntryResponse();              response.setGroup(memberState.getGroup());              response.setTerm(currTerm);              response.setIndex(futureEntry.getKey());              response.setLeaderId(memberState.getSelfId());              response.setPos(((AppendFuture) futureEntry.getValue()).getPos());              futureEntry.getValue().complete(response);              responses.remove(futureEntry.getKey());          }      }      lastCheckLeakTimeMs = System.currentTimeMillis();  }  

Step7:檢查是否發送泄漏。其判斷泄漏的依據是如果掛起的請求的日誌序號小於已提交的序號,則移除。

Step8:一次日誌仲裁就結束了,最後更新 lastQuorumIndex 為本次仲裁的的新的提交值。