4. SOFAJRaft源碼分析— RheaKV初始化做了什麼?

  • 2019 年 10 月 31 日
  • 筆記

前言

由於RheaKV要講起來篇幅比較長,所以這裡分成幾個章節來講,這一章講一講RheaKV初始化做了什麼?

我們先來給個例子,我們從例子來講:

public static void main(final String[] args) throws Exception {      final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured()              .withFake(true) // use a fake pd              .config();      final StoreEngineOptions storeOpts = StoreEngineOptionsConfigured.newConfigured() //              .withStorageType(StorageType.RocksDB)              .withRocksDBOptions(RocksDBOptionsConfigured.newConfigured().withDbPath(Configs.DB_PATH).config())              .withRaftDataPath(Configs.RAFT_DATA_PATH)              .withServerAddress(new Endpoint("127.0.0.1", 8181))              .config();        final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured() //              .withClusterName(Configs.CLUSTER_NAME) //              .withInitialServerList(Configs.ALL_NODE_ADDRESSES)              .withStoreEngineOptions(storeOpts) //              .withPlacementDriverOptions(pdOpts) //              .config();      System.out.println(opts);      final Node node = new Node(opts);      node.start();      Runtime.getRuntime().addShutdownHook(new Thread(node::stop));      System.out.println("server1 start OK");  }

這裡為了簡化邏輯,使用的無PD設置

Node的實現:

public class Node {        private final RheaKVStoreOptions options;        private RheaKVStore              rheaKVStore;        public Node(RheaKVStoreOptions options) {          this.options = options;      }        public void start() {          this.rheaKVStore = new DefaultRheaKVStore();          this.rheaKVStore.init(this.options);      }        public void stop() {          this.rheaKVStore.shutdown();      }        public RheaKVStore getRheaKVStore() {          return rheaKVStore;      }  }

所以這裡是初始化一個DefaultRheaKVStore,並調用其init方法進行初始化

RheaKV 默認存儲DefaultRheaKVStore

由於DefaultRheaKVStore的初始化方法都是在init方法中完成,所以這裡直接看DefaultRheaKVStore的init方法。

public synchronized boolean init(final RheaKVStoreOptions opts) {      //1. 如果已經啟動了,那麼直接返回      if (this.started) {          LOG.info("[DefaultRheaKVStore] already started.");          return true;      }      this.opts = opts;      // init placement driver      // 2.根據PDoptions設置PD      final PlacementDriverOptions pdOpts = opts.getPlacementDriverOptions();      final String clusterName = opts.getClusterName();      Requires.requireNonNull(pdOpts, "opts.placementDriverOptions");      Requires.requireNonNull(clusterName, "opts.clusterName");      //設置集群      if (Strings.isBlank(pdOpts.getInitialServerList())) {          // if blank, extends parent's value          pdOpts.setInitialServerList(opts.getInitialServerList());      }      //如果是無 PD 場景, RheaKV 提供 Fake PD Client      if (pdOpts.isFake()) {          this.pdClient = new FakePlacementDriverClient(opts.getClusterId(), clusterName);      } else {          this.pdClient = new RemotePlacementDriverClient(opts.getClusterId(), clusterName);      }      //初始化PD      if (!this.pdClient.init(pdOpts)) {          LOG.error("Fail to init [PlacementDriverClient].");          return false;      }      // init store engine      //3. 初始化存儲引擎      final StoreEngineOptions stOpts = opts.getStoreEngineOptions();      if (stOpts != null) {          stOpts.setInitialServerList(opts.getInitialServerList());          this.storeEngine = new StoreEngine(this.pdClient);          //初始化存儲引擎          if (!this.storeEngine.init(stOpts)) {              LOG.error("Fail to init [StoreEngine].");              return false;          }      }      //獲取當前節點的ip和埠號      final Endpoint selfEndpoint = this.storeEngine == null ? null : this.storeEngine.getSelfEndpoint();      final RpcOptions rpcOpts = opts.getRpcOptions();      Requires.requireNonNull(rpcOpts, "opts.rpcOptions");      //4. 初始化一個RpcService,並重寫getLeader方法      this.rheaKVRpcService = new DefaultRheaKVRpcService(this.pdClient, selfEndpoint) {            @Override          public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {              final Endpoint leader = getLeaderByRegionEngine(regionId);              if (leader != null) {                  return leader;              }              return super.getLeader(regionId, forceRefresh, timeoutMillis);          }      };      if (!this.rheaKVRpcService.init(rpcOpts)) {          LOG.error("Fail to init [RheaKVRpcService].");          return false;      }      //獲取重試次數,默認重試兩次      this.failoverRetries = opts.getFailoverRetries();      //默認5000      this.futureTimeoutMillis = opts.getFutureTimeoutMillis();      //是否只從leader讀取數據,默認為true      this.onlyLeaderRead = opts.isOnlyLeaderRead();      //5.初始化kvDispatcher, 這裡默認為true      if (opts.isUseParallelKVExecutor()) {          //獲取當前cpu          final int numWorkers = Utils.cpus();          //向左移動4位,相當於乘以16          final int bufSize = numWorkers << 4;          final String name = "parallel-kv-executor";          final ThreadFactory threadFactory = Constants.THREAD_AFFINITY_ENABLED                  //這裡選擇是否啟用執行緒親和性ThreadFactory                  ? new AffinityNamedThreadFactory(name, true) : new NamedThreadFactory(name, true);          //初始化Dispatcher          this.kvDispatcher = new TaskDispatcher(bufSize, numWorkers, WaitStrategyType.LITE_BLOCKING_WAIT,                  threadFactory);      }      this.batchingOpts = opts.getBatchingOptions();      //默認是true      if (this.batchingOpts.isAllowBatching()) {          //這幾個batching暫時不知道是用來做什麼的,等用到再分析          this.getBatching = new GetBatching(KeyEvent::new, "get_batching",                  new GetBatchingHandler("get", false));          this.getBatchingOnlySafe = new GetBatching(KeyEvent::new, "get_batching_only_safe",                  new GetBatchingHandler("get_only_safe", true));          this.putBatching = new PutBatching(KVEvent::new, "put_batching",                  new PutBatchingHandler("put"));      }      LOG.info("[DefaultRheaKVStore] start successfully, options: {}.", opts);      return this.started = true;  }
  1. 校驗是否啟動,如果已經啟動了,那麼直接返回
  2. 根據PDoptions設置PD,PD 是全局的中心總控節點,負責整個集群的調度管理,維護 RegionRouteTable 路由表。這裡我們不啟用 PD,所以實例化一個FakePlacementDriverClient,並初始化
  3. 初始化存儲引擎,目前 StoreEngine 存儲引擎支援 MemoryDB 和 RocksDB 兩種實現,我們這裡用的是RocksDB,待會下面補充init方法講解
  4. 初始化一個rheaKVRpcService,針對 KV 存儲服務的 RPC Client 客戶端封裝,實現 Failover 邏輯。並設置重試兩次,等待超時時間futureTimeoutMillis是5000毫秒,默認只從leader讀取數據
  5. 初始化kvDispatcher

初始化存儲引擎

初始化的操作時在StoreEngine的init方法裡面實現的,我們直接看這個方法的實現,這個方法是初始化核心對象,邏輯較為複雜,希望有點耐心看完:

StoreEngine#init

public synchronized boolean init(final StoreEngineOptions opts) {      if (this.started) {          LOG.info("[StoreEngine] already started.");          return true;      }      this.storeOpts = Requires.requireNonNull(opts, "opts");      Endpoint serverAddress = Requires.requireNonNull(opts.getServerAddress(), "opts.serverAddress");      //獲取ip和埠      final int port = serverAddress.getPort();      final String ip = serverAddress.getIp();      //如果傳入的IP為空,那麼就設置啟動機器ip作為serverAddress的ip      if (ip == null || Utils.IP_ANY.equals(ip)) {          serverAddress = new Endpoint(NetUtil.getLocalCanonicalHostName(), port);          opts.setServerAddress(serverAddress);      }      //獲取度量上報時間      final long metricsReportPeriod = opts.getMetricsReportPeriod();      // init region options      List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();      //1. 如果RegionEngineOptions為空,則默認初始化一個      if (rOptsList == null || rOptsList.isEmpty()) {          // -1 region          final RegionEngineOptions rOpts = new RegionEngineOptions();          rOpts.setRegionId(Constants.DEFAULT_REGION_ID);          rOptsList = Lists.newArrayList();          rOptsList.add(rOpts);          opts.setRegionEngineOptionsList(rOptsList);      }      //獲取集群名      final String clusterName = this.pdClient.getClusterName();      //2. 遍歷rOptsList集合,為其中的RegionEngineOptions對象設置參數      for (final RegionEngineOptions rOpts : rOptsList) {          //用集群名+「-」+RegionId 拼接設置為RaftGroupId          rOpts.setRaftGroupId(JRaftHelper.getJRaftGroupId(clusterName, rOpts.getRegionId()));          rOpts.setServerAddress(serverAddress);          rOpts.setInitialServerList(opts.getInitialServerList());          if (rOpts.getNodeOptions() == null) {              // copy common node options              rOpts.setNodeOptions(opts.getCommonNodeOptions() == null ? new NodeOptions() : opts                  .getCommonNodeOptions().copy());          }          //如果原本沒有設置度量上報時間,那麼就重置一下          if (rOpts.getMetricsReportPeriod() <= 0 && metricsReportPeriod > 0) {              // extends store opts 300              rOpts.setMetricsReportPeriod(metricsReportPeriod);          }      }      // init store      // 3. 初始化Store和Store裡面的region      final Store store = this.pdClient.getStoreMetadata(opts);      if (store == null || store.getRegions() == null || store.getRegions().isEmpty()) {          LOG.error("Empty store metadata: {}.", store);          return false;      }      this.storeId = store.getId();      // init executors      //4. 初始化執行器      if (this.readIndexExecutor == null) {          this.readIndexExecutor = StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads());      }      if (this.raftStateTrigger == null) {          this.raftStateTrigger = StoreEngineHelper.createRaftStateTrigger(opts.getLeaderStateTriggerCoreThreads());      }      if (this.snapshotExecutor == null) {          this.snapshotExecutor = StoreEngineHelper.createSnapshotExecutor(opts.getSnapshotCoreThreads());      }      // init rpc executors 默認false      final boolean useSharedRpcExecutor = opts.isUseSharedRpcExecutor();      //5. 初始化rpc遠程執行器,用來執行RPCServer的Processors      if (!useSharedRpcExecutor) {          if (this.cliRpcExecutor == null) {              this.cliRpcExecutor = StoreEngineHelper.createCliRpcExecutor(opts.getCliRpcCoreThreads());          }          if (this.raftRpcExecutor == null) {              this.raftRpcExecutor = StoreEngineHelper.createRaftRpcExecutor(opts.getRaftRpcCoreThreads());          }          if (this.kvRpcExecutor == null) {              this.kvRpcExecutor = StoreEngineHelper.createKvRpcExecutor(opts.getKvRpcCoreThreads());          }      }      // init metrics      //做指標度量      startMetricReporters(metricsReportPeriod);      // init rpc server      //6. 初始化rpcServer,供其他服務調用      this.rpcServer = new RpcServer(port, true, true);      //為server加入各種processor      RaftRpcServerFactory.addRaftRequestProcessors(this.rpcServer, this.raftRpcExecutor, this.cliRpcExecutor);      StoreEngineHelper.addKvStoreRequestProcessor(this.rpcServer, this);      if (!this.rpcServer.start()) {          LOG.error("Fail to init [RpcServer].");          return false;      }      // init db store      //7. 根據不同的類型選擇db      if (!initRawKVStore(opts)) {          return false;      }      // init all region engine      // 8. 為每個region初始化RegionEngine      if (!initAllRegionEngine(opts, store)) {          LOG.error("Fail to init all [RegionEngine].");          return false;      }      // heartbeat sender      //如果開啟了自管理的集群,那麼需要初始化心跳發送器      if (this.pdClient instanceof RemotePlacementDriverClient) {          HeartbeatOptions heartbeatOpts = opts.getHeartbeatOptions();          if (heartbeatOpts == null) {              heartbeatOpts = new HeartbeatOptions();          }          this.heartbeatSender = new HeartbeatSender(this);          if (!this.heartbeatSender.init(heartbeatOpts)) {              LOG.error("Fail to init [HeartbeatSender].");              return false;          }      }      this.startTime = System.currentTimeMillis();      LOG.info("[StoreEngine] start successfully: {}.", this);      return this.started = true;  }

我們從上面標了號的程式碼往下看:

  1. 這裡是校驗StoreEngineOptions的regionEngineOptionsList是否為空,如果為空則默認初始化一個,然後加入到rOptsList集合里
  2. 遍歷rOptsList集合,並為其中的RegionEngineOptions對象設置集群資訊
  3. 實例化Store然後並根據RegionEngineOptions初始化裡面的region
  4. 初始化執行器
  5. 初始化rpc遠程執行器,用來執行RPCServer的Processors
  6. 初始化rpcServer,供其他服務調用
  7. 根據不同的類型選擇db,目前 StoreEngine 存儲引擎支援 MemoryDB 和 RocksDB 兩種實現。MemoryDB基於 ConcurrentSkipListMap 實現。
  8. 為每個region初始化RegionEngine

初始化Store和Store裡面的region

這裡會調用pdClient的getStoreMetadata方法進行初始化,這裡我們看FakePlacementDriverClient的實現:
FakePlacementDriverClient#getStoreMetadata

public Store getStoreMetadata(final StoreEngineOptions opts) {      //實例化store      final Store store = new Store();      final List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();      final List<Region> regionList = Lists.newArrayListWithCapacity(rOptsList.size());      store.setId(-1);      store.setEndpoint(opts.getServerAddress());      for (final RegionEngineOptions rOpts : rOptsList) {          //根據rOpts初始化Region實例加入到regionList中          regionList.add(getLocalRegionMetadata(rOpts));      }      store.setRegions(regionList);      return store;  }

這個方法裡面會實例化一個store之後遍歷rOptsList集合,在循環裡面會根據rOptsList裡面的RegionEngineOptions來調用getLocalRegionMetadata方法來實例化region,然後加入到regionList集合中。
在這裡需要主要rOptsList列表和regionList列表的下標是一一對應的關係,在下面的程式碼中會用到這個對應關係。

在這裡應該可以稍微理解到:

這張圖的意義了,每個store下面會有很多的region。

然後我們再看看Region怎麼被初始化的:
這裡是調用FakePlacementDriverClient的父類AbstractPlacementDriverClient的getLocalRegionMetadata來進行初始化的
AbstractPlacementDriverClient#getLocalRegionMetadata

protected Region getLocalRegionMetadata(final RegionEngineOptions opts) {      final long regionId = Requires.requireNonNull(opts.getRegionId(), "opts.regionId");      Requires.requireTrue(regionId >= Region.MIN_ID_WITH_MANUAL_CONF, "opts.regionId must >= "                                                                       + Region.MIN_ID_WITH_MANUAL_CONF);      Requires.requireTrue(regionId < Region.MAX_ID_WITH_MANUAL_CONF, "opts.regionId must < "                                                                      + Region.MAX_ID_WITH_MANUAL_CONF);      final byte[] startKey = opts.getStartKeyBytes();      final byte[] endKey = opts.getEndKeyBytes();      final String initialServerList = opts.getInitialServerList();      //實例化region      final Region region = new Region();      final Configuration conf = new Configuration();      // region      region.setId(regionId);      region.setStartKey(startKey);      region.setEndKey(endKey);      region.setRegionEpoch(new RegionEpoch(-1, -1));      // peers      Requires.requireTrue(Strings.isNotBlank(initialServerList), "opts.initialServerList is blank");      //將集群ip和埠解析到peer中      conf.parse(initialServerList);      //每個region都會存有集群的資訊      region.setPeers(JRaftHelper.toPeerList(conf.listPeers()));      this.regionRouteTable.addOrUpdateRegion(region);      return region;  }

Region 是最小的 KV 數據單元,可理解為一個數據分區或者分片,每個 Region 都有一個左閉右開的區間 [startKey, endKey),這裡初始化都是null,能夠根據請求流量/負載/數據量大小等指標自動分裂以及自動副本搬遷。Region 有多個副本 Replication 構建 Raft Groups 存儲在不同的 Store 節點,通過 Raft 協議日誌複製功能數據同步到同 Group 的全部節點。

最後會將region存放到regionRouteTable中:

public void addOrUpdateRegion(final Region region) {      Requires.requireNonNull(region, "region");      Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch");      final long regionId = region.getId();      final byte[] startKey = BytesUtil.nullToEmpty(region.getStartKey());      final StampedLock stampedLock = this.stampedLock;      final long stamp = stampedLock.writeLock();      try {          this.regionTable.put(regionId, region.copy());          this.rangeTable.put(startKey, regionId);      } finally {          stampedLock.unlockWrite(stamp);      }  }

在這個方法中將region根據regionId存入到regionTable中,然後根據startKey作為key存入到rangeTable中。

為每個region初始化RegionEngine

在initAllRegionEngine裡面會初始化RegionEngine:
StoreEngine#initAllRegionEngine

private boolean initAllRegionEngine(final StoreEngineOptions opts, final Store store) {      Requires.requireNonNull(opts, "opts");      Requires.requireNonNull(store, "store");      //獲取主目錄      String baseRaftDataPath = opts.getRaftDataPath();      if (Strings.isNotBlank(baseRaftDataPath)) {          try {              FileUtils.forceMkdir(new File(baseRaftDataPath));          } catch (final Throwable t) {              LOG.error("Fail to make dir for raftDataPath: {}.", baseRaftDataPath);              return false;          }      } else {          baseRaftDataPath = "";      }      final Endpoint serverAddress = opts.getServerAddress();      final List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();      final List<Region> regionList = store.getRegions();      //因為regionList是根據rOptsList來初始化的,所以這裡校驗一樣數量是不是一樣的      Requires.requireTrue(rOptsList.size() == regionList.size());      for (int i = 0; i < rOptsList.size(); i++) {          //一一對應的獲取相應的RegionEngineOptions和region          final RegionEngineOptions rOpts = rOptsList.get(i);          final Region region = regionList.get(i);          //如果region路徑是空的,那麼就重新設值          if (Strings.isBlank(rOpts.getRaftDataPath())) {              final String childPath = "raft_data_region_" + region.getId() + "_" + serverAddress.getPort();              rOpts.setRaftDataPath(Paths.get(baseRaftDataPath, childPath).toString());          }          Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch");          //根據Region初始化RegionEngine          final RegionEngine engine = new RegionEngine(region, this);          if (engine.init(rOpts)) {              //KV Server 服務端的請求處理服務              // 每個 RegionKVService 對應一個 Region,只處理本身 Region 範疇內的請求              final RegionKVService regionKVService = new DefaultRegionKVService(engine);              //放入到regionKVServiceTable中              registerRegionKVService(regionKVService);              //設置region與engine映射表              this.regionEngineTable.put(region.getId(), engine);          } else {              LOG.error("Fail to init [RegionEngine: {}].", region);              return false;          }      }      return true;  }

首先這個方法會初始化一個baseRaftDataPath作為主目錄
然後將rOptsList和regionList都取出來,遍歷rOptsList,並將RegionEngineOptions對應的region也找出來
然後這裡會為每個region實例化一個RegionEngine,並將engine包裝到RegionKVService中
最後將RegionKVService放入到regionKVServiceTable映射表中,將region放入到regionEngineTable映射表中

這裡的RegionKVServic是KV Server 服務端的請求處理服務,一個 StoreEngine 中包含很多 RegionKVService, 每個 RegionKVService 對應一個 Region,只處理本身 Region 範疇內的請求。

初始化RegionEngine#init

public synchronized boolean init(final RegionEngineOptions opts) {      if (this.started) {          LOG.info("[RegionEngine: {}] already started.", this.region);          return true;      }      this.regionOpts = Requires.requireNonNull(opts, "opts");      //實例化狀態機      this.fsm = new KVStoreStateMachine(this.region, this.storeEngine);        // node options      NodeOptions nodeOpts = opts.getNodeOptions();      if (nodeOpts == null) {          nodeOpts = new NodeOptions();      }      //如果度量間隔時間大於零,那麼開啟度量      final long metricsReportPeriod = opts.getMetricsReportPeriod();      if (metricsReportPeriod > 0) {          // metricsReportPeriod > 0 means enable metrics          nodeOpts.setEnableMetrics(true);      }      //初始化集群配置      nodeOpts.setInitialConf(new Configuration(JRaftHelper.toJRaftPeerIdList(this.region.getPeers())));      nodeOpts.setFsm(this.fsm);      //初始化各種日誌的路徑      final String raftDataPath = opts.getRaftDataPath();      try {          FileUtils.forceMkdir(new File(raftDataPath));      } catch (final Throwable t) {          LOG.error("Fail to make dir for raftDataPath {}.", raftDataPath);          return false;      }      if (Strings.isBlank(nodeOpts.getLogUri())) {          final Path logUri = Paths.get(raftDataPath, "log");          nodeOpts.setLogUri(logUri.toString());      }      if (Strings.isBlank(nodeOpts.getRaftMetaUri())) {          final Path meteUri = Paths.get(raftDataPath, "meta");          nodeOpts.setRaftMetaUri(meteUri.toString());      }      if (Strings.isBlank(nodeOpts.getSnapshotUri())) {          final Path snapshotUri = Paths.get(raftDataPath, "snapshot");          nodeOpts.setSnapshotUri(snapshotUri.toString());      }      LOG.info("[RegionEngine: {}], log uri: {}, raft meta uri: {}, snapshot uri: {}.", this.region,          nodeOpts.getLogUri(), nodeOpts.getRaftMetaUri(), nodeOpts.getSnapshotUri());      final Endpoint serverAddress = opts.getServerAddress();      final PeerId serverId = new PeerId(serverAddress, 0);      final RpcServer rpcServer = this.storeEngine.getRpcServer();      this.raftGroupService = new RaftGroupService(opts.getRaftGroupId(), serverId, nodeOpts, rpcServer, true);      //初始化node節點      this.node = this.raftGroupService.start(false);      RouteTable.getInstance().updateConfiguration(this.raftGroupService.getGroupId(), nodeOpts.getInitialConf());      if (this.node != null) {          final RawKVStore rawKVStore = this.storeEngine.getRawKVStore();          final Executor readIndexExecutor = this.storeEngine.getReadIndexExecutor();          //RaftRawKVStore 是 RheaKV 基於 Raft 複製狀態機 KVStoreStateMachine 的 RawKVStore 介面 KV 存儲實現          //RheaKV 的 Raft 入口,從這裡開始 Raft 流程          this.raftRawKVStore = new RaftRawKVStore(this.node, rawKVStore, readIndexExecutor);          //攔截請求做指標度量          this.metricsRawKVStore = new MetricsRawKVStore(this.region.getId(), this.raftRawKVStore);          // metrics config          if (this.regionMetricsReporter == null && metricsReportPeriod > 0) {              final MetricRegistry metricRegistry = this.node.getNodeMetrics().getMetricRegistry();              if (metricRegistry != null) {                  final ScheduledExecutorService scheduler = this.storeEngine.getMetricsScheduler();                  // start raft node metrics reporter                  this.regionMetricsReporter = Slf4jReporter.forRegistry(metricRegistry) //                      .prefixedWith("region_" + this.region.getId()) //                      .withLoggingLevel(Slf4jReporter.LoggingLevel.INFO) //                      .outputTo(LOG) //                      .scheduleOn(scheduler) //                      .shutdownExecutorOnStop(scheduler != null) //                      .build();                  this.regionMetricsReporter.start(metricsReportPeriod, TimeUnit.SECONDS);              }          }          this.started = true;          LOG.info("[RegionEngine] start successfully: {}.", this);      }      return this.started;  }

走到了這裡可以找得到我在第一講的時候講過熟悉的幾個實例了。如果不熟悉的話,不妨去翻閱一下我的第一篇文章:1. SOFAJRaft源碼分析— SOFAJRaft啟動時做了什麼?
在這裡會實例化狀態機,是KVStoreStateMachine的實例;
收動為LogUri、RaftMetaUri、SnapshotUri賦值,並獲取storeEngine里的rpcServer;
啟動raftGroupService返回經過初始化的node;
接下來會實例化raftRawKVStore,這個實例是RheaKV 的 Raft 入口,從這裡開始 Raft 流程,所有的RheaKV數據都是通過它來處理。

總結

RheaKV初始化也是講了很多的內容,這一篇講了RheaKV在啟動時需要初始化哪些組件,Store和Region又是一個怎樣的關係,已經JRaft是在哪裡啟動的,狀態機是在哪裡設置的等等,內容也是非常的豐富。從這裡也可以感受到,看到一個好的架構設計就是一種享受。