How a ZooKeeper Cluster Guarantees Data Consistency: A Source Code Reading

  • October 3, 2019
  • Notes

What is data consistency?

Data consistency only becomes an issue when the ZooKeeper service runs on more than one server. In a cluster the servers are divided into different roles: exactly one Leader, plus several Followers and several Observers. Any of them can answer a client's read request, and any of them can accept a write request. The difference is that when a Follower or an Observer receives a write request it cannot process it itself; it forwards the request to the Leader, and the Leader drives an atomic broadcast that keeps the data consistent.

In theory every node in the ZK cluster plays the same role; just as in standalone mode, the data stored on each node must stay identical.

After the Leader receives a write request forwarded by a Follower, it issues a proposal and asks every Follower to vote on the write (Observers do not vote; they keep serving client reads). When a Follower receives the proposal and decides the write can be executed, it sends an ack back to the Leader. A majority rule applies here: if more than half of the Followers in this round respond with an ack, the Leader considers the write approved, sends a commit to the Followers, and they apply the latest operation to their own file systems.
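As a minimal sketch of that "more than half" rule, the hypothetical helper below counts acks against the ensemble size; the real check lives behind the QuorumVerifier#containsQuorum call that shows up later in the source excerpts, so the class and method names here are illustrative only.

import java.util.Set;

// Hypothetical illustration of the majority rule used before a commit is sent.
public class MajorityCheck {

    // ackedSids: ids of the voting servers that acked the proposal
    // ensembleSize: number of voting servers in the cluster
    static boolean hasQuorum(Set<Long> ackedSids, int ensembleSize) {
        // strictly more than half of the voting members must have acked
        return ackedSids.size() > ensembleSize / 2;
    }

    public static void main(String[] args) {
        // e.g. a 5-node ensemble: 3 acks form a quorum, 2 do not
        System.out.println(hasQuorum(Set.of(1L, 2L, 3L), 5)); // true
        System.out.println(hasQuorum(Set.of(1L, 2L), 5));     // false
    }
}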

Adding a new ZK server to the cluster also raises a data consistency question: the new server has to read and synchronize data from the Leader.

The Zab protocol (ZooKeeper Atomic Broadcast)

What is the Zab protocol?

Zab is a distributed consistency protocol. It gives ZK crash recovery and atomic broadcast capabilities, which in turn guarantee data consistency across the cluster.

ZK implements Zab with a primary-backup architecture: any Learner node that receives a non-transactional (read) request answers it from its local data and returns, while every transactional operation must be forwarded to the Leader. The Leader issues a proposal, and once more than half of the Followers in the cluster return an ack, the Leader broadcasts a commit asking all nodes to commit the transaction.

Properties:

  • A transaction committed on the leader is eventually committed by every server
  • A transaction that fails the majority check is discarded

What the Zab protocol does

  • A dedicated thread keeps the socket between the Leader and each Learner alive; reading the source, this thread is the LearnerHandler. Every write request is atomically broadcast by the Leader across the cluster
  • It guarantees that all changes are applied in the same global order. A write has to pass its checks before it executes; for example, create /a/b first checks whether /a exists before creating b. Moreover, transactional requests sit in a FIFO queue, which preserves the order between them (see the toy sketch after this list)
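As a toy illustration of that FIFO guarantee (the queue and its contents here are stand-ins, not ZooKeeper's internal classes), proposals drained from a single queue are necessarily handled in the order they were enqueued:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model: the leader enqueues proposals in order, a single sender thread
// drains the queue, so every learner sees /a before /a/b.
public class FifoOrdering {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queuedPackets = new LinkedBlockingQueue<>();
        queuedPackets.put("create /a");    // proposed first
        queuedPackets.put("create /a/b");  // proposed second, depends on /a

        while (!queuedPackets.isEmpty()) {
            // FIFO take(): "create /a" is always sent and applied before "create /a/b"
            System.out.println(queuedPackets.take());
        }
    }
}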

How the Zab protocol works

  • Election: a Leader is elected from among the Followers
  • Discovery: the Leader maintains a list of Followers and communicates with them
  • Synchronization: the Leader synchronizes its data to the Followers, achieving multi-replica storage; this reflects the A and P of CAP, high availability and partition tolerance
  • Broadcast: the Leader accepts transactional proposals forwarded by Followers and broadcasts each such proposal to the other Learners

The Zab protocol in detail

While the whole cluster is starting up, or whenever the Leader server hits an abnormal condition such as a network interruption, a crash, or a restart, Zab enters crash recovery mode and elects a new Leader.

Once a new Leader has been elected and more than half of the machines in the cluster have completed state synchronization (i.e. data synchronization) with that Leader, Zab leaves crash recovery mode and enters message broadcast mode.

Whenever the Leader crashes, its machine restarts, or more than half of the servers can no longer communicate with the Leader normally, Zab enters crash recovery again, runs a new round of Leader election, and resynchronizes the data. When synchronization completes, it returns to message broadcast mode and resumes accepting transactional requests.

Reference: Jianshu blog by _Zy

Source code entry point

The early part of the startup flow is almost identical for the standalone and cluster versions, up to the initializeAndRun() method of QuorumPeerMain.java: standalone mode runs ZooKeeperServerMain.main(args);, while cluster mode runs runFromConfig(config);

So this post starts from QuorumPeerMain's runFromConfig()

QuorumPeer.java can be regarded as one server instance in the ZK cluster. Most of the code below is spent initializing the properties of the current server.

// todo: cluster startup logic
public void runFromConfig(QuorumPeerConfig config) throws IOException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(config.getClientPortAddress(),
                              config.getMaxClientCnxns());

        // todo: new QuorumPeer() can be read as creating one server of the cluster
        quorumPeer = getQuorumPeer();

        // todo: copy the values parsed from the config file straight onto the new QuorumPeer
        quorumPeer.setQuorumPeers(config.getServers());
        quorumPeer.setTxnFactory(new FileTxnSnapLog(
                new File(config.getDataLogDir()),
                new File(config.getDataDir())));
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
        quorumPeer.setClientPortAddress(config.getClientPortAddress());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());

        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if (quorumPeer.isQuorumSaslAuthEnabled()) {
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }

        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();

        // todo: this is the method to focus on
        quorumPeer.start();

        quorumPeer.join();

Step into the quorumPeer.start() method; the source is shown below. It mainly does the following:

  • Recovers data from disk
  • Starts the connection factory from the context; it is a thread class, and starting it lets the current server accept client requests (the RequestProcessor chain is not initialized yet, so it can accept a request but cannot process it)
  • Runs the Leader election; during this process a leader is elected from among the Followers, establishing the three roles in the cluster: Leader, Follower, and Observer
  • Starts the current thread class, QuorumPeer.java
@Override
public synchronized void start() {
    // todo: load data from disk into memory
    loadDataBase();

    // todo: start the connection factory from the context; it is a thread class that accepts client requests
    cnxnFactory.start();

    // todo: kick off the Leader election
    startLeaderElection();

    // todo: determine the server's role; this starts the run() method of this class (around line 900)
    super.start();
}

Now look at the run() method of QuorumPeer.java; part of its source is shown below. The logic is clear: after the role election above, each node's role in the cluster is settled, and nodes with different roles enter different case branches of the code below.

  • LOOKING: Leader election in progress
  • OBSERVING: an Observer
  • LEADING: the cluster's Leader
  • FOLLOWING: a Follower in the cluster
while (running) {
    switch (getPeerState()) {
    case LOOKING:
        LOG.info("LOOKING");

        if (Boolean.getBoolean("readonlymode.enabled")) {
            LOG.info("Attempting to start ReadOnlyZooKeeperServer");

            // Create read-only server but don't start it immediately
            final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                    logFactory, this,
                    new ZooKeeperServer.BasicDataTreeBuilder(),
                    this.zkDb);

            Thread roZkMgr = new Thread() {
                public void run() {
                    try {
                        // lower-bound grace period to 2 secs
                        sleep(Math.max(2000, tickTime));
                        if (ServerState.LOOKING.equals(getPeerState())) {
                            roZk.startup();
                        }
                    } catch (InterruptedException e) {
                        LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                    } catch (Exception e) {
                        LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                    }
                }
            };
            try {
                roZkMgr.start();
                setBCVote(null);
                setCurrentVote(makeLEStrategy().lookForLeader());
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
                setPeerState(ServerState.LOOKING);
            } finally {
                roZkMgr.interrupt();
                roZk.shutdown();
            }
        } else {
            try {
                setBCVote(null);
                setCurrentVote(makeLEStrategy().lookForLeader());
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
                setPeerState(ServerState.LOOKING);
            }
        }
        break;
    case OBSERVING:
        try {
            LOG.info("OBSERVING");
            setObserver(makeObserver(logFactory));
            observer.observeLeader();
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            observer.shutdown();
            setObserver(null);
            setPeerState(ServerState.LOOKING);
        }
        break;
    case FOLLOWING:
        // todo: this server has become a Follower
        try {
            LOG.info("FOLLOWING");
            setFollower(makeFollower(logFactory));
            follower.followLeader();
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            follower.shutdown();
            setFollower(null);
            setPeerState(ServerState.LOOKING);
        }
        break;
    case LEADING:
        // todo: this server has been elected Leader
        LOG.info("LEADING");
        try {
            setLeader(makeLeader(logFactory));
            // todo: step into lead()
            leader.lead();
            setLeader(null);
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            if (leader != null) {
                leader.shutdown("Forcing shutdown");
                setLeader(null);
            }
            setPeerState(ServerState.LOOKING);
        }
        break;
    }
}

Next, let's see what a server does once it takes on each role.

Overview: Leader & Follower

Becoming the Leader

Stepping into the source: in the code fragment above, makeLeader() is the method that creates the Leader wrapper object

protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
    // todo: step into this Leader constructor
    return new Leader(this, new LeaderZooKeeperServer(logFactory,
            this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}

https://img2018.cnblogs.com/blog/1496926/201910/1496926-20191003102116391-766953407.png

This is the inheritance diagram of LeaderZooKeeperServer; you can see that it actually extends the standalone-mode ZKServer.

Then leader.lead() is called; this method mainly does the following:

  • Creates a StateSummary object
    • This object wraps the zxid and the currentEpoch. The zxid is the id of the last znode-related transaction and is 64 bits long: the high 32 bits mark which generation of Leader it belongs to (the epoch), and the low 32 bits count the transactions committed by that Leader generation. Followers only recognize as Leader the server whose epoch in the high 32 bits is newer
  • Starts a new LearnerCnxAcceptor thread, which accepts connections from the Learners (Observers + Followers) and is responsible for the I/O between the Leader and the Learners
  • In LearnerCnxAcceptor's run(), every new incoming connection gets its own new thread, a LearnerHandler, which receives on the Leader side every packet from that participant, while the acceptor keeps listening for further connections
  • The leader server starts up…
void lead() throws IOException, InterruptedException {
    zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

    try {
        self.tick.set(0);
        zk.loadData();

        // todo: create the object that wraps the state-comparison logic
        leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

        // todo: start a new thread that accepts connections from new followers
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();

        readyToStart = true;
        // todo
        long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

        zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

        synchronized (this) {
            lastProposed = zk.getZxid();
        }

        newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                null, null);

        if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
            LOG.info("NEWLEADER proposal has Zxid of "
                    + Long.toHexString(newLeaderProposal.packet.getZxid()));
        }

        waitForEpochAck(self.getId(), leaderStateSummary);
        self.setCurrentEpoch(epoch);

        try {
            waitForNewLeaderAck(self.getId(), zk.getZxid());
        } catch (InterruptedException e) {
            shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                    + getSidSetString(newLeaderProposal.ackSet) + " ]");
            HashSet<Long> followerSet = new HashSet<Long>();
            for (LearnerHandler f : learners)
                followerSet.add(f.getSid());

            if (self.getQuorumVerifier().containsQuorum(followerSet)) {
                LOG.warn("Enough followers present. "
                        + "Perhaps the initTicks need to be increased.");
            }
            Thread.sleep(self.tickTime);
            self.tick.incrementAndGet();
            return;
        }

        // todo: start the server
        startZkServer();

        String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
        if (initialZxid != null) {
            long zxid = Long.parseLong(initialZxid);
            zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
        }

        if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
            self.cnxnFactory.setZooKeeperServer(zk);
        }

        boolean tickSkip = true;

        while (true) {
            Thread.sleep(self.tickTime / 2);
            if (!tickSkip) {
                self.tick.incrementAndGet();
            }
            HashSet<Long> syncedSet = new HashSet<Long>();

            // lock on the followers when we use it.
            syncedSet.add(self.getId());

            for (LearnerHandler f : getLearners()) {
                // Synced set is used to check we have a supporting quorum, so only
                // PARTICIPANT, not OBSERVER, learners should be used
                if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
                    syncedSet.add(f.getSid());
                }
                f.ping();
            }

            // check leader running status
            if (!this.isRunning()) {
                shutdown("Unexpected internal error");
                return;
            }

            if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
            //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
                // Lost quorum, shutdown
                shutdown("Not sufficient followers synced, only synced with sids: [ "
                        + getSidSetString(syncedSet) + " ]");
                // make sure the order is the same!
                // the leader goes to looking
                return;
            }
            tickSkip = !tickSkip;
        }
    } finally {
        zk.unregisterJMX(this);
    }
}

Becoming a Follower

Through the case branch above, the node enters the FOLLOWING block and calls the followLeader() method

The main logic of the Follower.java code below:

  • Establish a connection to the Leader
  • registerWithLeader(): register with the Leader
  • syncWithLeader(): synchronize data from the Leader and finish starting up
  • In the while (this.isRunning()) {...} loop, receive the packets the leader sends over and process them
void followLeader() throws InterruptedException {
    fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
    try {
        // todo: find the Leader server
        QuorumServer leaderServer = findLeader();
        try {
            // todo: connect to the Leader
            connectToLeader(leaderServer.addr, leaderServer.hostname);

            // todo: register with the leader (this sends data to the leader)
            // todo: the epoch says which round of leader election this is; the leader collects the epochs
            //       it receives, picks the largest, and unifies the epoch value of all learners
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

            long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
            if (newEpoch < self.getAcceptedEpoch()) {
                LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                        + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                throw new IOException("Error: Epoch of leader is lower");
            }
            // todo: sync data from the leader; initialization and startup also finish inside this method
            syncWithLeader(newEpochZxid);
            QuorumPacket qp = new QuorumPacket();

            // todo: the follower enters an endless loop, continuously reading packets from the leader and processing them
            while (this.isRunning()) {
                readPacket(qp);
                // todo: (receive the proposals sent by the leader)
                processPacket(qp);
            }
        } catch (Exception e) {
            LOG.warn("Exception when following the leader", e);
            try {
                sock.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

            // clear pending revalidations
            pendingRevalidations.clear();
        }
    } finally {
        zk.unregisterJMX((Learner) this);
    }
}

Details of the Leader & Follower interaction

The flow of this part is shown in the following diagrams:

https://img2018.cnblogs.com/blog/1496926/201910/1496926-20191003102115913-393961042.png

https://img2018.cnblogs.com/blog/1496926/201910/1496926-20191003102115108-1550619619.png

The Follower establishes a long-lived socket connection with the Leader

  • The Follower-side source is shown below; it makes up to five attempts to connect to the Leader and gives up after the fifth retry
protected void connectToLeader(InetSocketAddress addr, String hostname)
        throws IOException, ConnectException, InterruptedException {
    sock = new Socket();
    sock.setSoTimeout(self.tickTime * self.initLimit);
    for (int tries = 0; tries < 5; tries++) {
        try {
            sock.connect(addr, self.tickTime * self.syncLimit);
            sock.setTcpNoDelay(nodelay);
            break;
        } catch (IOException e) {
            if (tries == 4) {
                LOG.error("Unexpected exception", e);
                throw e;
            } else {
                LOG.warn("Unexpected exception, tries=" + tries +
                        ", connecting to " + addr, e);
                sock = new Socket();
                sock.setSoTimeout(self.tickTime * self.initLimit);
            }
        }
        Thread.sleep(1000);
    }

    self.authLearner.authenticate(sock, hostname);

    leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
            sock.getInputStream()));
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}
  • The Leader waits for connections; whenever a learner asks to connect to the Leader, the run() logic below creates a LearnerHandler and starts a new thread for each new connection
// Leader.java

@Override
public void run() {
    try {
        while (!stop) {
            // todo: the main logic below polls in the current thread; every incoming connection
            //       gets its own thread (a LearnerHandler)
            try {
                // todo: accept a connection from the server socket
                Socket s = ss.accept();

                // start with the initLimit, once the ack is processed in LearnerHandler switch to the syncLimit
                s.setSoTimeout(self.tickTime * self.initLimit);
                s.setTcpNoDelay(nodelay); // todo: disable the delay (Nagle) algorithm

                // todo: read data from the socket
                BufferedInputStream is = new BufferedInputStream(s.getInputStream());

                // todo: create the handler that processes all learner messages; it is also a thread class
                LearnerHandler fh = new LearnerHandler(s, is, Leader.this);

                fh.start();

The Follower sends a registration message to the Leader

protected long registerWithLeader(int pktType) throws IOException {
    /*
     * Send follower info, including last zxid and sid
     */
    long lastLoggedZxid = self.getLastLoggedZxid();
    QuorumPacket qp = new QuorumPacket();
    qp.setType(pktType);
    qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));

    /*
     * Add sid to payload
     */
    LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
    ByteArrayOutputStream bsid = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
    boa.writeRecord(li, "LearnerInfo");
    qp.setData(bsid.toByteArray());
    // todo: send the data to the leader
    writePacket(qp, true);
    readPacket(qp);

LearnerHandler receives the data

The request-parsing logic is shown below. When the LearnerHandler on the Leader receives the Follower's registration packet, it first parses the request out of it and validates it: if the type is neither Leader.FOLLOWERINFO nor Leader.OBSERVERINFO it returns right away; otherwise it carries on processing.

This introduces the concept of the epoch. The full zxid is 64 bits long, and its high 32 bits identify which generation of Leader it is. Because of network problems or other failures a leader can go down; each Leader has its own epoch number, starting from 1, 2, … Once the Leader dies, the newly elected Leader's epoch is incremented, so it is always larger than the old leader's epoch. The low 32 bits mark which proposal the current leader has issued.
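A minimal sketch of that bit layout, using illustrative helper names (ZooKeeper's own ZxidUtils.makeZxid and ZxidUtils.getEpochFromZxid, seen in the excerpts above and below, do the same kind of arithmetic):

// Illustrative zxid helpers: the high 32 bits carry the epoch, the low 32 bits the counter.
public class ZxidLayout {

    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >>> 32;          // which Leader generation proposed it
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;   // which proposal of that generation
    }

    public static void main(String[] args) {
        long zxid = makeZxid(3, 17);                // 3rd leader generation, 17th proposal
        System.out.println(Long.toHexString(zxid)); // 300000011
        System.out.println(epochOf(zxid));          // 3
        System.out.println(counterOf(zxid));        // 17
    }
}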

Let's see how it is handled. In the code, the Leader picks the largest epoch among all Followers and adds 1 to it, using that as the new epoch. Of course this is the value the Leader chose for itself, so do the Followers agree with it? Step into leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);. It uses the majority check again: if the quorum is not satisfied it calls wait(). So when is it woken up? Whenever another Follower in the cluster starts, it runs the same logic and reaches this method again for another quorum check, which may then wake the waiters.

if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) {
    waitingForNewEpoch = false;
    self.setAcceptedEpoch(epoch);
    connectingFollowers.notifyAll();
} else {
    long start = Time.currentElapsedTime();
    long cur = start;
    long end = start + self.getInitLimit() * self.getTickTime();
    while (waitingForNewEpoch && cur < end) {
        connectingFollowers.wait(end - cur);
        cur = Time.currentElapsedTime();
    }
    if (waitingForNewEpoch) {
        throw new InterruptedException("Timeout while waiting for epoch from quorum");
    }
}

Further on, the leader sends the Follower a confirmation containing the new epoch and zxid, telling the Follower that from now on its transactions start from this zxid; this packet's header is Leader.LEADERINFO

After sending it, the leader waits for the Follower's ack response.

public void run() {
    try {
        leader.addLearnerHandler(this);
        tickOfNextAckDeadline = leader.self.tick.get()
                + leader.self.initLimit + leader.self.syncLimit;

        ia = BinaryInputArchive.getArchive(bufferedInput);
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        oa = BinaryOutputArchive.getArchive(bufferedOutput);

        QuorumPacket qp = new QuorumPacket();
        // todo: read the data the follower sent
        ia.readRecord(qp, "packet");
        // todo: the first packet the Follower sends is the registration request with header = Leader.FOLLOWERINFO
        // todo: the leader returns immediately for anything that is not FOLLOWERINFO or OBSERVERINFO
        if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
            LOG.error("First packet " + qp.toString()
                    + " is not FOLLOWERINFO or OBSERVERINFO!");
            return;
        }

        .
        .
        .

        // extract the Follower's last epoch
        long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
        long peerLastZxid;
        StateSummary ss = null;
        long zxid = qp.getZxid();
        // todo: with this method the leader picks the largest epoch among all followers (and adds 1 on top of it)
        // todo: this.getSid() is the myid of this learner
        // todo: lastAcceptedEpoch is the lastAcceptedEpoch of this learner
        long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
        .
        .
        .
        } else {
            byte ver[] = new byte[4];
            ByteBuffer.wrap(ver).putInt(0x10000);
            // todo: after receiving the learner's data, the leader replies with a LEADERINFO packet
            // todo: it carries the new epoch
            QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
            oa.writeRecord(newEpochPacket, "packet");
            bufferedOutput.flush();
            QuorumPacket ackEpochPacket = new QuorumPacket();
            ia.readRecord(ackEpochPacket, "packet");
            if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                LOG.error(ackEpochPacket.toString()
                        + " is not ACKEPOCH");
                return;
            }
            ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
            ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
            // todo: wait for the learners' ack responses
            leader.waitForEpochAck(this.getSid(), ss);
        }

The Follower receives the leader's message and sends a response

The Follower gets the leader's reply, parses out the current leader's leaderProtocolVersion, and then sends the leader an ack with header = Leader.ACKEPOCH

protected long registerWithLeader(int pktType) throws IOException {
    .
    .
    .
    readPacket(qp);
    final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
    if (qp.getType() == Leader.LEADERINFO) {
        // we are connected to a 1.0 server so accept the new epoch and read the next packet
        leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
        byte epochBytes[] = new byte[4];
        final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
        if (newEpoch > self.getAcceptedEpoch()) {
            wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
            self.setAcceptedEpoch(newEpoch);
        } else if (newEpoch == self.getAcceptedEpoch()) {
            // since we have already acked an epoch equal to the leaders, we cannot ack
            // again, but we still need to send our lastZxid to the leader so that we can
            // sync with it if it does assume leadership of the epoch.
            // the -1 indicates that this reply should not count as an ack for the new epoch
            wrappedEpochBytes.putInt(-1);
        } else {
            throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
        }
        QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
        writePacket(ackNewEpoch, true);
        return ZxidUtils.makeZxid(newEpoch, 0);

After the Leader receives the Follower's ack, the data synchronization logic begins

First of all, if the Follower's last transaction id equals the Leader's, there is no data to synchronize.

When reading the standalone ZKServer startup, you can see in FinalRequestProcessor a committedLog collection that stores requests that have already been persisted. Its purpose is exactly to synchronize data to the current Follower here: a Follower can recover quickly from the Leader's most recent snapshot, but snapshots are only taken from time to time, so data could be missing, hence the committedLog.

How it is used:

  • Check whether the current Follower's zxid lies between the smallest and the largest zxid held in the committedLog collection. If it does, every request from the Follower's zxid up to the largest zxid in committedLog has to be replayed on the Follower. This mode is called Leader.DIFF: only the differences are synchronized (a simplified decision sketch appears a little further below)
    • First point: the sync requests are not sent one by one; they are put into a queue and sent together
    • The QuorumPacket type is Leader.COMMIT; after the Follower receives this commit it directly commits and applies the requests in that collection, completing the data synchronization
  • If the Follower's last zxid is larger than the largest zxid on the Leader, the Leader, without further ado, asks the Follower to TRUNC, i.e. delete, the part that goes beyond the Leader

  • If the Follower's largest zxid is smaller than the leader's smallest zxid, the snapshot synchronization mode is used

The difference:

With a snapshot, the data is serialized and sent to the Follower over the socket
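To keep the three branches apart, here is a simplified, hypothetical sketch of how the sync mode is chosen for one follower; the real logic, shown right after, lives in the LearnerHandler and additionally walks the proposals and handles the TRUNC-before-DIFF corner case.

// Hypothetical simplification of the sync-mode decision for a single follower.
enum SyncMode { DIFF, TRUNC, SNAP }

static SyncMode chooseSyncMode(long peerLastZxid, long minCommittedLog, long maxCommittedLog) {
    if (peerLastZxid >= minCommittedLog && peerLastZxid <= maxCommittedLog) {
        // follower is inside the committedLog window: replay only the missing proposals
        return SyncMode.DIFF;
    } else if (peerLastZxid > maxCommittedLog) {
        // follower is ahead of the leader: ask it to truncate back to maxCommittedLog
        return SyncMode.TRUNC;
    } else {
        // follower is too far behind the committedLog window: ship a full snapshot
        return SyncMode.SNAP;
    }
}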

The command that tells the Follower to synchronize data is sent by a separate thread created inside the method below

if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
    // Follower is already sync with us, send empty diff
    LOG.info("leader and follower are in sync, zxid=0x{}",
            Long.toHexString(peerLastZxid));
    packetToSend = Leader.DIFF;
    zxidToSend = peerLastZxid;
} else if (proposals.size() != 0) {
    .
    .
    .
    if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {
        LOG.debug("Sending proposals to follower");

        // as we look through proposals, this variable keeps track of previous proposal Id.
        long prevProposalZxid = minCommittedLog;

        // Keep track of whether we are about to send the first packet.
        // Before sending the first packet, we have to tell the learner
        // whether to expect a trunc or a diff
        boolean firstPacket = true;

        // If we are here, we can use committedLog to sync with follower. Then we only need to decide whether to send trunc or not
        // todo: at this point the committedLog is used to synchronize the Follower
        packetToSend = Leader.DIFF;
        zxidToSend = maxCommittedLog;

        for (Proposal propose : proposals) {
            // skip the proposals the peer already has
            if (propose.packet.getZxid() <= peerLastZxid) {
                prevProposalZxid = propose.packet.getZxid();
                continue;
            } else {
                // If we are sending the first packet, figure out whether to trunc
                // in case the follower has some proposals that the leader doesn't
                if (firstPacket) {
                    firstPacket = false;
                    // Does the peer have some proposals that the leader hasn't seen yet
                    if (prevProposalZxid < peerLastZxid) {
                        // send a trunc message before sending the diff
                        packetToSend = Leader.TRUNC;
                        zxidToSend = prevProposalZxid;
                        updates = zxidToSend;
                    }
                }
                // todo: put the proposal into the queue (not sent yet)
                queuePacket(propose.packet);
                // todo: this is the COMMIT the leader sends to the learner; on receiving it the learner
                //       applies the data directly, without sending another confirmation request
                QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                        null, null);
                queuePacket(qcommit);
            }
        }
    } else if (peerLastZxid > maxCommittedLog) {
        // todo: the learner's last committed zxid is larger than the leader's largest zxid
        LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
                Long.toHexString(maxCommittedLog),
                Long.toHexString(updates));

        packetToSend = Leader.TRUNC;
        zxidToSend = maxCommittedLog;
        updates = zxidToSend;
    } else {
        LOG.warn("Unhandled proposal scenario");
    }
    .
    .

    bufferedOutput.flush();
    // Need to set the zxidToSend to the latest zxid
    if (packetToSend == Leader.SNAP) {
        zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
    }
    oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
    bufferedOutput.flush();

    /* if we are not truncating or sending a diff just send a snapshot */
    if (packetToSend == Leader.SNAP) {
        LOG.info("Sending snapshot last zxid of peer is 0x"
                + Long.toHexString(peerLastZxid) + " "
                + " zxid of leader is 0x"
                + Long.toHexString(leaderLastZxid)
                + "sent zxid of db as 0x"
                + Long.toHexString(zxidToSend));
        // Dump data to peer
        // todo: synchronize from the snapshot
        leader.zk.getZKDatabase().serializeSnapshot(oa);
        // todo: the snapshot is sent straight through the socket
        oa.writeString("BenWasHere", "signature");
    }
    bufferedOutput.flush();

    // Start sending packets
    // todo: start a new thread that sends the packets queued above
    new Thread() {
        public void run() {
            Thread.currentThread().setName(
                    "Sender-" + sock.getRemoteSocketAddress());
            try {
                sendPackets();
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
        }
    }.start();

The Follower reacts differently to the different synchronization commands it receives from the Leader

This method is again extremely long, but fortunately not hard to read; it picks a different data-recovery approach depending on the packet type:

  • If it is SNAP, the Follower clears its own ZKDB and then loads the Leader's snapshot
  • If it is TRUNC, it deletes all records with invalid zxids and then reloads
  • If it is DIFF, it goes further into the case blocks of the while (self.isRunning()) {..} loop, adding every request that needs to be synchronized to a collection via packetsCommitted.add(qp.getZxid()); it only breaks out of this loop after receiving the server's UPTODATE
  • Looking at the code below, the Follower does not consume the requests the leader sent right away: it has not finished starting up yet, so it cannot hand them to a Processor. It has to start first, which happens at zk.startup(); below
  • After starting up, it loads these requests into memory, completing the data synchronization
protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException {
    QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
    QuorumPacket qp = new QuorumPacket();
    long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
    // todo: when syncing via DIFF we do not need to take a snapshot, because the transactions
    //       are applied on top of the existing snapshot
    // todo: for SNAP or TRUNC a snapshot has to be taken
    boolean snapshotNeeded = true;
    // todo: read one packet from the leader
    readPacket(qp);
    LinkedList<Long> packetsCommitted = new LinkedList<Long>();
    LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
    synchronized (zk) {
        // todo: DIFF
        if (qp.getType() == Leader.DIFF) {
            LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
            // todo: flip this flag; its value is assigned to writeToTxnLog further down
            snapshotNeeded = false;
        }
        else if (qp.getType() == Leader.SNAP) {
            // todo: snapshot
            LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
            // The leader is going to dump the database clear our own database and read
            // todo: clear our own ZKDB and rebuild it from the snapshot the leader sends
            zk.getZKDatabase().clear();
            // todo: leaderIs is the data the server sent over; deserialize it
            zk.getZKDatabase().deserializeSnapshot(leaderIs);

            String signature = leaderIs.readString("signature");
            if (!signature.equals("BenWasHere")) {
                LOG.error("Missing signature. Got " + signature);
                throw new IOException("Missing signature");
            }
            // todo: record the largest transaction zxid now on this Follower
            zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());

        } else if (qp.getType() == Leader.TRUNC) {
            //we need to truncate the log to the lastzxid of the leader
            LOG.warn("Truncating log to get in sync with the leader 0x"
                    + Long.toHexString(qp.getZxid()));
            // todo: delete the log data
            boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
            if (!truncated) {
                // not able to truncate the log
                LOG.error("Not able to truncate the log "
                        + Long.toHexString(qp.getZxid()));
                System.exit(13);
            }
            zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
        }
        else {
            LOG.error("Got unexpected packet from leader "
                    + qp.getType() + " exiting ... ");
            System.exit(13);

        }
        zk.createSessionTracker();

        long lastQueued = 0;

        boolean isPreZAB1_0 = true;
        // todo: if we are not taking a snapshot, make sure transactions are not applied to memory
        //       but written to the transaction log
        // todo: in DIFF mode snapshotNeeded = false, so writeToTxnLog = true
        boolean writeToTxnLog = !snapshotNeeded;

        outerLoop:
        while (self.isRunning()) {
            // todo: keep reading packets in this loop; in the DIFF case we fall into the COMMIT case below
            readPacket(qp);
            switch (qp.getType()) {
            case Leader.PROPOSAL:
                PacketInFlight pif = new PacketInFlight();
                pif.hdr = new TxnHeader();
                pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
                if (pif.hdr.getZxid() != lastQueued + 1) {
                    LOG.warn("Got zxid 0x"
                            + Long.toHexString(pif.hdr.getZxid())
                            + " expected 0x"
                            + Long.toHexString(lastQueued + 1));
                }
                lastQueued = pif.hdr.getZxid();
                packetsNotCommitted.add(pif);
                break;
            case Leader.COMMIT:
                if (!writeToTxnLog) { // todo: false in DIFF mode
                    pif = packetsNotCommitted.peekFirst();
                    if (pif.hdr.getZxid() != qp.getZxid()) {
                        LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                    } else {
                        zk.processTxn(pif.hdr, pif.rec);
                        packetsNotCommitted.remove();
                    }
                } else { // todo: DIFF mode enters this branch
                    // todo: add the zxid that was read to the packetsCommitted LinkedList, which is used further below
                    packetsCommitted.add(qp.getZxid());
                }
                break;
            case Leader.INFORM:
                /*
                 * Only observer get this type of packet. We treat this
                 * as receiving PROPOSAL and COMMMIT.
                 */
                PacketInFlight packet = new PacketInFlight();
                packet.hdr = new TxnHeader();
                packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
                // Log warning message if txn comes out-of-order
                if (packet.hdr.getZxid() != lastQueued + 1) {
                    LOG.warn("Got zxid 0x"
                            + Long.toHexString(packet.hdr.getZxid())
                            + " expected 0x"
                            + Long.toHexString(lastQueued + 1));
                }
                lastQueued = packet.hdr.getZxid();
                if (!writeToTxnLog) {
                    // Apply to db directly if we haven't taken the snapshot
                    zk.processTxn(packet.hdr, packet.rec);
                } else {
                    packetsNotCommitted.add(packet);
                    packetsCommitted.add(qp.getZxid());
                }
                break;
            case Leader.UPTODATE:
                // todo: the code after this loop uses the queues built above, so we must break out of the while loop
                // todo: this case is where the loop exits
                // todo: in other words, we only leave after receiving the Leader's UPTODATE packet
                if (isPreZAB1_0) {
                    zk.takeSnapshot();
                    self.setCurrentEpoch(newEpoch);
                }
                self.cnxnFactory.setZooKeeperServer(zk);
                break outerLoop;
            case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
                // means this is Zab 1.0
                // Create updatingEpoch file and remove it after current
                // epoch is set. QuorumPeer.loadDataBase() uses this file to
                // detect the case where the server was terminated after
                // taking a snapshot but before setting the current epoch.
                File updating = new File(self.getTxnFactory().getSnapDir(),
                                    QuorumPeer.UPDATING_EPOCH_FILENAME);
                if (!updating.exists() && !updating.createNewFile()) {
                    throw new IOException("Failed to create " +
                                          updating.toString());
                }
                if (snapshotNeeded) {
                    zk.takeSnapshot();
                }
                self.setCurrentEpoch(newEpoch);
                if (!updating.delete()) {
                    throw new IOException("Failed to delete " +
                                          updating.toString());
                }
                writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
                isPreZAB1_0 = false;
                writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                break;
            }
        }
    }
    ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
    writePacket(ack, true);
    sock.setSoTimeout(self.tickTime * self.syncLimit);

    // todo: the follower finishes its initialization and startup here; from this point on
    //       the flow looks very much like the standalone startup
    zk.startup();

    self.updateElectionVote(newEpoch);

    if (zk instanceof FollowerZooKeeperServer) {

    } else if (zk instanceof ObserverZooKeeperServer) {
        ///////////////////////////////////////////////////////////////////////////////
        ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
        for (PacketInFlight p : packetsNotCommitted) {
            Long zxid = packetsCommitted.peekFirst();
            if (p.hdr.getZxid() != zxid) {
                LOG.warn("Committing " + Long.toHexString(zxid)
                        + ", but next proposal is "
                        + Long.toHexString(p.hdr.getZxid()));
                continue;
            }
            packetsCommitted.remove();
            Request request = new Request(null, p.hdr.getClientId(),
                    p.hdr.getCxid(), p.hdr.getType(), null, null);
            request.txn = p.rec;
            request.hdr = p.hdr;
            ozk.commitRequest(request);
        }
        ///////////////////////////////////////////////////////////////////////////////
    } else {
        // New server type need to handle in-flight packets
        throw new UnsupportedOperationException("Unknown server type");
    }
}

To recap, we have now walked through the followLeader() method in Follower.java.

At this stage the server has finished starting, its data is in sync with the leader, and in the loop below it keeps up a constant I/O exchange with the Leader.

// todo: sync data from the leader; initialization and startup also finish inside this method
syncWithLeader(newEpochZxid);
QuorumPacket qp = new QuorumPacket();

// todo: the follower enters an endless loop, continuously reading packets from the leader and processing them
while (this.isRunning()) {
    readPacket(qp);
    // todo: (receive the proposals sent by the leader)
    processPacket(qp);
}

Once the Follower has synchronized its data, its next dealing with the Leader comes when a client issues a write request: the Follower has to forward that write request to the leader for broadcast

On the Leader side, it is handled in the logic below,

in LearnerHandler.java's run():

case Leader.REQUEST:
    // todo: when a follower receives a client write request, it forwards it here into this case branch
    bb = ByteBuffer.wrap(qp.getData());
    sessionId = bb.getLong();
    cxid = bb.getInt();
    type = bb.getInt();
    bb = bb.slice();
    Request si;
    if (type == OpCode.sync) {
        si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
    } else {
        si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
    }
    si.setOwner(this);
    leader.zk.submitRequest(si);
    break;

After starting the thread that handles the I/O with the Followers, the Leader likewise runs its own startup code.


Summary: in this post we have seen the stages a Follower goes through while synchronizing data from the Leader:

  • Discovery: the leader discovers the Follower and establishes communication with it
  • Synchronization: the Follower can synchronize data with the leader mainly in two ways
    • from the Leader's snapshot
    • from the already-persisted requests stored in the leader's committedLog, in combination with the snapshot
  • Atomic broadcast: this is the scenario where a Follower receives a client write request and forwards it to the Leader to preserve data consistency (the source is in the CASE Leader.REQUEST branch of the endless while loop at the end of LearnerHandler's run() method).
    The Leader then initiates the atomic broadcast, telling every node in the cluster to commit the transaction, which achieves data consistency.