5. SOFAJRaft Source Code Analysis — How Does RheaKV Store Data?

  • November 10, 2019
  • Notes

Overview

The previous article covered how RheaKV is initialized. Since RheaKV is primarily a KV store and its read and write paths are fairly complex, covering both in one article would make it far too long, so this one focuses on how RheaKV stores data.

Let's start this walkthrough with a client example:

public static void main(final String[] args) throws Exception {
    final Client client = new Client();
    client.init();
    //get(client.getRheaKVStore());
    RheaKVStore rheaKVStore = client.getRheaKVStore();
    final byte[] key = writeUtf8("hello");
    final byte[] value = writeUtf8("world");
    rheaKVStore.bPut(key, value);
    client.shutdown();
}

We start our instance from this main method and call rheaKVStore.bPut(key, value) to put data into RheaKV.

public class Client {

    private final RheaKVStore rheaKVStore = new DefaultRheaKVStore();

    public void init() {
        final List<RegionRouteTableOptions> regionRouteTableOptionsList = MultiRegionRouteTableOptionsConfigured
            .newConfigured() //
            .withInitialServerList(-1L /* default id */, Configs.ALL_NODE_ADDRESSES) //
            .config();
        final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured() //
            .withFake(true) //
            .withRegionRouteTableOptionsList(regionRouteTableOptionsList) //
            .config();
        final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured() //
            .withClusterName(Configs.CLUSTER_NAME) //
            .withPlacementDriverOptions(pdOpts) //
            .config();
        System.out.println(opts);
        rheaKVStore.init(opts);
    }

    public void shutdown() {
        this.rheaKVStore.shutdown();
    }

    public RheaKVStore getRheaKVStore() {
        return rheaKVStore;
    }
}

public class Configs {
    public static String ALL_NODE_ADDRESSES = "127.0.0.1:8181,127.0.0.1:8182,127.0.0.1:8183";

    public static String CLUSTER_NAME       = "rhea_example";
}

When Client calls init to initialize the rheaKVStore, it looks very much like the server example from the previous article; the differences are that StoreEngineOptions is not set and an extra regionRouteTableOptionsList instance is configured.

bPut: storing data

Storing the data calls DefaultRheaKVStore's bPut method:
DefaultRheaKVStore#bPut

public Boolean bPut(final byte[] key, final byte[] value) {
    return FutureHelper.get(put(key, value), this.futureTimeoutMillis);
}

The actual work of bPut is done in the put method, which returns a CompletableFuture that FutureHelper.get waits on. bPut also passes in a timeout that is set in the init method and defaults to 5 seconds.
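FutureHelper.get is just a blocking wrapper around the CompletableFuture returned by put. A minimal sketch of the idea (not the actual FutureHelper source; the exception wrapping is an assumption, and it relies on java.util.concurrent.CompletableFuture and TimeUnit):

public static <T> T get(final CompletableFuture<T> future, final long timeoutMillis) {
    try {
        // block the caller until the asynchronous put completes or the timeout fires
        return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (final Exception e) {
        // assumption: timeouts and interrupts are surfaced as an unchecked exception
        throw new RuntimeException(e);
    }
}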

Next, let's go into the put method:
DefaultRheaKVStore#put

public CompletableFuture<Boolean> put(final byte[] key, final byte[] value) {
    Requires.requireNonNull(key, "key");
    Requires.requireNonNull(value, "value");
    // try a batched put first
    return put(key, value, new CompletableFuture<>(), true);
}

This calls an overloaded put method; the third parameter passes in a freshly created CompletableFuture, and the fourth indicates that batch storage should be attempted.
DefaultRheaKVStore#put

private CompletableFuture<Boolean> put(final byte[] key, final byte[] value,
                                       final CompletableFuture<Boolean> future, final boolean tryBatching) {
    // make sure init has been called
    checkState();
    if (tryBatching) {
        // the putBatching instance is initialized in the init method
        final PutBatching putBatching = this.putBatching;
        if (putBatching != null && putBatching.apply(new KVEntry(key, value), future)) {
            // the entry has been handed off to the batching handler, so just return the future
            return future;
        }
    }
    // otherwise store the data directly
    internalPut(key, value, future, this.failoverRetries, null);
    return future;
}

checkState verifies that the started flag has been set; if DefaultRheaKVStore's init method has been called, started is true.
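checkState itself is not shown above; a minimal sketch of the idea, assuming started is a simple boolean flag that init flips to true:

private void checkState() {
    // assumption: 'started' is set to true at the end of init()
    if (!this.started) {
        throw new IllegalStateException("rhea kv is not started or shutdown");
    }
}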
The method also uses the putBatching instance that was initialized in init; let's see below what putBatching does.

putBatching: batching writes

When putBatching is initialized in init, a PutBatchingHandler is passed in as its handler:

this.putBatching = new PutBatching(KVEvent::new, "put_batching",
        new PutBatchingHandler("put"));

Let's look at PutBatching's constructor:

public PutBatching(EventFactory<KVEvent> factory, String name, PutBatchingHandler handler) {
    super(factory, batchingOpts.getBufSize(), name, handler);
}

Since PutBatching extends the abstract class Batching, instantiation goes straight to the parent constructor:

public Batching(EventFactory<T> factory, int bufSize, String name, EventHandler<T> handler) {
    this.name = name;
    this.disruptor = new Disruptor<>(factory, bufSize, new NamedThreadFactory(name, true));
    this.disruptor.handleEventsWith(handler);
    this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(name));
    this.ringBuffer = this.disruptor.start();
}

The Batching constructor initializes a Disruptor instance and registers the PutBatchingHandler we passed in as its event handler, so every event published to PutBatching is processed by PutBatchingHandler.

Let's see how PutBatchingHandler processes the data:
PutBatchingHandler#onEvent

public void onEvent(final KVEvent event, final long sequence, final boolean endOfBatch) throws Exception {
    // 1. add the incoming event to the events list
    this.events.add(event);
    // accumulate the length of the key and value
    this.cachedBytes += event.kvEntry.length();
    final int size = this.events.size();
    // batchSize defaults to 100 and maxWriteBytes to 32768 bytes
    // 2. if this is not the last event of the batch and neither threshold is reached, don't flush yet
    if (!endOfBatch && size < batchingOpts.getBatchSize() && this.cachedBytes < batchingOpts.getMaxWriteBytes()) {
        return;
    }
    // 3. if there is only a single buffered event, call put again with tryBatching = false
    if (size == 1) {
        // reset events and cachedBytes
        reset();
        final KVEntry kv = event.kvEntry;
        try {
            put(kv.getKey(), kv.getValue(), event.future, false);
        } catch (final Throwable t) {
            exceptionally(t, event.future);
        }
    // 4. otherwise copy the buffered events into a list and process them as a batch
    } else {
        // create a list with capacity size
        final List<KVEntry> entries = Lists.newArrayListWithCapacity(size);
        final CompletableFuture<Boolean>[] futures = new CompletableFuture[size];
        for (int i = 0; i < size; i++) {
            final KVEvent e = this.events.get(i);
            entries.add(e.kvEntry);
            // collect the CompletableFuture of each event
            futures[i] = e.future;
        }
        // after copying the events into entries, reset
        reset();
        try {
            // when put(entries) completes, run the whenComplete callback
            put(entries).whenComplete((result, throwable) -> {
                // if no exception was thrown, complete every future with the result
                if (throwable == null) {
                    for (int i = 0; i < futures.length; i++) {
                        futures[i].complete(result);
                    }
                    return;
                }
                exceptionally(throwable, futures);
            });
        } catch (final Throwable t) {
            exceptionally(t, futures);
        }
    }
}
  1. On entering the method, the event is added to the events list, and cachedBytes is increased by the combined length of its key and value; the current size of events is then checked.
  2. Since every event goes through the Disruptor and is dispatched to PutBatchingHandler, the endOfBatch parameter tells us whether this is the last event of the current batch. If it is not, and the number of buffered events has not reached the default of 100, and cachedBytes has not reached 32768, the handler simply returns and waits until a full batch has accumulated.
  3. Reaching this branch means only one entry is buffered, so put is called again with tryBatching set to false, which goes straight to internalPut.
  4. If size is not 1, all buffered events are copied into a list and put is called to process them as a batch; when it finishes, whenComplete handles the result or the exception for every future (the exceptionally helper used here is sketched below).
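The exceptionally helper used in steps 3 and 4 simply fails the pending futures; a minimal sketch of the idea (the varargs signature is an assumption, not the actual DefaultRheaKVStore source):

@SafeVarargs
private static void exceptionally(final Throwable t, final CompletableFuture<Boolean>... futures) {
    // propagate the same failure cause to every caller still waiting on a future
    for (final CompletableFuture<Boolean> f : futures) {
        f.completeExceptionally(t);
    }
}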

Batch put into RheaKV

Next let's look at how put(entries) in PutBatchingHandler#onEvent handles the batched data; it calls DefaultRheaKVStore's put method.

DefaultRheaKVStore#put

public CompletableFuture<Boolean> put(final List<KVEntry> entries) {
    // check the started state
    checkState();
    Requires.requireNonNull(entries, "entries");
    Requires.requireTrue(!entries.isEmpty(), "entries empty");
    // store the data
    final FutureGroup<Boolean> futureGroup = internalPut(entries, this.failoverRetries, null);
    // join the returned results into a single future
    return FutureHelper.joinBooleans(futureGroup);
}

This method calls internalPut to perform the write.

DefaultRheaKVStore#internalPut

private FutureGroup<Boolean> internalPut(final List<KVEntry> entries, final int retriesLeft,
                                         final Throwable lastCause) {
    // build the mapping from each Region to the KVEntries that belong to it
    final Map<Region, List<KVEntry>> regionMap = this.pdClient
            .findRegionsByKvEntries(entries, ApiExceptionHelper.isInvalidEpoch(lastCause));
    final List<CompletableFuture<Boolean>> futures = Lists.newArrayListWithCapacity(regionMap.size());
    final Errors lastError = lastCause == null ? null : Errors.forException(lastCause);
    for (final Map.Entry<Region, List<KVEntry>> entry : regionMap.entrySet()) {
        final Region region = entry.getKey();
        final List<KVEntry> subEntries = entry.getValue();
        // retry callback: call internalPut again with retriesLeft decremented
        final RetryCallable<Boolean> retryCallable = retryCause -> internalPut(subEntries, retriesLeft - 1,
                retryCause);
        final BoolFailoverFuture future = new BoolFailoverFuture(retriesLeft, retryCallable);
        // write the entries into their region
        internalRegionPut(region, subEntries, future, retriesLeft, lastError);
        futures.add(future);
    }
    return new FutureGroup<>(futures);
}

Because a Store contains many Regions, this method first builds the mapping between Regions and KVEntries to determine which Region each KVEntry belongs to.
After setting up the retry callback, it calls internalRegionPut to write each subEntries list into its Region.

Building the Region-to-KVEntry mapping

Let's see how the mapping is built.
pdClient is an instance of FakePlacementDriverClient, which extends AbstractPlacementDriverClient, so the parent class's findRegionsByKvEntries method is called.
AbstractPlacementDriverClient#findRegionsByKvEntries

public Map<Region, List<KVEntry>> findRegionsByKvEntries(final List<KVEntry> kvEntries, final boolean forceRefresh) {
    if (forceRefresh) {
        refreshRouteTable();
    }
    // regionRouteTable holds the region routing information
    return this.regionRouteTable.findRegionsByKvEntries(kvEntries);
}

Since we are using FakePlacementDriverClient here, refreshRouteTable is an empty method, so the call goes on to RegionRouteTable's findRegionsByKvEntries.
RegionRouteTable#findRegionsByKvEntries

public Map<Region, List<KVEntry>> findRegionsByKvEntries(final List<KVEntry> kvEntries) {
    Requires.requireNonNull(kvEntries, "kvEntries");
    // create the result map
    final Map<Region, List<KVEntry>> regionMap = Maps.newHashMap();
    final StampedLock stampedLock = this.stampedLock;
    final long stamp = stampedLock.readLock();
    try {
        for (final KVEntry kvEntry : kvEntries) {
            // find the region whose startKey is closest to (<=) the kvEntry's key
            final Region region = findRegionByKeyWithoutLock(kvEntry.getKey());
            // record the region -> KVEntry mapping
            regionMap.computeIfAbsent(region, k -> Lists.newArrayList()).add(kvEntry);
        }
        return regionMap;
    } finally {
        stampedLock.unlockRead(stamp);
    }
}

private Region findRegionByKeyWithoutLock(final byte[] key) {
    // return the greatest key less than or equal to the given key
    // rangeTable maps a region's startKey to its regionId
    final Map.Entry<byte[], Long> entry = this.rangeTable.floorEntry(key);
    if (entry == null) {
        reportFail(key);
        throw reject(key, "fail to find region by key");
    }
    // regionTable maps the regionId to the Region itself
    return this.regionTable.get(entry.getValue());
}

findRegionsByKvEntries iterates over all of the KVEntries and calls findRegionByKeyWithoutLock to locate the right region in rangeTable. Since rangeTable is a TreeMap, floorEntry returns the entry with the greatest startKey that is less than or equal to the given key.
Each region is then put into regionMap, where the key is the Region and the value is the list of KVEntries routed to it.
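The routing idea can be illustrated with a tiny self-contained example (the region start keys here are made up; RheaKV's rangeTable actually keys on byte[] with a custom comparator, Strings are used only for readability):

import java.util.TreeMap;

public class FloorEntryDemo {
    public static void main(String[] args) {
        // analogue of rangeTable: region startKey -> regionId
        TreeMap<String, Long> rangeTable = new TreeMap<>();
        rangeTable.put("", 1L);  // region 1 starts at the smallest key
        rangeTable.put("g", 2L); // region 2 covers keys >= "g"
        rangeTable.put("t", 3L); // region 3 covers keys >= "t"

        // floorEntry returns the entry with the greatest startKey <= the given key
        System.out.println(rangeTable.floorEntry("hello").getValue()); // 2
        System.out.println(rangeTable.floorEntry("apple").getValue()); // 1
        System.out.println(rangeTable.floorEntry("zoo").getValue());   // 3
    }
}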

The data in regionRouteTable is filled in when DefaultRheaKVStore is initialized; in case you don't remember, here is the call chain that populates the routing table:

DefaultRheaKVStore#init -> FakePlacementDriverClient#init -> AbstractPlacementDriverClient#init -> AbstractPlacementDriverClient#initRouteTableByRegion -> regionRouteTable#addOrUpdateRegion

Storing data into the corresponding Region

Continuing from DefaultRheaKVStore's internalPut, we come to internalRegionPut, which is where the data is actually written:

DefaultRheaKVStore#internalRegionPut

private void internalRegionPut(final Region region, final List<KVEntry> subEntries,
                               final CompletableFuture<Boolean> future, final int retriesLeft,
                               final Errors lastCause) {
    // get the RegionEngine
    final RegionEngine regionEngine = getRegionEngine(region.getId(), true);
    // retry runner that calls back into this method
    final RetryRunner retryRunner = retryCause -> internalRegionPut(region, subEntries, future,
            retriesLeft - 1, retryCause);
    final FailoverClosure<Boolean> closure = new FailoverClosureImpl<>(future, false, retriesLeft,
            retryRunner);
    if (regionEngine != null) {
        if (ensureOnValidEpoch(region, regionEngine, closure)) {
            // get the MetricsRawKVStore
            final RawKVStore rawKVStore = getRawKVStore(regionEngine);
            // kvDispatcher may be null depending on the useParallelKVExecutor option set in init
            if (this.kvDispatcher == null) {
                // call the underlying store (RocksDB) directly
                rawKVStore.put(subEntries, closure);
            } else {
                // dispatch the put to the kvDispatcher to run asynchronously
                this.kvDispatcher.execute(() -> rawKVStore.put(subEntries, closure));
            }
        }
    } else {
        // if the current node does not host this region (e.g. it is not the leader),
        // regionEngine is null, so send an rpc request to the leader node instead
        final BatchPutRequest request = new BatchPutRequest();
        request.setKvEntries(subEntries);
        request.setRegionId(region.getId());
        request.setRegionEpoch(region.getRegionEpoch());
        this.rheaKVRpcService.callAsyncWithRpc(request, closure, lastCause);
    }
}

This method first calls getRegionEngine to get the RegionEngine. Since we are on a client node, no RegionEngine has been initialized, so it is null and the request is sent out as an RPC, to be handled by KVCommandProcessor on the server side.
If the current node were a server and the leader of that RegionEngine, rawKVStore.put would be called to write the entries into RocksDB.
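As the rest of this article shows, on the leader the write first goes through the Raft log before reaching the store, but what ultimately hits RocksDB is a batched write along these lines. A conceptual sketch using the plain RocksDB Java API (this is not the actual RocksRawKVStore source; error handling and the closure callback are omitted):

import java.util.List;

import org.rocksdb.RocksDB;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

// write a list of key/value pairs to RocksDB in a single atomic batch
void batchPut(final RocksDB db, final List<KVEntry> subEntries) throws Exception {
    try (final WriteBatch batch = new WriteBatch();
         final WriteOptions writeOptions = new WriteOptions()) {
        for (final KVEntry entry : subEntries) {
            batch.put(entry.getKey(), entry.getValue());
        }
        db.write(writeOptions, batch);
    }
}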

Finally, let's look at how the RPC request sent by rheaKVRpcService is handled.

Sending a BatchPutRequest to the server to insert data

The put request is sent to the server by calling DefaultRheaKVRpcService's callAsyncWithRpc method:
DefaultRheaKVRpcService#callAsyncWithRpc

public <V> CompletableFuture<V> callAsyncWithRpc(final BaseRequest request, final FailoverClosure<V> closure,
                                                 final Errors lastCause) {
    return callAsyncWithRpc(request, closure, lastCause, true);
}

public <V> CompletableFuture<V> callAsyncWithRpc(final BaseRequest request, final FailoverClosure<V> closure,
                                                 final Errors lastCause, final boolean requireLeader) {
    final boolean forceRefresh = ErrorsHelper.isInvalidPeer(lastCause);
    // find the endpoint of the leader
    final Endpoint endpoint = getRpcEndpoint(request.getRegionId(), forceRefresh, this.rpcTimeoutMillis,
            requireLeader);
    // make the rpc call
    internalCallAsyncWithRpc(endpoint, request, closure);
    return closure.future();
}

This method calls getRpcEndpoint to obtain the endpoint of the server that owns the region, and then issues the RPC to that node. The RPC itself goes through SOFA's Bolt framework, so below we focus on how the endpoint is obtained.

DefaultRheaKVRpcService#getRpcEndpoint

public Endpoint getRpcEndpoint(final long regionId, final boolean forceRefresh, final long timeoutMillis,
                               final boolean requireLeader) {
    if (requireLeader) {
        // get the leader
        return getLeader(regionId, forceRefresh, timeoutMillis);
    } else {
        // round-robin to a peer other than ourselves
        return getLuckyPeer(regionId, forceRefresh, timeoutMillis);
    }
}

There are two branches here: one gets the leader node, the other picks a node round-robin. Both are interesting, so we will walk through each of them below.

Getting the leader node by regionId

Getting the leader by regionId is triggered by getLeader. When DefaultRheaKVStore's init method instantiates DefaultRheaKVRpcService, it overrides the getLeader method:
DefaultRheaKVStore#init

this.rheaKVRpcService = new DefaultRheaKVRpcService(this.pdClient, selfEndpoint) {

    @Override
    public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
        final Endpoint leader = getLeaderByRegionEngine(regionId);
        if (leader != null) {
            return leader;
        }
        return super.getLeader(regionId, forceRefresh, timeoutMillis);
    }
};

The overridden getLeader first calls getLeaderByRegionEngine to look up an Endpoint by regionId; if nothing is found, it falls back to the parent class's getLeader.

DefaultRheaKVStore#getLeaderByRegionEngine

private Endpoint getLeaderByRegionEngine(final long regionId) {
    final RegionEngine regionEngine = getRegionEngine(regionId);
    if (regionEngine != null) {
        final PeerId leader = regionEngine.getLeaderId();
        if (leader != null) {
            final String raftGroupId = JRaftHelper.getJRaftGroupId(this.pdClient.getClusterName(), regionId);
            RouteTable.getInstance().updateLeader(raftGroupId, leader);
            return leader.getEndpoint();
        }
    }
    return null;
}

This method tries to get the RegionEngine, but since we are a client node no RegionEngine has been initialized, so it returns null and the caller falls back to the parent class's getLeader.

DefaultRheaKVRpcService#getLeader

public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
    return this.pdClient.getLeader(regionId, forceRefresh, timeoutMillis);
}

This delegates to pdClient's getLeader. The pdClient we passed in is FakePlacementDriverClient, which extends AbstractPlacementDriverClient, so the parent class's getLeader is called.

AbstractPlacementDriverClient#getLeader

public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
    // build the raftGroupId from the clusterName and the regionId
    final String raftGroupId = JRaftHelper.getJRaftGroupId(this.clusterName, regionId);
    // look up the leader of this group in the route table
    PeerId leader = getLeader(raftGroupId, forceRefresh, timeoutMillis);
    if (leader == null && !forceRefresh) {
        // Could not found leader from cache, try again and force refresh cache
        leader = getLeader(raftGroupId, true, timeoutMillis);
    }
    if (leader == null) {
        throw new RouteTableException("no leader in group: " + raftGroupId);
    }
    return leader.getEndpoint();
}

This method builds the raftGroupId from the clusterName and the regionId; for example, with clusterName demo and regionId -1 (the default region id in our client configuration), the raftGroupId is demo--1.
It then calls getLeader to obtain the leader's PeerId. The first call passes forceRefresh as false, meaning no refresh; if that returns null, it retries once with a forced refresh.
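The concatenation itself is trivial; a sketch of what JRaftHelper.getJRaftGroupId presumably does (the null checks are omitted here):

public static String getJRaftGroupId(final String clusterName, final long regionId) {
    // e.g. clusterName = "demo", regionId = -1  ->  "demo--1"
    return clusterName + "-" + regionId;
}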

AbstractPlacementDriverClient#getLeader

protected PeerId getLeader(final String raftGroupId, final boolean forceRefresh, final long timeoutMillis) {
    final RouteTable routeTable = RouteTable.getInstance();
    // whether to force a refresh of the route table
    if (forceRefresh) {
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        final StringBuilder error = new StringBuilder();
        // A newly launched raft group may not have been successful in the election,
        // or in the 'leader-transfer' state, it needs to be re-tried
        Throwable lastCause = null;
        for (;;) {
            try {
                // refresh the leader in the route table
                final Status st = routeTable.refreshLeader(this.cliClientService, raftGroupId, 2000);
                if (st.isOk()) {
                    break;
                }
                error.append(st.toString());
            } catch (final InterruptedException e) {
                ThrowUtil.throwException(e);
            } catch (final Throwable t) {
                lastCause = t;
                error.append(t.getMessage());
            }
            // if the deadline has not been reached, sleep 10 ms and refresh again
            if (System.currentTimeMillis() < deadline) {
                LOG.debug("Fail to find leader, retry again, {}.", error);
                error.append(", ");
                try {
                    Thread.sleep(10);
                } catch (final InterruptedException e) {
                    ThrowUtil.throwException(e);
                }
            // the deadline has passed, so throw an exception
            } else {
                throw lastCause != null ? new RouteTableException(error.toString(), lastCause)
                    : new RouteTableException(error.toString());
            }
        }
    }
    // return the leader recorded in the route table
    return routeTable.selectLeader(raftGroupId);
}

If a forced refresh is requested, a deadline is computed and the method loops, refreshing the route table; if a refresh attempt fails and the deadline has not yet passed, it sleeps 10 ms and tries again.

RouteTable#refreshLeader

public Status refreshLeader(final CliClientService cliClientService, final String groupId, final int timeoutMs)
        throws InterruptedException, TimeoutException {
    Requires.requireTrue(!StringUtils.isBlank(groupId), "Blank group id");
    Requires.requireTrue(timeoutMs > 0, "Invalid timeout: " + timeoutMs);
    // get the group's configuration by its id, which contains the ip and port of each node
    final Configuration conf = getConfiguration(groupId);
    if (conf == null) {
        return new Status(RaftError.ENOENT,
            "Group %s is not registered in RouteTable, forgot to call updateConfiguration?", groupId);
    }
    final Status st = Status.OK();
    final CliRequests.GetLeaderRequest.Builder rb = CliRequests.GetLeaderRequest.newBuilder();
    rb.setGroupId(groupId);
    // build the request used to ask for the leader
    final CliRequests.GetLeaderRequest request = rb.build();
    TimeoutException timeoutException = null;
    for (final PeerId peer : conf) {
        // if the peer cannot be connected, record the error and continue with the next one
        if (!cliClientService.connect(peer.getEndpoint())) {
            if (st.isOk()) {
                st.setError(-1, "Fail to init channel to %s", peer);
            } else {
                final String savedMsg = st.getErrorMsg();
                st.setError(-1, "%s, Fail to init channel to %s", savedMsg, peer);
            }
            continue;
        }
        // send the GetLeaderRequest to this peer
        final Future<Message> result = cliClientService.getLeader(peer.getEndpoint(), request, null);
        try {
            final Message msg = result.get(timeoutMs, TimeUnit.MILLISECONDS);
            // handle error responses
            if (msg instanceof RpcRequests.ErrorResponse) {
                if (st.isOk()) {
                    st.setError(-1, ((RpcRequests.ErrorResponse) msg).getErrorMsg());
                } else {
                    final String savedMsg = st.getErrorMsg();
                    st.setError(-1, "%s, %s", savedMsg, ((RpcRequests.ErrorResponse) msg).getErrorMsg());
                }
            } else {
                final CliRequests.GetLeaderResponse response = (CliRequests.GetLeaderResponse) msg;
                // update the leader in the route table
                updateLeader(groupId, response.getLeaderId());
                return Status.OK();
            }
        } catch (final TimeoutException e) {
            timeoutException = e;
        } catch (final ExecutionException e) {
            if (st.isOk()) {
                st.setError(-1, e.getMessage());
            } else {
                final String savedMsg = st.getErrorMsg();
                st.setError(-1, "%s, %s", savedMsg, e.getMessage());
            }
        }
    }
    if (timeoutException != null) {
        throw timeoutException;
    }

    return st;
}

Don't be put off by the length of this method; it is actually very simple:

  1. Get the group's configuration by groupId, which contains the ip and port of every node in the cluster
  2. Iterate over the peers in the configuration
  3. Try to connect to each peer; if the connection fails, continue with the next one
  4. Send a GetLeaderRequest to the peer; if a normal response comes back within the timeout, call updateLeader to refresh the leader

The updateLeader method is quite simple: it just updates the leader field in the route table (a sketch follows below). Now let's see how the server handles the GetLeaderRequest.
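A minimal sketch of what RouteTable#updateLeader does, assuming the route table keeps a per-group entry whose leader field is overwritten (details simplified):

public boolean updateLeader(final String groupId, final String leaderIdStr) {
    // parse the "ip:port" style string into a PeerId and store it as the group's leader
    final PeerId leader = new PeerId();
    if (!leader.parse(leaderIdStr)) {
        return false;
    }
    return updateLeader(groupId, leader); // overload that caches the PeerId for the group
}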

GetLeaderRequest is handled by the GetLeaderRequestProcessor.
GetLeaderRequestProcessor#processRequest

public Message processRequest(GetLeaderRequest request, RpcRequestClosure done) {
    List<Node> nodes = new ArrayList<>();
    String groupId = getGroupId(request);
    // if the request specifies a PeerId,
    // find the node corresponding to that peer in the cluster
    if (request.hasPeerId()) {
        String peerIdStr = getPeerId(request);
        PeerId peer = new PeerId();
        if (peer.parse(peerIdStr)) {
            Status st = new Status();
            nodes.add(getNode(groupId, peer, st));
            if (!st.isOk()) {
                return RpcResponseFactory.newResponse(st);
            }
        } else {
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Fail to parse peer id %s", peerIdStr);
        }
    } else {
        // otherwise get all the nodes of the group
        nodes = NodeManager.getInstance().getNodesByGroupId(groupId);
    }
    if (nodes == null || nodes.isEmpty()) {
        return RpcResponseFactory.newResponse(RaftError.ENOENT, "No nodes in group %s", groupId);
    }
    // iterate over the nodes and return the first leaderId found
    for (Node node : nodes) {
        PeerId leader = node.getLeaderId();
        if (leader != null && !leader.isEmpty()) {
            return GetLeaderResponse.newBuilder().setLeaderId(leader.toString()).build();
        }
    }
    return RpcResponseFactory.newResponse(RaftError.EAGAIN, "Unknown leader");
}

Since the request we sent does not carry a PeerId, the processor does not look up the node for a specific peer; instead it fetches all the nodes of the group identified by groupId and iterates over them to find the leaderId.

getLuckyPeer: picking a node by round-robin

Having covered how getLeader works, let's now look at what getLuckyPeer does.

public Endpoint getLuckyPeer(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
    return this.pdClient.getLuckyPeer(regionId, forceRefresh, timeoutMillis, this.selfEndpoint);
}

Just like getLeader, this ends up in AbstractPlacementDriverClient's getLuckyPeer method.
AbstractPlacementDriverClient#getLuckyPeer

public Endpoint getLuckyPeer(final long regionId, final boolean forceRefresh, final long timeoutMillis,
                             final Endpoint unExpect) {
    final String raftGroupId = JRaftHelper.getJRaftGroupId(this.clusterName, regionId);
    final RouteTable routeTable = RouteTable.getInstance();
    // whether to force a refresh of the latest cluster configuration
    if (forceRefresh) {
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        final StringBuilder error = new StringBuilder();
        // A newly launched raft group may not have been successful in the election,
        // or in the 'leader-transfer' state, it needs to be re-tried
        for (;;) {
            try {
                final Status st = routeTable.refreshConfiguration(this.cliClientService, raftGroupId, 5000);
                if (st.isOk()) {
                    break;
                }
                error.append(st.toString());
            } catch (final InterruptedException e) {
                ThrowUtil.throwException(e);
            } catch (final TimeoutException e) {
                error.append(e.getMessage());
            }
            if (System.currentTimeMillis() < deadline) {
                LOG.debug("Fail to get peers, retry again, {}.", error);
                error.append(", ");
                try {
                    Thread.sleep(5);
                } catch (final InterruptedException e) {
                    ThrowUtil.throwException(e);
                }
            } else {
                throw new RouteTableException(error.toString());
            }
        }
    }
    final Configuration configs = routeTable.getConfiguration(raftGroupId);
    if (configs == null) {
        throw new RouteTableException("empty configs in group: " + raftGroupId);
    }
    final List<PeerId> peerList = configs.getPeers();
    if (peerList == null || peerList.isEmpty()) {
        throw new RouteTableException("empty peers in group: " + raftGroupId);
    }
    // if there is only one node in the group, just return it
    final int size = peerList.size();
    if (size == 1) {
        return peerList.get(0).getEndpoint();
    }
    // get the load balancer; a round-robin strategy is used here
    final RoundRobinLoadBalancer balancer = RoundRobinLoadBalancer.getInstance(regionId);
    for (int i = 0; i < size; i++) {
        final PeerId candidate = balancer.select(peerList);
        final Endpoint luckyOne = candidate.getEndpoint();
        if (!luckyOne.equals(unExpect)) {
            return luckyOne;
        }
    }
    throw new RouteTableException("have no choice in group(peers): " + raftGroupId);
}

This method has the same force-refresh branch as getLeader, so we won't repeat it. If the group has more than one valid node, a round-robin strategy picks one; the round-robin itself is very simple: a global index is incremented on every call and taken modulo the size of the peerList.
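A minimal sketch of that round-robin idea (this is not the RoundRobinLoadBalancer source; the class and field names are assumptions):

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSelector<T> {

    // a global, monotonically increasing index shared by all calls
    private final AtomicInteger index = new AtomicInteger(0);

    public T select(final List<T> elements) {
        final int size = elements.size();
        if (size == 1) {
            return elements.get(0);
        }
        // bump the index on every call and wrap around with a modulo on the list size
        final int i = Math.abs(index.getAndIncrement() % size);
        return elements.get(i);
    }
}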

That pretty much wraps up DefaultRheaKVRpcService's callAsyncWithRpc; the request is then sent to the server, where KVCommandProcessor handles the BatchPutRequest.

Handling the BatchPutRequest on the server

The BatchPutRequest is handled in KVCommandProcessor.
KVCommandProcessor#handleRequest

public void handleRequest(final BizContext bizCtx, final AsyncContext asyncCtx, final T request) {
    Requires.requireNonNull(request, "request");
    final RequestProcessClosure<BaseRequest, BaseResponse<?>> closure = new RequestProcessClosure<>(request,
        bizCtx, asyncCtx);
    // look up the RegionKVService for the given RegionId;
    // each RegionKVService corresponds to one Region and only handles requests within its own Region
    final RegionKVService regionKVService = this.storeEngine.getRegionKVService(request.getRegionId());
    if (regionKVService == null) {
        // if it does not exist, return a NoRegionFound error
        final NoRegionFoundResponse noRegion = new NoRegionFoundResponse();
        noRegion.setRegionId(request.getRegionId());
        noRegion.setError(Errors.NO_REGION_FOUND);
        noRegion.setValue(false);
        closure.sendResponse(noRegion);
        return;
    }
    switch (request.magic()) {
        case BaseRequest.PUT:
            regionKVService.handlePutRequest((PutRequest) request, closure);
            break;
        case BaseRequest.BATCH_PUT:
            regionKVService.handleBatchPutRequest((BatchPutRequest) request, closure);
            break;
        .....
        default:
            throw new RheaRuntimeException("Unsupported request type: " + request.getClass().getName());
    }
}

handleRequest first looks up the RegionKVService for the RegionId; each RegionKVService is registered into regionKVServiceTable when its RegionEngine is initialized.
It then dispatches on the request type; we skip the other request types here and only follow BATCH_PUT.

Before walking through the code, here is the call flow as a signpost:

KVCommandProcessor#handleRequest -> DefaultRegionKVService#handleBatchPutRequest -> MetricsRawKVStore#put -> RaftRawKVStore#put -> RaftRawKVStore#applyOperation -> NodeImpl#apply -> applyQueue (LogEntryAndClosureHandler#onEvent) -> NodeImpl#executeApplyingTasks -> LogManager#appendEntries, and after replication KVStoreStateMachine#onApply -> RocksRawKVStore#batch

BATCH_PUT is dispatched to DefaultRegionKVService's handleBatchPutRequest method. The code below shows its single-key counterpart handlePutRequest; the batch variant follows the same pattern, passing the whole list of KVEntries down to the store.
DefaultRegionKVService#handlePutRequest

public void handlePutRequest(final PutRequest request,
                             final RequestProcessClosure<BaseRequest, BaseResponse<?>> closure) {
    // prepare the response
    final PutResponse response = new PutResponse();
    response.setRegionId(getRegionId());
    response.setRegionEpoch(getRegionEpoch());
    try {
        KVParameterRequires.requireSameEpoch(request, getRegionEpoch());
        final byte[] key = KVParameterRequires.requireNonNull(request.getKey(), "put.key");
        final byte[] value = KVParameterRequires.requireNonNull(request.getValue(), "put.value");
        // this rawKVStore instance is a MetricsRawKVStore
        this.rawKVStore.put(key, value, new BaseKVStoreClosure() {

            // callback invoked when the put finishes
            @Override
            public void run(final Status status) {
                if (status.isOk()) {
                    response.setValue((Boolean) getData());
                } else {
                    setFailure(request, response, status, getError());
                }
                closure.sendResponse(response);
            }
        });
    } catch (final Throwable t) {
        LOG.error("Failed to handle: {}, {}.", request, StackTraceUtil.stackTrace(t));
        response.setError(Errors.forException(t));
        closure.sendResponse(response);
    }
}

handlePutRequest is very straightforward: it extracts the key and the value and calls MetricsRawKVStore's put, attaching a callback.

MetricsRawKVStore#put

public void put(final byte[] key, final byte[] value, final KVStoreClosure closure) {
    final KVStoreClosure c = metricsAdapter(closure, PUT, 1, value.length);
    // rawKVStore here is a RaftRawKVStore
    this.rawKVStore.put(key, value, c);
}

put then goes on to call RaftRawKVStore's put method.
RaftRawKVStore#put

public void put(final byte[] key, final byte[] value, final KVStoreClosure closure) {
    applyOperation(KVOperation.createPut(key, value), closure);
}

put uses KVOperation's static factory to create a put-type KVOperation instance and then calls applyOperation.

RaftRawKVStore#applyOperation

private void applyOperation(final KVOperation op, final KVStoreClosure closure) {
    // only the leader node is allowed to submit tasks
    if (!isLeader()) {
        closure.setError(Errors.NOT_LEADER);
        closure.run(new Status(RaftError.EPERM, "Not leader"));
        return;
    }
    final Task task = new Task();
    // serialize the operation as the task data
    task.setData(ByteBuffer.wrap(Serializers.getDefault().writeObject(op)));
    // attach the callback adapter
    task.setDone(new KVClosureAdapter(closure, op));
    // call NodeImpl's apply method
    this.node.apply(task);
}

applyOperation checks whether the current node is the leader; if not, the task cannot be submitted. Otherwise it creates a Task instance, sets the serialized data and the callback adapter, and calls NodeImpl's apply to publish the task.

NodeImpl#apply

public void apply(final Task task) {
    // check whether the node is shutting down
    if (this.shutdownLatch != null) {
        Utils.runClosureInThread(task.getDone(), new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
        throw new IllegalStateException("Node is shutting down");
    }
    // must not be null
    Requires.requireNonNull(task, "Null task");

    // copy the task data into a LogEntry
    final LogEntry entry = new LogEntry();
    entry.setData(task.getData());
    // retry counter
    int retryTimes = 0;
    try {
        // build a Disruptor event from the task
        final EventTranslator<LogEntryAndClosure> translator = (event, sequence) -> {
            event.reset();
            event.done = task.getDone();
            event.entry = entry;
            event.expectedTerm = task.getExpectedTerm();
        };
        while (true) {
            // publish the event; it will be handled by LogEntryAndClosureHandler
            if (this.applyQueue.tryPublishEvent(translator)) {
                break;
            } else {
                retryTimes++;
                // retry at most 3 times
                if (retryTimes > MAX_APPLY_RETRY_TIMES) {
                    // on failure, run the closure to report the status
                    Utils.runClosureInThread(task.getDone(),
                            new Status(RaftError.EBUSY, "Node is busy, has too many tasks."));
                    LOG.warn("Node {} applyQueue is overload.", getNodeId());
                    this.metrics.recordTimes("apply-task-overload-times", 1);
                    return;
                }
                ThreadHelper.onSpinWait();
            }
        }

    } catch (final Exception e) {
        Utils.runClosureInThread(task.getDone(), new Status(RaftError.EPERM, "Node is down."));
    }
}

apply wraps the data in a LogEntry, packs the LogEntry into a Disruptor event and publishes it to the applyQueue. The applyQueue is initialized in NodeImpl's init method, with LogEntryAndClosureHandler set as its handler.

LogEntryAndClosureHandler#onEvent

private final List<LogEntryAndClosure> tasks = new ArrayList<>(NodeImpl.this.raftOptions.getApplyBatch());

@Override
public void onEvent(final LogEntryAndClosure event, final long sequence, final boolean endOfBatch)
        throws Exception {
    // a shutdown request was received
    if (event.shutdownLatch != null) {
        // if the tasks buffer is not empty, flush it first
        if (!this.tasks.isEmpty()) {
            // process the buffered tasks
            executeApplyingTasks(this.tasks);
        }
        final int num = GLOBAL_NUM_NODES.decrementAndGet();
        LOG.info("The number of active nodes decrement to {}.", num);
        event.shutdownLatch.countDown();
        return;
    }
    // buffer the new event
    this.tasks.add(event);
    // the batch size is 32, so once 32 tasks are buffered, or this is the last event of the batch,
    // the buffered tasks are executed
    if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
        executeApplyingTasks(this.tasks);
        this.tasks.clear();
    }
}

onEvent first checks whether the received event is a request to shut the queue down; if so, it finishes whatever is already in the tasks buffer before returning. For a normal event it checks whether the buffer has reached 32 tasks or whether this is the last event of the batch; if either holds, executeApplyingTasks is called to process the buffered tasks in bulk.

NodeImpl#executeApplyingTasks

private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
    this.writeLock.lock();
    try {
        final int size = tasks.size();
        // if the current node is not the leader, stop here
        if (this.state != State.STATE_LEADER) {
            final Status st = new Status();

            if (this.state != State.STATE_TRANSFERRING) {
                st.setError(RaftError.EPERM, "Is not leader.");
            } else {
                st.setError(RaftError.EBUSY, "Is transferring leadership.");
            }
            LOG.debug("Node {} can't apply, status={}.", getNodeId(), st);
            // run the callback of every LogEntryAndClosure to report the error
            for (int i = 0; i < size; i++) {
                Utils.runClosureInThread(tasks.get(i).done, st);
            }
            return;
        }
        final List<LogEntry> entries = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            final LogEntryAndClosure task = tasks.get(i);
            // if the expected term does not match the current term, report an error via the callback
            if (task.expectedTerm != -1 && task.expectedTerm != this.currTerm) {
                LOG.debug("Node {} can't apply task whose expectedTerm={} doesn't match currTerm={}.", getNodeId(),
                    task.expectedTerm, this.currTerm);
                if (task.done != null) {
                    final Status st = new Status(RaftError.EPERM, "expected_term=%d doesn't match current_term=%d",
                        task.expectedTerm, this.currTerm);
                    Utils.runClosureInThread(task.done, st);
                }
                continue;
            }
            // save the application context before log replication
            if (!this.ballotBox.appendPendingTask(this.conf.getConf(),
                this.conf.isStable() ? null : this.conf.getOldConf(), task.done)) {
                Utils.runClosureInThread(task.done, new Status(RaftError.EINTERNAL, "Fail to append task."));
                continue;
            }
            // set task entry info before adding to list.
            task.entry.getId().setTerm(this.currTerm);
            // mark the entry type as ENTRY_TYPE_DATA
            task.entry.setType(EnumOutter.EntryType.ENTRY_TYPE_DATA);
            entries.add(task.entry);
        }
        // batch-append the task logs; the log storage persists them to RocksDB
        this.logManager.appendEntries(entries, new LeaderStableClosure(entries));
        // update conf.first
        this.conf = this.logManager.checkAndSetConfiguration(this.conf);
    } finally {
        this.writeLock.unlock();
    }
}

executeApplyingTasks first checks that the current node is the leader: a Raft replica Node verifies that its state is STATE_LEADER before accepting submitted tasks, so only the leader can process them.
It then iterates over the buffered events and checks that each task's expected term equals the node's current term; all logs submitted while the leader has not changed carry the same term. For each task whose term matches, the Raft ballot box BallotBox calls appendPendingTask(conf, oldConf, done) to save the application context before log replication, i.e. a Ballot built from the current configuration (and the old one, if any) is appended to the pendingMetaQueue.
Finally the log manager LogManager calls the underlying log storage LogStorage#appendEntries(entries) to batch-write the task logs into RocksDB.

A task submitted through Node#apply(task) is eventually replicated and applied to the state machine on every Raft node. The RheaKV state machine is KVStoreStateMachine, which extends the StateMachineAdapter adapter.
The Raft state machine KVStoreStateMachine applies the task list in commit order through its onApply(iterator) method.
While iterating over the committed entries, KVStoreStateMachine accumulates the key-value states into a batch and finally calls RocksRawKVStore's batch(kvStates) method to run the corresponding key-value operations against RocksDB.
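To make that last step concrete, here is a heavily simplified sketch of the shape of such an onApply loop; it is not the actual KVStoreStateMachine source, and the KVState accumulation and deserialize helper are simplified assumptions:

@Override
public void onApply(final Iterator it) {
    final List<KVState> kvStates = new ArrayList<>();
    while (it.hasNext()) {
        // on the leader it.done() still holds the original closure (a KVClosureAdapter);
        // on followers the KVOperation has to be deserialized from the log entry data
        final KVOperation op = deserialize(it.getData()); // hypothetical deserialization helper
        kvStates.add(KVState.of(op, (KVStoreClosure) it.done()));
        it.next();
    }
    if (!kvStates.isEmpty()) {
        // apply the accumulated key-value operations to RocksDB in one batch
        this.rawKVStore.batch(kvStates);
    }
}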

Summary

This has been a long article with a fairly complex flow, and the code along the way is written very carefully. We mainly looked at how the putBatching batch processor uses the Disruptor to process data in batches and thereby raise overall throughput, how the server-side endpoint is located when a request is issued, how the server handles a BatchPutRequest, and how the code uses batching plus a fully asynchronous design to greatly increase throughput.