4. SOFAJRaft源码分析— RheaKV初始化做了什么?
- 2019 年 10 月 31 日
- 筆記
前言
由于RheaKV要讲起来篇幅比较长,所以这里分成几个章节来讲,这一章讲一讲RheaKV初始化做了什么?
我们先来给个例子,我们从例子来讲:
public static void main(final String[] args) throws Exception { final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured() .withFake(true) // use a fake pd .config(); final StoreEngineOptions storeOpts = StoreEngineOptionsConfigured.newConfigured() // .withStorageType(StorageType.RocksDB) .withRocksDBOptions(RocksDBOptionsConfigured.newConfigured().withDbPath(Configs.DB_PATH).config()) .withRaftDataPath(Configs.RAFT_DATA_PATH) .withServerAddress(new Endpoint("127.0.0.1", 8181)) .config(); final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured() // .withClusterName(Configs.CLUSTER_NAME) // .withInitialServerList(Configs.ALL_NODE_ADDRESSES) .withStoreEngineOptions(storeOpts) // .withPlacementDriverOptions(pdOpts) // .config(); System.out.println(opts); final Node node = new Node(opts); node.start(); Runtime.getRuntime().addShutdownHook(new Thread(node::stop)); System.out.println("server1 start OK"); }
这里为了简化逻辑,使用的无PD设置
Node的实现:
public class Node { private final RheaKVStoreOptions options; private RheaKVStore rheaKVStore; public Node(RheaKVStoreOptions options) { this.options = options; } public void start() { this.rheaKVStore = new DefaultRheaKVStore(); this.rheaKVStore.init(this.options); } public void stop() { this.rheaKVStore.shutdown(); } public RheaKVStore getRheaKVStore() { return rheaKVStore; } }
所以这里是初始化一个DefaultRheaKVStore,并调用其init方法进行初始化
RheaKV 默认存储DefaultRheaKVStore
由于DefaultRheaKVStore的初始化方法都是在init方法中完成,所以这里直接看DefaultRheaKVStore的init方法。
public synchronized boolean init(final RheaKVStoreOptions opts) { //1. 如果已经启动了,那么直接返回 if (this.started) { LOG.info("[DefaultRheaKVStore] already started."); return true; } this.opts = opts; // init placement driver // 2.根据PDoptions设置PD final PlacementDriverOptions pdOpts = opts.getPlacementDriverOptions(); final String clusterName = opts.getClusterName(); Requires.requireNonNull(pdOpts, "opts.placementDriverOptions"); Requires.requireNonNull(clusterName, "opts.clusterName"); //设置集群 if (Strings.isBlank(pdOpts.getInitialServerList())) { // if blank, extends parent's value pdOpts.setInitialServerList(opts.getInitialServerList()); } //如果是无 PD 场景, RheaKV 提供 Fake PD Client if (pdOpts.isFake()) { this.pdClient = new FakePlacementDriverClient(opts.getClusterId(), clusterName); } else { this.pdClient = new RemotePlacementDriverClient(opts.getClusterId(), clusterName); } //初始化PD if (!this.pdClient.init(pdOpts)) { LOG.error("Fail to init [PlacementDriverClient]."); return false; } // init store engine //3. 初始化存储引擎 final StoreEngineOptions stOpts = opts.getStoreEngineOptions(); if (stOpts != null) { stOpts.setInitialServerList(opts.getInitialServerList()); this.storeEngine = new StoreEngine(this.pdClient); //初始化存储引擎 if (!this.storeEngine.init(stOpts)) { LOG.error("Fail to init [StoreEngine]."); return false; } } //获取当前节点的ip和端口号 final Endpoint selfEndpoint = this.storeEngine == null ? null : this.storeEngine.getSelfEndpoint(); final RpcOptions rpcOpts = opts.getRpcOptions(); Requires.requireNonNull(rpcOpts, "opts.rpcOptions"); //4. 初始化一个RpcService,并重写getLeader方法 this.rheaKVRpcService = new DefaultRheaKVRpcService(this.pdClient, selfEndpoint) { @Override public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) { final Endpoint leader = getLeaderByRegionEngine(regionId); if (leader != null) { return leader; } return super.getLeader(regionId, forceRefresh, timeoutMillis); } }; if (!this.rheaKVRpcService.init(rpcOpts)) { LOG.error("Fail to init [RheaKVRpcService]."); return false; } //获取重试次数,默认重试两次 this.failoverRetries = opts.getFailoverRetries(); //默认5000 this.futureTimeoutMillis = opts.getFutureTimeoutMillis(); //是否只从leader读取数据,默认为true this.onlyLeaderRead = opts.isOnlyLeaderRead(); //5.初始化kvDispatcher, 这里默认为true if (opts.isUseParallelKVExecutor()) { //获取当前cpu final int numWorkers = Utils.cpus(); //向左移动4位,相当于乘以16 final int bufSize = numWorkers << 4; final String name = "parallel-kv-executor"; final ThreadFactory threadFactory = Constants.THREAD_AFFINITY_ENABLED //这里选择是否启用线程亲和性ThreadFactory ? new AffinityNamedThreadFactory(name, true) : new NamedThreadFactory(name, true); //初始化Dispatcher this.kvDispatcher = new TaskDispatcher(bufSize, numWorkers, WaitStrategyType.LITE_BLOCKING_WAIT, threadFactory); } this.batchingOpts = opts.getBatchingOptions(); //默认是true if (this.batchingOpts.isAllowBatching()) { //这几个batching暂时不知道是用来做什么的,等用到再分析 this.getBatching = new GetBatching(KeyEvent::new, "get_batching", new GetBatchingHandler("get", false)); this.getBatchingOnlySafe = new GetBatching(KeyEvent::new, "get_batching_only_safe", new GetBatchingHandler("get_only_safe", true)); this.putBatching = new PutBatching(KVEvent::new, "put_batching", new PutBatchingHandler("put")); } LOG.info("[DefaultRheaKVStore] start successfully, options: {}.", opts); return this.started = true; }
- 校验是否启动,如果已经启动了,那么直接返回
- 根据PDoptions设置PD,PD 是全局的中心总控节点,负责整个集群的调度管理,维护 RegionRouteTable 路由表。这里我们不启用 PD,所以实例化一个FakePlacementDriverClient,并初始化
- 初始化存储引擎,目前 StoreEngine 存储引擎支持 MemoryDB 和 RocksDB 两种实现,我们这里用的是RocksDB,待会下面补充init方法讲解
- 初始化一个rheaKVRpcService,针对 KV 存储服务的 RPC Client 客户端封装,实现 Failover 逻辑。并设置重试两次,等待超时时间futureTimeoutMillis是5000毫秒,默认只从leader读取数据
- 初始化kvDispatcher
初始化存储引擎
初始化的操作时在StoreEngine的init方法里面实现的,我们直接看这个方法的实现,这个方法是初始化核心对象,逻辑较为复杂,希望有点耐心看完:
StoreEngine#init
public synchronized boolean init(final StoreEngineOptions opts) { if (this.started) { LOG.info("[StoreEngine] already started."); return true; } this.storeOpts = Requires.requireNonNull(opts, "opts"); Endpoint serverAddress = Requires.requireNonNull(opts.getServerAddress(), "opts.serverAddress"); //获取ip和端口 final int port = serverAddress.getPort(); final String ip = serverAddress.getIp(); //如果传入的IP为空,那么就设置启动机器ip作为serverAddress的ip if (ip == null || Utils.IP_ANY.equals(ip)) { serverAddress = new Endpoint(NetUtil.getLocalCanonicalHostName(), port); opts.setServerAddress(serverAddress); } //获取度量上报时间 final long metricsReportPeriod = opts.getMetricsReportPeriod(); // init region options List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList(); //1. 如果RegionEngineOptions为空,则默认初始化一个 if (rOptsList == null || rOptsList.isEmpty()) { // -1 region final RegionEngineOptions rOpts = new RegionEngineOptions(); rOpts.setRegionId(Constants.DEFAULT_REGION_ID); rOptsList = Lists.newArrayList(); rOptsList.add(rOpts); opts.setRegionEngineOptionsList(rOptsList); } //获取集群名 final String clusterName = this.pdClient.getClusterName(); //2. 遍历rOptsList集合,为其中的RegionEngineOptions对象设置参数 for (final RegionEngineOptions rOpts : rOptsList) { //用集群名+“-”+RegionId 拼接设置为RaftGroupId rOpts.setRaftGroupId(JRaftHelper.getJRaftGroupId(clusterName, rOpts.getRegionId())); rOpts.setServerAddress(serverAddress); rOpts.setInitialServerList(opts.getInitialServerList()); if (rOpts.getNodeOptions() == null) { // copy common node options rOpts.setNodeOptions(opts.getCommonNodeOptions() == null ? new NodeOptions() : opts .getCommonNodeOptions().copy()); } //如果原本没有设置度量上报时间,那么就重置一下 if (rOpts.getMetricsReportPeriod() <= 0 && metricsReportPeriod > 0) { // extends store opts 300 rOpts.setMetricsReportPeriod(metricsReportPeriod); } } // init store // 3. 初始化Store和Store里面的region final Store store = this.pdClient.getStoreMetadata(opts); if (store == null || store.getRegions() == null || store.getRegions().isEmpty()) { LOG.error("Empty store metadata: {}.", store); return false; } this.storeId = store.getId(); // init executors //4. 初始化执行器 if (this.readIndexExecutor == null) { this.readIndexExecutor = StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads()); } if (this.raftStateTrigger == null) { this.raftStateTrigger = StoreEngineHelper.createRaftStateTrigger(opts.getLeaderStateTriggerCoreThreads()); } if (this.snapshotExecutor == null) { this.snapshotExecutor = StoreEngineHelper.createSnapshotExecutor(opts.getSnapshotCoreThreads()); } // init rpc executors 默认false final boolean useSharedRpcExecutor = opts.isUseSharedRpcExecutor(); //5. 初始化rpc远程执行器,用来执行RPCServer的Processors if (!useSharedRpcExecutor) { if (this.cliRpcExecutor == null) { this.cliRpcExecutor = StoreEngineHelper.createCliRpcExecutor(opts.getCliRpcCoreThreads()); } if (this.raftRpcExecutor == null) { this.raftRpcExecutor = StoreEngineHelper.createRaftRpcExecutor(opts.getRaftRpcCoreThreads()); } if (this.kvRpcExecutor == null) { this.kvRpcExecutor = StoreEngineHelper.createKvRpcExecutor(opts.getKvRpcCoreThreads()); } } // init metrics //做指标度量 startMetricReporters(metricsReportPeriod); // init rpc server //6. 初始化rpcServer,供其他服务调用 this.rpcServer = new RpcServer(port, true, true); //为server加入各种processor RaftRpcServerFactory.addRaftRequestProcessors(this.rpcServer, this.raftRpcExecutor, this.cliRpcExecutor); StoreEngineHelper.addKvStoreRequestProcessor(this.rpcServer, this); if (!this.rpcServer.start()) { LOG.error("Fail to init [RpcServer]."); return false; } // init db store //7. 根据不同的类型选择db if (!initRawKVStore(opts)) { return false; } // init all region engine // 8. 为每个region初始化RegionEngine if (!initAllRegionEngine(opts, store)) { LOG.error("Fail to init all [RegionEngine]."); return false; } // heartbeat sender //如果开启了自管理的集群,那么需要初始化心跳发送器 if (this.pdClient instanceof RemotePlacementDriverClient) { HeartbeatOptions heartbeatOpts = opts.getHeartbeatOptions(); if (heartbeatOpts == null) { heartbeatOpts = new HeartbeatOptions(); } this.heartbeatSender = new HeartbeatSender(this); if (!this.heartbeatSender.init(heartbeatOpts)) { LOG.error("Fail to init [HeartbeatSender]."); return false; } } this.startTime = System.currentTimeMillis(); LOG.info("[StoreEngine] start successfully: {}.", this); return this.started = true; }
我们从上面标了号的代码往下看:
- 这里是校验StoreEngineOptions的regionEngineOptionsList是否为空,如果为空则默认初始化一个,然后加入到rOptsList集合里
- 遍历rOptsList集合,并为其中的RegionEngineOptions对象设置集群信息
- 实例化Store然后并根据RegionEngineOptions初始化里面的region
- 初始化执行器
- 初始化rpc远程执行器,用来执行RPCServer的Processors
- 初始化rpcServer,供其他服务调用
- 根据不同的类型选择db,目前 StoreEngine 存储引擎支持 MemoryDB 和 RocksDB 两种实现。MemoryDB基于 ConcurrentSkipListMap 实现。
- 为每个region初始化RegionEngine
初始化Store和Store里面的region
这里会调用pdClient的getStoreMetadata方法进行初始化,这里我们看FakePlacementDriverClient的实现:
FakePlacementDriverClient#getStoreMetadata
public Store getStoreMetadata(final StoreEngineOptions opts) { //实例化store final Store store = new Store(); final List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList(); final List<Region> regionList = Lists.newArrayListWithCapacity(rOptsList.size()); store.setId(-1); store.setEndpoint(opts.getServerAddress()); for (final RegionEngineOptions rOpts : rOptsList) { //根据rOpts初始化Region实例加入到regionList中 regionList.add(getLocalRegionMetadata(rOpts)); } store.setRegions(regionList); return store; }
这个方法里面会实例化一个store之后遍历rOptsList集合,在循环里面会根据rOptsList里面的RegionEngineOptions来调用getLocalRegionMetadata方法来实例化region,然后加入到regionList集合中。
在这里需要主要rOptsList列表和regionList列表的下标是一一对应的关系,在下面的代码中会用到这个对应关系。
在这里应该可以稍微理解到:
这张图的意义了,每个store下面会有很多的region。
然后我们再看看Region怎么被初始化的:
这里是调用FakePlacementDriverClient的父类AbstractPlacementDriverClient的getLocalRegionMetadata来进行初始化的
AbstractPlacementDriverClient#getLocalRegionMetadata
protected Region getLocalRegionMetadata(final RegionEngineOptions opts) { final long regionId = Requires.requireNonNull(opts.getRegionId(), "opts.regionId"); Requires.requireTrue(regionId >= Region.MIN_ID_WITH_MANUAL_CONF, "opts.regionId must >= " + Region.MIN_ID_WITH_MANUAL_CONF); Requires.requireTrue(regionId < Region.MAX_ID_WITH_MANUAL_CONF, "opts.regionId must < " + Region.MAX_ID_WITH_MANUAL_CONF); final byte[] startKey = opts.getStartKeyBytes(); final byte[] endKey = opts.getEndKeyBytes(); final String initialServerList = opts.getInitialServerList(); //实例化region final Region region = new Region(); final Configuration conf = new Configuration(); // region region.setId(regionId); region.setStartKey(startKey); region.setEndKey(endKey); region.setRegionEpoch(new RegionEpoch(-1, -1)); // peers Requires.requireTrue(Strings.isNotBlank(initialServerList), "opts.initialServerList is blank"); //将集群ip和端口解析到peer中 conf.parse(initialServerList); //每个region都会存有集群的信息 region.setPeers(JRaftHelper.toPeerList(conf.listPeers())); this.regionRouteTable.addOrUpdateRegion(region); return region; }
Region 是最小的 KV 数据单元,可理解为一个数据分区或者分片,每个 Region 都有一个左闭右开的区间 [startKey, endKey),这里初始化都是null,能够根据请求流量/负载/数据量大小等指标自动分裂以及自动副本搬迁。Region 有多个副本 Replication 构建 Raft Groups 存储在不同的 Store 节点,通过 Raft 协议日志复制功能数据同步到同 Group 的全部节点。
最后会将region存放到regionRouteTable中:
public void addOrUpdateRegion(final Region region) { Requires.requireNonNull(region, "region"); Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch"); final long regionId = region.getId(); final byte[] startKey = BytesUtil.nullToEmpty(region.getStartKey()); final StampedLock stampedLock = this.stampedLock; final long stamp = stampedLock.writeLock(); try { this.regionTable.put(regionId, region.copy()); this.rangeTable.put(startKey, regionId); } finally { stampedLock.unlockWrite(stamp); } }
在这个方法中将region根据regionId存入到regionTable中,然后根据startKey作为key存入到rangeTable中。
为每个region初始化RegionEngine
在initAllRegionEngine里面会初始化RegionEngine:
StoreEngine#initAllRegionEngine
private boolean initAllRegionEngine(final StoreEngineOptions opts, final Store store) { Requires.requireNonNull(opts, "opts"); Requires.requireNonNull(store, "store"); //获取主目录 String baseRaftDataPath = opts.getRaftDataPath(); if (Strings.isNotBlank(baseRaftDataPath)) { try { FileUtils.forceMkdir(new File(baseRaftDataPath)); } catch (final Throwable t) { LOG.error("Fail to make dir for raftDataPath: {}.", baseRaftDataPath); return false; } } else { baseRaftDataPath = ""; } final Endpoint serverAddress = opts.getServerAddress(); final List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList(); final List<Region> regionList = store.getRegions(); //因为regionList是根据rOptsList来初始化的,所以这里校验一样数量是不是一样的 Requires.requireTrue(rOptsList.size() == regionList.size()); for (int i = 0; i < rOptsList.size(); i++) { //一一对应的获取相应的RegionEngineOptions和region final RegionEngineOptions rOpts = rOptsList.get(i); final Region region = regionList.get(i); //如果region路径是空的,那么就重新设值 if (Strings.isBlank(rOpts.getRaftDataPath())) { final String childPath = "raft_data_region_" + region.getId() + "_" + serverAddress.getPort(); rOpts.setRaftDataPath(Paths.get(baseRaftDataPath, childPath).toString()); } Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch"); //根据Region初始化RegionEngine final RegionEngine engine = new RegionEngine(region, this); if (engine.init(rOpts)) { //KV Server 服务端的请求处理服务 // 每个 RegionKVService 对应一个 Region,只处理本身 Region 范畴内的请求 final RegionKVService regionKVService = new DefaultRegionKVService(engine); //放入到regionKVServiceTable中 registerRegionKVService(regionKVService); //设置region与engine映射表 this.regionEngineTable.put(region.getId(), engine); } else { LOG.error("Fail to init [RegionEngine: {}].", region); return false; } } return true; }
首先这个方法会初始化一个baseRaftDataPath作为主目录
然后将rOptsList和regionList都取出来,遍历rOptsList,并将RegionEngineOptions对应的region也找出来
然后这里会为每个region实例化一个RegionEngine,并将engine包装到RegionKVService中
最后将RegionKVService放入到regionKVServiceTable映射表中,将region放入到regionEngineTable映射表中
这里的RegionKVServic是KV Server 服务端的请求处理服务,一个 StoreEngine 中包含很多 RegionKVService, 每个 RegionKVService 对应一个 Region,只处理本身 Region 范畴内的请求。
初始化RegionEngine#init
public synchronized boolean init(final RegionEngineOptions opts) { if (this.started) { LOG.info("[RegionEngine: {}] already started.", this.region); return true; } this.regionOpts = Requires.requireNonNull(opts, "opts"); //实例化状态机 this.fsm = new KVStoreStateMachine(this.region, this.storeEngine); // node options NodeOptions nodeOpts = opts.getNodeOptions(); if (nodeOpts == null) { nodeOpts = new NodeOptions(); } //如果度量间隔时间大于零,那么开启度量 final long metricsReportPeriod = opts.getMetricsReportPeriod(); if (metricsReportPeriod > 0) { // metricsReportPeriod > 0 means enable metrics nodeOpts.setEnableMetrics(true); } //初始化集群配置 nodeOpts.setInitialConf(new Configuration(JRaftHelper.toJRaftPeerIdList(this.region.getPeers()))); nodeOpts.setFsm(this.fsm); //初始化各种日志的路径 final String raftDataPath = opts.getRaftDataPath(); try { FileUtils.forceMkdir(new File(raftDataPath)); } catch (final Throwable t) { LOG.error("Fail to make dir for raftDataPath {}.", raftDataPath); return false; } if (Strings.isBlank(nodeOpts.getLogUri())) { final Path logUri = Paths.get(raftDataPath, "log"); nodeOpts.setLogUri(logUri.toString()); } if (Strings.isBlank(nodeOpts.getRaftMetaUri())) { final Path meteUri = Paths.get(raftDataPath, "meta"); nodeOpts.setRaftMetaUri(meteUri.toString()); } if (Strings.isBlank(nodeOpts.getSnapshotUri())) { final Path snapshotUri = Paths.get(raftDataPath, "snapshot"); nodeOpts.setSnapshotUri(snapshotUri.toString()); } LOG.info("[RegionEngine: {}], log uri: {}, raft meta uri: {}, snapshot uri: {}.", this.region, nodeOpts.getLogUri(), nodeOpts.getRaftMetaUri(), nodeOpts.getSnapshotUri()); final Endpoint serverAddress = opts.getServerAddress(); final PeerId serverId = new PeerId(serverAddress, 0); final RpcServer rpcServer = this.storeEngine.getRpcServer(); this.raftGroupService = new RaftGroupService(opts.getRaftGroupId(), serverId, nodeOpts, rpcServer, true); //初始化node节点 this.node = this.raftGroupService.start(false); RouteTable.getInstance().updateConfiguration(this.raftGroupService.getGroupId(), nodeOpts.getInitialConf()); if (this.node != null) { final RawKVStore rawKVStore = this.storeEngine.getRawKVStore(); final Executor readIndexExecutor = this.storeEngine.getReadIndexExecutor(); //RaftRawKVStore 是 RheaKV 基于 Raft 复制状态机 KVStoreStateMachine 的 RawKVStore 接口 KV 存储实现 //RheaKV 的 Raft 入口,从这里开始 Raft 流程 this.raftRawKVStore = new RaftRawKVStore(this.node, rawKVStore, readIndexExecutor); //拦截请求做指标度量 this.metricsRawKVStore = new MetricsRawKVStore(this.region.getId(), this.raftRawKVStore); // metrics config if (this.regionMetricsReporter == null && metricsReportPeriod > 0) { final MetricRegistry metricRegistry = this.node.getNodeMetrics().getMetricRegistry(); if (metricRegistry != null) { final ScheduledExecutorService scheduler = this.storeEngine.getMetricsScheduler(); // start raft node metrics reporter this.regionMetricsReporter = Slf4jReporter.forRegistry(metricRegistry) // .prefixedWith("region_" + this.region.getId()) // .withLoggingLevel(Slf4jReporter.LoggingLevel.INFO) // .outputTo(LOG) // .scheduleOn(scheduler) // .shutdownExecutorOnStop(scheduler != null) // .build(); this.regionMetricsReporter.start(metricsReportPeriod, TimeUnit.SECONDS); } } this.started = true; LOG.info("[RegionEngine] start successfully: {}.", this); } return this.started; }
走到了这里可以找得到我在第一讲的时候讲过熟悉的几个实例了。如果不熟悉的话,不妨去翻阅一下我的第一篇文章:1. SOFAJRaft源码分析— SOFAJRaft启动时做了什么?
在这里会实例化状态机,是KVStoreStateMachine的实例;
收动为LogUri、RaftMetaUri、SnapshotUri赋值,并获取storeEngine里的rpcServer;
启动raftGroupService返回经过初始化的node;
接下来会实例化raftRawKVStore,这个实例是RheaKV 的 Raft 入口,从这里开始 Raft 流程,所有的RheaKV数据都是通过它来处理。
总结
RheaKV初始化也是讲了很多的内容,这一篇讲了RheaKV在启动时需要初始化哪些组件,Store和Region又是一个怎样的关系,已经JRaft是在哪里启动的,状态机是在哪里设置的等等,内容也是非常的丰富。从这里也可以感受到,看到一个好的架构设计就是一种享受。