1. SOFAJRaft源碼分析— SOFAJRaft啟動時做了什麼?

  • 2019 年 10 月 10 日
  • 筆記

我們這次依然用上次的例子CounterServer來進行講解:

我這裡就不貼整個程式碼了

public static void main(final String[] args) throws IOException {      if (args.length != 4) {          System.out              .println("Useage : java com.alipay.sofa.jraft.example.counter.CounterServer {dataPath} {groupId} {serverId} {initConf}");          System.out              .println("Example: java com.alipay.sofa.jraft.example.counter.CounterServer " +                      "/tmp/server1 " +                      "counter " +                      "127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083");          System.exit(1);      }      //日誌存儲的路徑      final String dataPath = args[0];      //SOFAJRaft集群的名字      final String groupId = args[1];      //當前節點的ip和埠      final String serverIdStr = args[2];      //集群節點的ip和埠      final String initConfStr = args[3];        final NodeOptions nodeOptions = new NodeOptions();      // 為了測試,調整 snapshot 間隔等參數      // 設置選舉超時時間為 1 秒      nodeOptions.setElectionTimeoutMs(1000);      // 關閉 CLI 服務。      nodeOptions.setDisableCli(false);      // 每隔30秒做一次 snapshot      nodeOptions.setSnapshotIntervalSecs(30);      // 解析參數      final PeerId serverId = new PeerId();      if (!serverId.parse(serverIdStr)) {          throw new IllegalArgumentException("Fail to parse serverId:" + serverIdStr);      }      final Configuration initConf = new Configuration();      //將raft分組加入到Configuration的peers數組中      if (!initConf.parse(initConfStr)) {          throw new IllegalArgumentException("Fail to parse initConf:" + initConfStr);      }      // 設置初始集群配置      nodeOptions.setInitialConf(initConf);        // 啟動      final CounterServer counterServer = new CounterServer(dataPath, groupId, serverId, nodeOptions);      System.out.println("Started counter server at port:"                         + counterServer.getNode().getNodeId().getPeerId().getPort());  }

我們在啟動server的main方法的時候會傳入日誌存儲的路徑、SOFAJRaft集群的名字、當前節點的ip和埠、集群節點的ip和埠並設值到NodeOptions中,作為當前節點啟動的參數。

這裡會將當前節點初始化為一個PeerId對象
PeerId

//存放當前節點的ip和埠號  private Endpoint            endpoint         = new Endpoint(Utils.IP_ANY, 0);    //默認是0  private int                 idx;  //是一個ip:埠的字元串  private String              str;  public PeerId() {      super();  }    public boolean parse(final String s) {      final String[] tmps = StringUtils.split(s, ':');      if (tmps.length != 3 && tmps.length != 2) {          return false;      }      try {          final int port = Integer.parseInt(tmps[1]);          this.endpoint = new Endpoint(tmps[0], port);          if (tmps.length == 3) {              this.idx = Integer.parseInt(tmps[2]);          } else {              this.idx = 0;          }          this.str = null;          return true;      } catch (final Exception e) {          LOG.error("Parse peer from string failed: {}", s, e);          return false;      }  }

PeerId的parse方法會將傳入的ip:埠解析之後對變數進行一些賦值的操作。

然後會調用到CounterServer的構造器中:
CounterServer

public CounterServer(final String dataPath, final String groupId, final PeerId serverId,                       final NodeOptions nodeOptions) throws IOException {      // 初始化路徑      FileUtils.forceMkdir(new File(dataPath));        // 這裡讓 raft RPC 和業務 RPC 使用同一個 RPC server, 通常也可以分開      final RpcServer rpcServer = new RpcServer(serverId.getPort());      RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);      // 註冊業務處理器      rpcServer.registerUserProcessor(new GetValueRequestProcessor(this));      rpcServer.registerUserProcessor(new IncrementAndGetRequestProcessor(this));      // 初始化狀態機      this.fsm = new CounterStateMachine();      // 設置狀態機到啟動參數      nodeOptions.setFsm(this.fsm);      // 設置存儲路徑      // 日誌, 必須      nodeOptions.setLogUri(dataPath + File.separator + "log");      // 元資訊, 必須      nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta");      // snapshot, 可選, 一般都推薦      nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");      // 初始化 raft group 服務框架      this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);      // 啟動      this.node = this.raftGroupService.start();  }

這個方法主要是調用NodeOptions的各種方法進行設置,然後調用raftGroupService的start方法啟動raft節點。

RaftGroupService

我們來到RaftGroupService的start方法:
RaftGroupService#start

public synchronized Node start(final boolean startRpcServer) {      //如果已經啟動了,那麼就返回      if (this.started) {          return this.node;      }      //校驗serverId和groupId      if (this.serverId == null || this.serverId.getEndpoint() == null              || this.serverId.getEndpoint().equals(new Endpoint(Utils.IP_ANY, 0))) {          throw new IllegalArgumentException("Blank serverId:" + this.serverId);      }      if (StringUtils.isBlank(this.groupId)) {          throw new IllegalArgumentException("Blank group id" + this.groupId);      }      //Adds RPC server to Server.      //設置當前node的ip和埠      NodeManager.getInstance().addAddress(this.serverId.getEndpoint());        //創建node      this.node = RaftServiceFactory.createAndInitRaftNode(this.groupId, this.serverId, this.nodeOptions);      if (startRpcServer) {          //啟動遠程服務          this.rpcServer.start();      } else {          LOG.warn("RPC server is not started in RaftGroupService.");      }      this.started = true;      LOG.info("Start the RaftGroupService successfully.");      return this.node;  }

這個方法會在一開始的時候對RaftGroupService在構造器實例化的參數進行校驗,然後把當前節點的Endpoint添加到NodeManager的addrSet變數中,接著調用RaftServiceFactory#createAndInitRaftNode實例化Node節點。

每個節點都會啟動一個rpc的服務,因為每個節點既可以被選舉也可以投票給其他節點,節點之間需要互相通訊,所以需要啟動一個rpc服務。

RaftServiceFactory#createAndInitRaftNode

public static Node createAndInitRaftNode(final String groupId, final PeerId serverId, final NodeOptions opts) {      //實例化一個node節點      final Node ret = createRaftNode(groupId, serverId);      //為node節點初始化      if (!ret.init(opts)) {          throw new IllegalStateException("Fail to init node, please see the logs to find the reason.");      }      return ret;  }    public static Node createRaftNode(final String groupId, final PeerId serverId) {      return new NodeImpl(groupId, serverId);  }

createAndInitRaftNode方法首先調用createRaftNode實例化一個Node的實例NodeImpl,然後調用其init方法進行初始化,主要的配置都是在init方法中完成的。

NodeImpl

public NodeImpl(final String groupId, final PeerId serverId) {      super();      if (groupId != null) {          //檢驗groupId是否符合格式規範          Utils.verifyGroupId(groupId);      }      this.groupId = groupId;      this.serverId = serverId != null ? serverId.copy() : null;      //一開始的設置為未初始化      this.state = State.STATE_UNINITIALIZED;      //設置新的任期為0      this.currTerm = 0;      //設置最新的時間戳      updateLastLeaderTimestamp(Utils.monotonicMs());      this.confCtx = new ConfigurationCtx(this);      this.wakingCandidate = null;      final int num = GLOBAL_NUM_NODES.incrementAndGet();      LOG.info("The number of active nodes increment to {}.", num);  }

NodeImpl會在構造器中初始化一些參數。

Node的初始化

Node節點的所有的重要的配置都是在init方法中完成的,NodeImpl的init方法比較長所以分成程式碼塊來進行講解。

NodeImpl#init

//非空校驗  Requires.requireNonNull(opts, "Null node options");  Requires.requireNonNull(opts.getRaftOptions(), "Null raft options");  Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service factory");  //目前就一個實現:DefaultJRaftServiceFactory  this.serviceFactory = opts.getServiceFactory();  this.options = opts;  this.raftOptions = opts.getRaftOptions();  //基於 Metrics 類庫的性能指標統計,具有豐富的性能統計指標,默認不開啟度量工具  this.metrics = new NodeMetrics(opts.isEnableMetrics());    if (this.serverId.getIp().equals(Utils.IP_ANY)) {      LOG.error("Node can't started from IP_ANY.");      return false;  }    if (!NodeManager.getInstance().serverExists(this.serverId.getEndpoint())) {      LOG.error("No RPC server attached to, did you forget to call addService?");      return false;  }  //定時任務管理器  this.timerManager = new TimerManager();  //初始化定時任務管理器的內置執行緒池  if (!this.timerManager.init(this.options.getTimerPoolSize())) {      LOG.error("Fail to init timer manager.");      return false;  }    //定時任務管理器  this.timerManager = new TimerManager();  //初始化定時任務管理器的內置執行緒池  if (!this.timerManager.init(this.options.getTimerPoolSize())) {      LOG.error("Fail to init timer manager.");      return false;  }

這段程式碼主要是給各個變數賦值,然後進行校驗判斷一下serverId不能為0.0.0.0,當前的Endpoint必須要在NodeManager裡面設置過等等(NodeManager的設置是在RaftGroupService的start方法里)。

然後會初始化一個全局的的定時調度管理器TimerManager:
TimerManager

private ScheduledExecutorService executor;    @Override  public boolean init(Integer coreSize) {      this.executor = Executors.newScheduledThreadPool(coreSize, new NamedThreadFactory(          "JRaft-Node-ScheduleThreadPool-", true));      return true;  }

TimerManager的init方法就是初始化一個執行緒池,如果當前的伺服器的cpu執行緒數3 大於20 ,那麼這個執行緒池的coreSize就是20,否則就是cpu執行緒數3。

往下走是計時器的初始化:

// Init timers  //設置投票計時器  this.voteTimer = new RepeatedTimer("JRaft-VoteTimer", this.options.getElectionTimeoutMs()) {        @Override      protected void onTrigger() {          //處理投票超時          handleVoteTimeout();      }        @Override      protected int adjustTimeout(final int timeoutMs) {          //在一定範圍內返回一個隨機的時間戳          return randomTimeout(timeoutMs);      }  };  //設置預投票計時器  //當leader在規定的一段時間內沒有與 Follower 艦船進行通訊時,  // Follower 就可以認為leader已經不能正常擔任旗艦的職責,則 Follower 可以去嘗試接替leader的角色。  // 這段通訊超時被稱為 Election Timeout  //候選者在發起投票之前,先發起預投票  this.electionTimer = new RepeatedTimer("JRaft-ElectionTimer", this.options.getElectionTimeoutMs()) {        @Override      protected void onTrigger() {          handleElectionTimeout();      }        @Override      protected int adjustTimeout(final int timeoutMs) {          //在一定範圍內返回一個隨機的時間戳          //為了避免同時發起選舉而導致失敗          return randomTimeout(timeoutMs);      }  };  //leader下台的計時器  //定時檢查是否需要重新選舉leader  this.stepDownTimer = new RepeatedTimer("JRaft-StepDownTimer", this.options.getElectionTimeoutMs() >> 1) {        @Override      protected void onTrigger() {          handleStepDownTimeout();      }  };  //快照計時器  this.snapshotTimer = new RepeatedTimer("JRaft-SnapshotTimer", this.options.getSnapshotIntervalSecs() * 1000) {        @Override      protected void onTrigger() {          handleSnapshotTimeout();      }  };

voteTimer是用來控制選舉的,如果選舉超時,當前的節點又是候選者角色,那麼就會發起選舉。
electionTimer是預投票計時器。候選者在發起投票之前,先發起預投票,如果沒有得到半數以上節點的回饋,則候選者就會識趣的放棄參選。
stepDownTimer定時檢查是否需要重新選舉leader。當前的leader可能出現它的Follower可能並沒有整個集群的1/2卻還沒有下台的情況,那麼這個時候會定期的檢查看leader的Follower是否有那麼多,沒有那麼多的話會強制讓leader下台。
snapshotTimer快照計時器。這個計時器會每隔1小時觸發一次生成一個快照。

這些計時器的具體實現現在暫時不表,等到要講具體功能的時候再進行梳理。

這些計時器有一個共同的特點就是會根據不同的計時器返回一個在一定範圍內隨機的時間。返回一個隨機的時間可以防止多個節點在同一時間內同時發起投票選舉從而降低選舉失敗的概率。

繼續往下看:

this.configManager = new ConfigurationManager();  //初始化一個disruptor,採用多生產者模式  this.applyDisruptor = DisruptorBuilder.<LogEntryAndClosure>newInstance() //          //設置disruptor大小,默認16384          .setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //          .setEventFactory(new LogEntryAndClosureFactory()) //          .setThreadFactory(new NamedThreadFactory("JRaft-NodeImpl-Disruptor-", true)) //          .setProducerType(ProducerType.MULTI) //          .setWaitStrategy(new BlockingWaitStrategy()) //          .build();  //設置事件處理器  this.applyDisruptor.handleEventsWith(new LogEntryAndClosureHandler());  //設置異常處理器  this.applyDisruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));  // 啟動disruptor的執行緒  this.applyQueue = this.applyDisruptor.start();  //如果開啟了metrics統計  if (this.metrics.getMetricRegistry() != null) {      this.metrics.getMetricRegistry().register("jraft-node-impl-disruptor",              new DisruptorMetricSet(this.applyQueue));  }

這裡初始化了一個Disruptor作為消費隊列,不清楚Disruptor的朋友可以去看我上一篇文章:Disruptor—核心概念及體驗。然後還校驗了metrics是否開啟,默認是不開啟的。

繼續往下看:

//fsmCaller封裝對業務 StateMachine 的狀態轉換的調用以及日誌的寫入等  this.fsmCaller = new FSMCallerImpl();  //初始化日誌存儲功能  if (!initLogStorage()) {      LOG.error("Node {} initLogStorage failed.", getNodeId());      return false;  }  //初始化元數據存儲功能  if (!initMetaStorage()) {      LOG.error("Node {} initMetaStorage failed.", getNodeId());      return false;  }  //對FSMCaller初始化  if (!initFSMCaller(new LogId(0, 0))) {      LOG.error("Node {} initFSMCaller failed.", getNodeId());      return false;  }  //實例化投票箱  this.ballotBox = new BallotBox();  final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions();  ballotBoxOpts.setWaiter(this.fsmCaller);  ballotBoxOpts.setClosureQueue(this.closureQueue);  //初始化ballotBox的屬性  if (!this.ballotBox.init(ballotBoxOpts)) {      LOG.error("Node {} init ballotBox failed.", getNodeId());      return false;  }  //初始化快照存儲功能  if (!initSnapshotStorage()) {      LOG.error("Node {} initSnapshotStorage failed.", getNodeId());      return false;  }  //校驗日誌文件索引的一致性  final Status st = this.logManager.checkConsistency();  if (!st.isOk()) {      LOG.error("Node {} is initialized with inconsistent log, status={}.", getNodeId(), st);      return false;  }  //配置管理raft group中的資訊  this.conf = new ConfigurationEntry();  this.conf.setId(new LogId());  // if have log using conf in log, else using conf in options  if (this.logManager.getLastLogIndex() > 0) {      this.conf = this.logManager.checkAndSetConfiguration(this.conf);  } else {      this.conf.setConf(this.options.getInitialConf());  }

這段程式碼主要是對快照、日誌、元數據等功能初始化。

this.replicatorGroup = new ReplicatorGroupImpl();  //收其他節點或者客戶端發過來的請求,轉交給對應服務處理  this.rpcService = new BoltRaftClientService(this.replicatorGroup);  final ReplicatorGroupOptions rgOpts = new ReplicatorGroupOptions();  rgOpts.setHeartbeatTimeoutMs(heartbeatTimeout(this.options.getElectionTimeoutMs()));  rgOpts.setElectionTimeoutMs(this.options.getElectionTimeoutMs());  rgOpts.setLogManager(this.logManager);  rgOpts.setBallotBox(this.ballotBox);  rgOpts.setNode(this);  rgOpts.setRaftRpcClientService(this.rpcService);  rgOpts.setSnapshotStorage(this.snapshotExecutor != null ? this.snapshotExecutor.getSnapshotStorage() : null);  rgOpts.setRaftOptions(this.raftOptions);  rgOpts.setTimerManager(this.timerManager);    // Adds metric registry to RPC service.  this.options.setMetricRegistry(this.metrics.getMetricRegistry());  //初始化rpc服務  if (!this.rpcService.init(this.options)) {      LOG.error("Fail to init rpc service.");      return false;  }  this.replicatorGroup.init(new NodeId(this.groupId, this.serverId), rgOpts);    this.readOnlyService = new ReadOnlyServiceImpl();  final ReadOnlyServiceOptions rosOpts = new ReadOnlyServiceOptions();  rosOpts.setFsmCaller(this.fsmCaller);  rosOpts.setNode(this);  rosOpts.setRaftOptions(this.raftOptions);  //只讀服務初始化  if (!this.readOnlyService.init(rosOpts)) {      LOG.error("Fail to init readOnlyService.");      return false;  }

這段程式碼主要是初始化replicatorGroup、rpcService以及readOnlyService。

接下來是最後一段的程式碼:

// set state to follower  this.state = State.STATE_FOLLOWER;    if (LOG.isInfoEnabled()) {      LOG.info("Node {} init, term={}, lastLogId={}, conf={}, oldConf={}.", getNodeId(), this.currTerm,              this.logManager.getLastLogId(false), this.conf.getConf(), this.conf.getOldConf());  }    //如果快照執行器不為空,並且生成快照的時間間隔大於0,那麼就定時生成快照  if (this.snapshotExecutor != null && this.options.getSnapshotIntervalSecs() > 0) {      LOG.debug("Node {} start snapshot timer, term={}.", getNodeId(), this.currTerm);      this.snapshotTimer.start();  }    if (!this.conf.isEmpty()) {      //新啟動的node需要重新選舉      stepDown(this.currTerm, false, new Status());  }    if (!NodeManager.getInstance().add(this)) {      LOG.error("NodeManager add {} failed.", getNodeId());      return false;  }    // Now the raft node is started , have to acquire the writeLock to avoid race  // conditions  this.writeLock.lock();  //這個分支表示當前的jraft集群里只有一個節點,那麼個節點必定是leader直接進行選舉就好了  if (this.conf.isStable() && this.conf.getConf().size() == 1 && this.conf.getConf().contains(this.serverId)) {      // The group contains only this server which must be the LEADER, trigger      // the timer immediately.      electSelf();  } else {      this.writeLock.unlock();  }    return true;

這段程式碼里會將當前的狀態設置為Follower,然後啟動快照定時器定時生成快照。
如果當前的集群不是單節點集群需要做一下stepDown,表示新生成的Node節點需要重新進行選舉。
最下面有一個if分支,如果當前的jraft集群里只有一個節點,那麼個節點必定是leader直接進行選舉就好了,所以會直接調用electSelf進行選舉。
選舉的程式碼我們就暫時略過,要不然後面就沒得講了。

到這裡整個NodeImpl實例的init方法就分析完了,這個方法很長,但是還是做了很多事情的。

好了,今天也不早了,各位晚安~