源碼分析 RocketMQ DLedger(多副本) 之日誌複製-上篇

  • 2019 年 10 月 6 日
  • 筆記

本文緊接著 源碼分析 RocketMQ DLedger(多副本) 之日誌追加流程 ,繼續 Leader 處理客戶端 append 的請求流程中最至關重要的一環:日誌複製。

溫馨提示:由於微信單篇文章的字數限制,RocketMQ DLedger 日誌複製分為兩篇文章介紹。

DLedger 多副本的日誌轉發由 DLedgerEntryPusher 實現,接下來將對其進行詳細介紹。

溫馨提示:由於本篇幅較長,為了更好的理解其實現,大家可以帶著如下疑問來通讀本篇文章: 1、raft 協議中有一個非常重要的概念:已提交日誌序號,該如何實現。 2、客戶端向 DLedger 集群發送一條日誌,必須得到集群中大多數節點的認可才能被認為寫入成功。 3、raft 協議中追加、提交兩個動作如何實現。

日誌複製(日誌轉發)由 DLedgerEntryPusher 實現,具體類圖如下:

主要由如下4個類構成:

  • DLedgerEntryPusher DLedger 日誌轉發與處理核心類,該內會啟動如下3個對象,其分別對應一個執行緒。
  • EntryHandler 日誌接收處理執行緒,當節點為從節點時激活。
  • QuorumAckChecker 日誌追加ACK投票處理執行緒,當前節點為主節點時激活。
  • EntryDispatcher 日誌轉發執行緒,當前節點為主節點時追加。

接下來我們將詳細介紹上述4個類,從而揭曉日誌複製的核心實現原理。

1、DLedgerEntryPusher


1.1 核心類圖

DLedger 多副本日誌推送的核心實現類,裡面會創建 EntryDispatcher、QuorumAckChecker、EntryHandler 三個核心執行緒。其核心屬性如下:

  • DLedgerConfig dLedgerConfig 多副本相關配置。
  • DLedgerStore dLedgerStore 存儲實現類。
  • MemberState memberState 節點狀態機。
  • DLedgerRpcService dLedgerRpcService RPC 服務實現類,用於集群內的其他節點進行網路通訊。
  • Map> peerWaterMarksByTerm 每個節點基於投票輪次的當前水位線標記。鍵值為投票輪次,值為 ConcurrentMap/, Long/* 節點對應的日誌序號*/>。
  • Map>> pendingAppendResponsesByTerm 用於存放追加請求的響應結果(Future模式)。
  • EntryHandler entryHandler 從節點上開啟的執行緒,用於接收主節點的 push 請求(append、commit、append)。
  • QuorumAckChecker quorumAckChecker 主節點上的追加請求投票器。
  • Map dispatcherMap 主節點日誌請求轉發器,向從節點複製消息等。

接下來介紹一下其核心方法的實現。

1.2 構造方法

public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore,      DLedgerRpcService dLedgerRpcService) {      this.dLedgerConfig = dLedgerConfig;      this.memberState = memberState;      this.dLedgerStore = dLedgerStore;      this.dLedgerRpcService = dLedgerRpcService;      for (String peer : memberState.getPeerMap().keySet()) {          if (!peer.equals(memberState.getSelfId())) {              dispatcherMap.put(peer, new EntryDispatcher(peer, logger));          }      }  }  

構造方法的重點是會根據集群內的節點,依次構建對應的 EntryDispatcher 對象。

1.3 startup

DLedgerEntryPusher#startup

public void startup() {      entryHandler.start();      quorumAckChecker.start();      for (EntryDispatcher dispatcher : dispatcherMap.values()) {          dispatcher.start();      }  }  

依次啟動 EntryHandler、QuorumAckChecker 與 EntryDispatcher 執行緒。

備註:DLedgerEntryPusher 的其他核心方法在詳細分析其日誌複製原理的過程中會一一介紹。

接下來將從 EntryDispatcher、QuorumAckChecker、EntryHandler 來闡述 RocketMQ DLedger(多副本)的實現原理。

2、EntryDispatcher 詳解


2.1 核心類圖

其核心屬性如下。

  • AtomicReference type 向從節點發送命令的類型,可選值:PushEntryRequest.Type.COMPARE、TRUNCATE、APPEND、COMMIT,下面詳細說明。
  • long lastPushCommitTimeMs = -1 上一次發送提交類型的時間戳。
  • String peerId 目標節點ID。
  • long compareIndex = -1 已完成比較的日誌序號。
  • long writeIndex = -1 已寫入的日誌序號。
  • int maxPendingSize = 1000 允許的最大掛起日誌數量。
  • long term = -1 Leader 節點當前的投票輪次。
  • String leaderId = null Leader 節點ID。
  • long lastCheckLeakTimeMs 上次檢測泄漏的時間,所謂的泄漏,就是看掛起的日誌請求數量是否查過了 maxPendingSize 。
  • ConcurrentMap pendingMap 記錄日誌的掛起時間,key:日誌的序列(entryIndex),value:掛起時間戳。
  • Quota quota = new Quota(dLedgerConfig.getPeerPushQuota()) 配額。

2.2 Push 請求類型

DLedger 主節點向從從節點複製日誌總共定義了4類請求類型,其枚舉類型為 PushEntryRequest.Type,其值分別為 COMPARE、TRUNCATE、APPEND、COMMIT。

  • COMPARE 如果 Leader 發生變化,新的 Leader 需要與他的從節點的日誌條目進行比較,以便截斷從節點多餘的數據。
  • TRUNCATE 如果 Leader 通過索引完成日誌對比,則 Leader 將發送 TRUNCATE 給它的從節點。
  • APPEND 將日誌條目追加到從節點。
  • COMMIT 通常,leader 會將提交的索引附加到 append 請求,但是如果 append 請求很少且分散,leader 將發送一個單獨的請求來通知從節點提交的索引。

對主從節點的請求類型有了一個初步的認識後,我們將從 EntryDispatcher 的業務處理入口 doWork 方法開始講解。

2.3 doWork 方法詳解

public void doWork() {      try {          if (!checkAndFreshState()) {                                            // @1              waitForRunning();              return;          }            if (type.get() == PushEntryRequest.Type.APPEND) {   // @2              doAppend();          } else {              doCompare();                                                           // @3          }          waitForRunning();      } catch (Throwable t) {          DLedgerEntryPusher.logger.error("[Push-{}]Error in {} writeIndex={} compareIndex={}", peerId, getName(), writeIndex, compareIndex, t);          DLedgerUtils.sleep();      }  }  

程式碼@1:檢查狀態,是否可以繼續發送 append 或 compare。

程式碼@2:如果推送類型為APPEND,主節點向從節點傳播消息請求。

程式碼@3:主節點向從節點發送對比數據差異請求(當一個新節點被選舉成為主節點時,往往這是第一步)。

2.3.1 checkAndFreshState 詳解

EntryDispatcher#checkAndFreshState

private boolean checkAndFreshState() {      if (!memberState.isLeader()) {     // @1          return false;      }      if (term != memberState.currTerm() || leaderId == null || !leaderId.equals(memberState.getLeaderId())) {     // @2          synchronized (memberState) {              if (!memberState.isLeader()) {                  return false;              }              PreConditions.check(memberState.getSelfId().equals(memberState.getLeaderId()), DLedgerResponseCode.UNKNOWN);              term = memberState.currTerm();              leaderId = memberState.getSelfId();              changeState(-, PushEntryRequest.Type.COMPARE);          }      }      return true;  }  

程式碼@1:如果節點的狀態不是主節點,則直接返回 false。則結束 本次 doWork 方法。因為只有主節點才需要向從節點轉發日誌。

程式碼@2:如果當前節點狀態是主節點,但當前的投票輪次與狀態機輪次或 leaderId 還未設置,或 leaderId 與狀態機的 leaderId 不相等,這種情況通常是集群觸發了重新選舉,設置其term、leaderId與狀態機同步,即將發送COMPARE 請求。

接下來看一下 changeState (改變狀態)。

private synchronized void changeState(long index, PushEntryRequest.Type target) {      logger.info("[Push-{}]Change state from {} to {} at {}", peerId, type.get(), target, index);      switch (target) {          case APPEND:      // @1              compareIndex = -;              updatePeerWaterMark(term, peerId, index);              quorumAckChecker.wakeup();              writeIndex = index + ;              break;          case COMPARE:    // @2              if (this.type.compareAndSet(PushEntryRequest.Type.APPEND, PushEntryRequest.Type.COMPARE)) {                  compareIndex = -;                  pendingMap.clear();              }              break;          case TRUNCATE:     // @3              compareIndex = -;              break;          default:              break;      }      type.set(target);  }  

程式碼@1:如果將目標類型設置為 append,則重置 compareIndex ,並設置 writeIndex 為當前 index 加1。

程式碼@2:如果將目標類型設置為 COMPARE,則重置 compareIndex 為負一,接下將向各個從節點發送 COMPARE 請求類似,並清除已掛起的請求。

程式碼@3:如果將目標類型設置為 TRUNCATE,則重置 compareIndex 為負一。

接下來具體來看一下 APPEND、COMPARE、TRUNCATE 等請求。

2.3.2 append 請求詳解

EntryDispatcher#doAppend

private void doAppend() throws Exception {      while (true) {          if (!checkAndFreshState()) {                                                 // @1              break;          }          if (type.get() != PushEntryRequest.Type.APPEND) {        // @2              break;          }          if (writeIndex > dLedgerStore.getLedgerEndIndex()) {    // @3              doCommit();              doCheckAppendResponse();              break;          }          if (pendingMap.size() >= maxPendingSize || (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > )) {     // @4              long peerWaterMark = getPeerWaterMark(term, peerId);              for (Long index : pendingMap.keySet()) {                  if (index < peerWaterMark) {                      pendingMap.remove(index);                  }              }              lastCheckLeakTimeMs = System.currentTimeMillis();          }          if (pendingMap.size() >= maxPendingSize) {    // @5              doCheckAppendResponse();              break;          }          doAppendInner(writeIndex);                               // @6          writeIndex++;      }  }  

程式碼@1:檢查狀態,已經在上面詳細介紹。

程式碼@2:如果請求類型不為 APPEND,則退出,結束本輪 doWork 方法執行。

程式碼@3:writeIndex 表示當前追加到從該節點的序號,通常情況下主節點向從節點發送 append 請求時,會附帶主節點的已提交指針,但如何 append 請求發不那麼頻繁,writeIndex 大於 leaderEndIndex 時(由於pending請求超過其 pending 請求的隊列長度(默認為1w),時,會阻止數據的追加,此時有可能出現 writeIndex 大於 leaderEndIndex 的情況,此時單獨發送 COMMIT 請求。

程式碼@4:檢測 pendingMap(掛起的請求數量)是否發送泄漏,即掛起隊列中容量是否超過允許的最大掛起閥值。獲取當前節點關於本輪次的當前水位線(已成功 append 請求的日誌序號),如果發現正在掛起請求的日誌序號小於水位線,則丟棄。

程式碼@5:如果掛起的請求(等待從節點追加結果)大於 maxPendingSize 時,檢查並追加一次 append 請求。

程式碼@6:具體的追加請求。

2.3.2.1 doCommit 發送提交請求

EntryDispatcher#doCommit

private void doCommit() throws Exception {      if (DLedgerUtils.elapsed(lastPushCommitTimeMs) > ) {   // @1          PushEntryRequest request = buildPushRequest(null, PushEntryRequest.Type.COMMIT);   // @2          //Ignore the results          dLedgerRpcService.push(request);                                                                                        // @3          lastPushCommitTimeMs = System.currentTimeMillis();      }  }  

程式碼@1:如果上一次單獨發送 commit 的請求時間與當前時間相隔低於 1s,放棄本次提交請求。

程式碼@2:構建提交請求。

程式碼@3:通過網路向從節點發送 commit 請求。

接下來先了解一下如何構建 commit 請求包。

EntryDispatcher#buildPushRequest

private PushEntryRequest buildPushRequest(DLedgerEntry entry, PushEntryRequest.Type target) {      PushEntryRequest request = new PushEntryRequest();      request.setGroup(memberState.getGroup());      request.setRemoteId(peerId);      request.setLeaderId(leaderId);      request.setTerm(term);      request.setEntry(entry);      request.setType(target);      request.setCommitIndex(dLedgerStore.getCommittedIndex());      return request;  }  

提交包請求欄位主要包含如下欄位:DLedger 節點所屬組、從節點 id、主節點 id,當前投票輪次、日誌內容、請求類型與 committedIndex(主節點已提交日誌序號)。

2.3.2.2 doCheckAppendResponse 檢查並追加請求

EntryDispatcher#doCheckAppendResponse

private void doCheckAppendResponse() throws Exception {      long peerWaterMark = getPeerWaterMark(term, peerId);   // @1      Long sendTimeMs = pendingMap.get(peerWaterMark + );      if (sendTimeMs != null && System.currentTimeMillis() - sendTimeMs > dLedgerConfig.getMaxPushTimeOutMs()) { // @2          logger.warn("[Push-{}]Retry to push entry at {}", peerId, peerWaterMark + );          doAppendInner(peerWaterMark + );      }  }  

該方法的作用是檢查 append 請求是否超時,其關鍵實現如下:

  • 獲取已成功 append 的序號。
  • 從掛起的請求隊列中獲取下一條的發送時間,如果不為空並去超過了 append 的超時時間,則再重新發送 append 請求,最大超時時間默認為 1s,可以通過 maxPushTimeOutMs 來改變默認值。
2.3.2.3 doAppendInner 追加請求

向從節點發送 append 請求。

EntryDispatcher#doAppendInner

private void doAppendInner(long index) throws Exception {      DLedgerEntry entry = dLedgerStore.get(index);   // @1      PreConditions.check(entry != null, DLedgerResponseCode.UNKNOWN, "writeIndex=%d", index);      checkQuotaAndWait(entry);                                   // @2      PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.APPEND);   // @3      CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request);   // @4      pendingMap.put(index, System.currentTimeMillis());                                                                          // @5      responseFuture.whenComplete((x, ex) -> {          try {              PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);              DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());              switch (responseCode) {                  case SUCCESS:                                                                                                                // @6                      pendingMap.remove(x.getIndex());                      updatePeerWaterMark(x.getTerm(), peerId, x.getIndex());                      quorumAckChecker.wakeup();                      break;                  case INCONSISTENT_STATE:                                                                                         // @7                      logger.info("[Push-{}]Get INCONSISTENT_STATE when push index={} term={}", peerId, x.getIndex(), x.getTerm());                      changeState(-, PushEntryRequest.Type.COMPARE);                      break;                  default:                      logger.warn("[Push-{}]Get error response code {} {}", peerId, responseCode, x.baseInfo());                      break;              }          } catch (Throwable t) {              logger.error("", t);          }      });      lastPushCommitTimeMs = System.currentTimeMillis();  }  

程式碼@1:首先根據序號查詢出日誌。

程式碼@2:檢測配額,如果超過配額,會進行一定的限流,其關鍵實現點:

  • 首先觸發條件:append 掛起請求數已超過最大允許掛起數;基於文件存儲並主從差異超過300m,可通過 peerPushThrottlePoint 配置。
  • 每秒追加的日誌超過 20m(可通過 peerPushQuota 配置),則會 sleep 1s中後再追加。

程式碼@3:構建 PUSH 請求日誌。

程式碼@4:通過 Netty 發送網路請求到從節點,從節點收到請求會進行處理(本文並不會探討與網路相關的實現細節)。

程式碼@5:用 pendingMap 記錄待追加的日誌的發送時間,用於發送端判斷是否超時的一個依據。

程式碼@6:請求成功的處理邏輯,其關鍵實現點如下:

  • 移除 pendingMap 中的關於該日誌的發送超時時間。
  • 更新已成功追加的日誌序號(按投票輪次組織,並且每個從伺服器一個鍵值對)。
  • 喚醒 quorumAckChecker 執行緒(主要用於仲裁 append 結果),後續會詳細介紹。

程式碼@7:Push 請求出現狀態不一致情況,將發送 COMPARE 請求,來對比主從節點的數據是否一致。

日誌轉發 append 追加請求類型就介紹到這裡了,接下來我們繼續探討另一個請求類型 compare。

2.3.3 compare 請求詳解

COMPARE 類型的請求有 doCompare 方法發送,首先該方法運行在 while (true) 中,故在查閱下面程式碼時,要注意其退出循環的條件。 EntryDispatcher#doCompare

if (!checkAndFreshState()) {      break;  }  if (type.get() != PushEntryRequest.Type.COMPARE      && type.get() != PushEntryRequest.Type.TRUNCATE) {      break;  }  if (compareIndex == - && dLedgerStore.getLedgerEndIndex() == -) {      break;  }  

Step1:驗證是否執行,有幾個關鍵點如下:

  • 判斷是否是主節點,如果不是主節點,則直接跳出。
  • 如果是請求類型不是 COMPARE 或 TRUNCATE 請求,則直接跳出。
  • 如果已比較索引 和 ledgerEndIndex 都為 -1 ,表示一個新的 DLedger 集群,則直接跳出。

EntryDispatcher#doCompare

if (compareIndex == -) {      compareIndex = dLedgerStore.getLedgerEndIndex();      logger.info("[Push-{}][DoCompare] compareIndex=-1 means start to compare", peerId);  } else if (compareIndex > dLedgerStore.getLedgerEndIndex() || compareIndex < dLedgerStore.getLedgerBeginIndex()) {      logger.info("[Push-{}][DoCompare] compareIndex={} out of range {}-{}", peerId, compareIndex, dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex());      compareIndex = dLedgerStore.getLedgerEndIndex();  }  

Step2:如果 compareIndex 為 -1 或compareIndex 不在有效範圍內,則重置待比較序列號為當前已已存儲的最大日誌序號:ledgerEndIndex。

DLedgerEntry entry = dLedgerStore.get(compareIndex);  PreConditions.check(entry != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", compareIndex);  PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.COMPARE);  CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request);  PushEntryResponse response = responseFuture.get(, TimeUnit.SECONDS);  

Step3:根據序號查詢到日誌,並向從節點發起 COMPARE 請求,其超時時間為 3s。

EntryDispatcher#doCompare

long truncateIndex = -;  if (response.getCode() == DLedgerResponseCode.SUCCESS.getCode()) {   // @1      if (compareIndex == response.getEndIndex()) {          changeState(compareIndex, PushEntryRequest.Type.APPEND);          break;      } else {          truncateIndex = compareIndex;      }    } else if (response.getEndIndex() < dLedgerStore.getLedgerBeginIndex()          || response.getBeginIndex() > dLedgerStore.getLedgerEndIndex()) {    // @2      truncateIndex = dLedgerStore.getLedgerBeginIndex();  } else if (compareIndex < response.getBeginIndex()) {                                    // @3      truncateIndex = dLedgerStore.getLedgerBeginIndex();  } else if (compareIndex > response.getEndIndex()) {                                      // @4      compareIndex = response.getEndIndex();  } else {                                                                                                              // @5      compareIndex--;  }    if (compareIndex < dLedgerStore.getLedgerBeginIndex()) {                          // @6      truncateIndex = dLedgerStore.getLedgerBeginIndex();  }  

Step4:根據響應結果計算需要截斷的日誌序號,其主要實現關鍵點如下:

  • 程式碼@1:如果兩者的日誌序號相同,則無需截斷,下次將直接先從節點發送 append 請求;否則將 truncateIndex 設置為響應結果中的 endIndex。
  • 程式碼@2:如果從節點存儲的最大日誌序號小於主節點的最小序號,或者從節點的最小日誌序號大於主節點的最大日誌序號,即兩者不相交,這通常發生在從節點崩潰很長一段時間,而主節點刪除了過期的條目時。truncateIndex 設置為主節點的 ledgerBeginIndex,即主節點目前最小的偏移量。
  • 程式碼@3:如果已比較的日誌序號小於從節點的開始日誌序號,很可能是從節點磁碟發送損耗,從主節點最小日誌序號開始同步。
  • 程式碼@4:如果已比較的日誌序號大於從節點的最大日誌序號,則已比較索引設置為從節點最大的日誌序號,觸發數據的繼續同步。
  • 程式碼@5:如果已比較的日誌序號大於從節點的開始日誌序號,但小於從節點的最大日誌序號,則待比較索引減一。
  • 程式碼@6:如果比較出來的日誌序號小於主節點的最小日誌需要,則設置為主節點的最小序號。
if (truncateIndex != -) {      changeState(truncateIndex, PushEntryRequest.Type.TRUNCATE);      doTruncate(truncateIndex);      break;  }  

Step5:如果比較出來的日誌序號不等於 -1 ,則向從節點發送 TRUNCATE 請求。

2.3.3.1 doTruncate 詳解
private void doTruncate(long truncateIndex) throws Exception {      PreConditions.check(type.get() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);      DLedgerEntry truncateEntry = dLedgerStore.get(truncateIndex);      PreConditions.check(truncateEntry != null, DLedgerResponseCode.UNKNOWN);      logger.info("[Push-{}]Will push data to truncate truncateIndex={} pos={}", peerId, truncateIndex, truncateEntry.getPos());      PushEntryRequest truncateRequest = buildPushRequest(truncateEntry, PushEntryRequest.Type.TRUNCATE);      PushEntryResponse truncateResponse = dLedgerRpcService.push(truncateRequest).get(, TimeUnit.SECONDS);      PreConditions.check(truncateResponse != null, DLedgerResponseCode.UNKNOWN, "truncateIndex=%d", truncateIndex);      PreConditions.check(truncateResponse.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(truncateResponse.getCode()), "truncateIndex=%d", truncateIndex);      lastPushCommitTimeMs = System.currentTimeMillis();      changeState(truncateIndex, PushEntryRequest.Type.APPEND);  }  

該方法主要就是構建 truncate 請求到從節點。

關於服務端的消息複製轉發就介紹到這裡了,主節點負責向從伺服器PUSH請求,從節點自然而然的要處理這些請求,接下來我們就按照主節點發送的請求,來具體分析一下從節點是如何響應的。

由於微信單篇文章字數的限制,從伺服器接收到主節點的 PUSH 請求後如何處理、以及主服務根據所有從伺服器的響應後進行仲裁(需要集群內半數以上節點追加成功後才認為是有效數據)等實現細節,則在下一篇文章中給出。