
  • 2019 年 10 月 3 日
  • 筆記


舉個特容易懂的例子: 假如我的項目是基於dubbo+zookeeper搭建的分散式項目, 我有三個功能相同的服務提供者,用zookeeper當成註冊中心,我的三個項目得註冊進zookeeper才能對外暴露服務,但是問題來了,寫的java程式碼怎麼才能註冊進zookeeper呢?當然加入依賴,寫好配置文件再啟動就成了,這時,這三個服務體提供者就是zookeeper的客戶端了,zookeeper的客戶端不止一個,我選擇了哪個依賴,就是哪個客戶端,光有服務提供者不成啊,對外提供服務,我得需要服務消費者啊,於是用同樣的方式,把消費者也註冊進zookeeper,zookeeper中就存在了4個node,也就是4個客戶端,服務消費者訂閱zookeeper,向它拉取服務提供者的address,然後把地址快取在本地, 進而可以遠程調用服務消費者,那麼問題又來了,萬一哪一台服務提供者掛了,怎麼辦呢?zookeeper是不是得通知消費者呢? 萬一哪一天服務提供者的address變了,是不是也得通知消費者? 這就是watcher存在的意義,它解決了這件事


keeperState EventType 觸發條件 說明
SyncConnected None(-1) 客戶端與服務端建立連接 客戶端與服務端處於連接狀態
SyncConnected NodeCreate(1) watcher監聽的數據節點被創建 客戶端與服務端處於連接狀態
SyncConnected NodeDeleted(2) Watcher監聽的數據節點被刪除 客戶端與服務端處於連接狀態
SyncConnected NodeDataChanged(3) watcher監聽的node數據內容發生改變 客戶端與服務端處於連接狀態
SyncConnected NodeChildrenChange(4) 被監聽的數據節點的節點列表發生變更 客戶端與服務端處於連接狀態
Disconnect None(-1) 客戶端與服務端斷開連接 客戶端與服務端斷開連接
Expired (-112) None(-1) 會話超時 session過期,收到異常SessionExpiredException
AuthFailed None(-1) 1.使用了錯誤的scheme 2,SALS許可權驗證失敗了 收到異常AuthFailedException




public class ZookepperClientTest {      public static void main(String[] args) throws Exception {          ZooKeeper client = new ZooKeeper("localhost", 5000, new Watcher() {              @Override              public void process(WatchedEvent event) {                  System.err.println("連接,觸發");              }          });        Stat stat = new Stat();         //   todo 下面添加的事件監聽器可是實現事件的消費訂閱        String content = new String(client.getData("/node1", new Watcher() {              @Override              public void process(WatchedEvent event) {                  // todo 任何連接上這個節點的客戶端修改了這個節點的 data數據,都會引起process函數的回調                    // todo 特點1:  watch只能使用1次                  if (event.getType().equals(Event.EventType.NodeDataChanged)){                      System.err.println("當前節點數據發生了改變");                  }              }          }, stat));

看如上的程式碼, 添加了一個自己的watcher也就是client.getData("/node1", new Watcher() {} 這是個回調的鉤子函數,執行時不會運行,當滿足的某個條件時才會執行, 比如: node1被刪除了, node1的data被修改了


源碼如下: getdata,顧名思義,返回服務端的node的data+stat, 當然是當服務端的node發生了變化後調用的


  • 創建WatchRegistration wcb= new DataWatchRegistration(watcher, clientPath);
    • 其實就是一個簡單的內部類,將path 和 watch 封裝進了一個對象
  • 創建一個request,並且初始化這個request.head=getData=4
  • 調用ClientCnxn.submitRequest(…) , 將現存的這些資訊進一步封裝
  • request.setWatch(watcher != null);說明他並沒有將watcher封裝進去,而是僅僅做了個有沒有watcher的標記
 public byte[] getData(final String path, Watcher watcher, Stat stat)          throws KeeperException, InterruptedException       {           // todo 校驗path          final String clientPath = path;          PathUtils.validatePath(clientPath);            // the watch contains the un-chroot path          WatchRegistration wcb = null;          if (watcher != null) {              // todo DataWatchRegistration 繼承了 WatchRegistration              // todo DataWatchRegistration 其實就是一個簡單的內部類,將path 和 watch 封裝進了一個對象              wcb = new DataWatchRegistration(watcher, clientPath);          }            final String serverPath = prependChroot(clientPath);          // todo 創建一個請求頭          RequestHeader h = new RequestHeader();          h.setType(ZooDefs.OpCode.getData);            // todo 創建了一個GetDataRequest          GetDataRequest request = new GetDataRequest();          // todo 給這個請求初始化,path 是傳遞進來的path,但是 watcher不是!!! 如果我們給定了watcher , 這裡面的條件就是  true          request.setPath(serverPath);          request.setWatch(watcher != null); // todo 可看看看服務端接收到請求是怎麼辦的            GetDataResponse response = new GetDataResponse();            // todo 同樣由 clientCnxn 上下文進行提交請求, 這個操作應該同樣是阻塞的           // todo EventThread 和 SendThread 同時使用一份 clientCnxn的 submitRequest()          ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);            if (r.getErr() != 0) {              throw KeeperException.create(KeeperException.Code.get(r.getErr()),                      clientPath);          }          if (stat != null) {              DataTree.copyStat(response.getStat(), stat);          }          return response.getData();      }

ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); 的源碼我卸載下面, 這裡來到這個方法中,一眼能看到,它依然是阻塞的式的,並且requet被進一步封裝進packet

更重要的是 queuePacket()方法的最後一個參數,存在我們剛剛創建的path+watcher的封裝類

public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration)          throws InterruptedException {      ReplyHeader r = new ReplyHeader();      // todo 來到這個 queuePacket() 方法在下面, 這個方法就是將  用戶輸入-> string ->>> request ->>> packet 的過程      Packet packet = queuePacket(h, r, request, response, null, null, null,              null, watchRegistration);          // todo 使用同步程式碼塊,在下面的進行    同步阻塞等待, 直到有了Response響應才會跳出這個循環, 這個finished狀態就是在客戶端接受到服務端的      // todo 的響應後, 將服務端的響應解析出來,然後放置到 pendingqueue里時,設置上去的      synchronized (packet) {          while (!packet.finished) {              // todo 這個等待是需要喚醒的              packet.wait();          }      }      // todo 直到上面的程式碼塊被喚醒,才會這個方法才會返回      return r;  }





  • PrepRequestProcessor
    • 負責進行一些狀態的修改
  • SyncRequestProcessor
    • 將事務日誌同步到磁碟
  • FinalRequestProcessor
    • 相應用戶的請求

我們直接去看FinalRequestProcessorpublic void processRequest(Request request) {}方法,看他針對getData()方式的請求做出了哪些動作.下面來了個小高潮,zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);跟進watcher的有無給服務端添加不同的Watcher


case OpCode.getData: {          lastOp = "GETD";          GetDataRequest getDataRequest = new GetDataRequest();          ByteBufferInputStream.byteBuffer2Record(request.request,                  getDataRequest);          DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());          if (n == null) {              throw new KeeperException.NoNodeException();          }          PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),                  ZooDefs.Perms.READ,                  request.authInfo);          Stat stat = new Stat();          // todo 這裡的操作    getDataRequest.getWatch() ? cnxn : null 對應可客戶端的  跟進watcher有沒有而決定往服務端傳遞 true 還是false 相關          // todo 跟進去 getData()          byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,                  getDataRequest.getWatch() ? cnxn : null);          //todo  cnxn的Processor()被回調, 往客戶端發送數據 , 什麼時候觸發呢? 就是上面的  處理事務時的回調 第127行            // todo 構建了一個 rsp ,在本類的最後面將rsp 響應給client          rsp = new GetDataResponse(b, stat);          break;      }  


public byte[] getData(String path, Stat stat, Watcher watcher)          throws KeeperException.NoNodeException {      DataNode n = nodes.get(path);      if (n == null) {          throw new KeeperException.NoNodeException();      }      synchronized (n) {          n.copyStat(stat);          if (watcher != null) {              // todo 將path 和 watcher 綁定在一起              dataWatches.addWatch(path, watcher);          }          return n.data;      }  }  


書接上回,當客戶單的程式碼去創建ClientCnxn時,有下面的邏輯 , 它開啟了兩條守護執行緒, sendThread負責向服務端發送心跳,已經和服務端進行用戶相關的IO交流, EventThread就負責和txn事務相關的處理邏輯,級別上升到針對node

    // todo start就是啟動了在構造方法中創建的執行緒      public void start() {          sendThread.start();          eventThread.start();      }


  • 負責處理用戶在控制台輸入命令的主執行緒
  • 守護執行緒1: seadThread
  • 守護執行緒2: eventThread



按照我們的假定的場景,用戶輸入的命令是這樣的 set /path newValue 所以,毫無疑問,經過解析後程式碼會去執行下面的stat = zk.setData(path, args[2].getBytes(),部分

  // todo zookeeper客戶端, 處理用戶輸入命令的具體邏輯      // todo  用大白話講,下面其實就是把 從控制台獲取的用戶的輸入資訊轉換成指定的字元, 然後發送到服務端      // todo MyCommandOptions 是處理命令行選項和shell腳本的工具類      protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException {          // todo 在這個方法中可以看到很多的命令行所支援的命令          Stat stat = new Stat();          // todo 獲取命令行輸入中 0 1 2 3 ... 位置的內容, 比如 0 位置是命令  1 2 3 位置可能就是不同的參數          String[] args = co.getArgArray();          String cmd = co.getCommand();          if (args.length < 1) {              usage();              return false;          }            if (!commandMap.containsKey(cmd)) {              usage();              return false;          }            boolean watch = args.length > 2;          String path = null;          List<ACL> acl = Ids.OPEN_ACL_UNSAFE;          LOG.debug("Processing " + cmd);            if (cmd.equals("quit")) {              System.out.println("Quitting...");              zk.close();              System.exit(0);          } else if (cmd.equals("set") && args.length >= 3) {              path = args[1];              stat = zk.setData(path, args[2].getBytes(),                      args.length > 3 ? Integer.parseInt(args[3]) : -1);              printStat(stat);

繼續跟進stat = zk.setData(path, args[2].getBytes(), 下面的邏輯也很簡單,就是將用戶的輸入封裝進來request中,通過ClientCnxn類的submit方法提交到一個隊列中,等待著sendThread去消費

這次有目的的看一下submitRequest的最後一個參數為null, 這個參數是WatchRegistration的位置,一開始置為null

 public Stat setData(final String path, byte data[], int version)          throws KeeperException, InterruptedException      {          final String clientPath = path;          PathUtils.validatePath(clientPath);            final String serverPath = prependChroot(clientPath);            RequestHeader h = new RequestHeader();          h.setType(ZooDefs.OpCode.setData);          SetDataRequest request = new SetDataRequest();          request.setPath(serverPath);          request.setData(data);          request.setVersion(version);            SetDataResponse response = new SetDataResponse();          ReplyHeader r = cnxn.submitRequest(h, request, response, null);          if (r.getErr() != 0) {              throw KeeperException.create(KeeperException.Code.get(r.getErr()),                      clientPath);          }          return response.getStat();      }

跟進這個submitRequest()方法, 源碼如下,不處所料的是,它同樣被阻塞住了,直到服務端給了它響應


這次來到這個方法是因為我們在控制台輸入的set 命令而觸發的,比較重要的是本次packet攜帶的WatchRegistration==null, 毫無疑問,這次服務端在FinalRequestProcessor中再處理時取出的watcher==null, 也就不會將path+watcher保存進maptable中


FinalRequestProcessorpublic void processRequest(Request request) {}方法中,有如下程式碼

//todo 請求頭不為空      if (request.hdr != null) {          // 獲取請求頭         TxnHeader hdr = request.hdr;         // 獲取事務         Record txn = request.txn;          // todo 跟進這個方法-----<--!!!!!!-----處理事務的邏輯,在這裡面有向客戶端發送事件的邏輯, 回調客戶端的watcher----!!!!!!-->         rc = zks.processTxn(hdr, txn);      }


// todo 處理事物日誌  public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {      ProcessTxnResult rc;      int opCode = hdr.getType();      long sessionId = hdr.getClientId();      // todo 繼續跟進去!!!!!!!!!      // todo 跟進 processTxn(hdr, txn)      rc = getZKDatabase().processTxn(hdr, txn);

跟進ZkDatabase.java中的processTxn(hdr, txn)方法

public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {      // todo 跟進 processTxn      return dataTree.processTxn(hdr, txn);  }


  public ProcessTxnResult processTxn(TxnHeader header, Record txn)      {          ProcessTxnResult rc = new ProcessTxnResult();            try {              rc.clientId = header.getClientId();              rc.cxid = header.getCxid();              rc.zxid = header.getZxid();              rc.type = header.getType();              rc.err = 0;              rc.multiResult = null;              switch (header.getType()) { // todo 根據客客戶端發送過來的type進行switch,                  case OpCode.create:                      CreateTxn createTxn = (CreateTxn) txn;                      rc.path = createTxn.getPath();                      // todo  跟進這個創建節點的方法                      createNode(                              createTxn.getPath(),


  • 使用傳遞進來的新值替代舊data
  • dataWatches.triggerWatch(path, EventType.NodeDataChanged);觸髮指定的事件watch,什麼事件呢? NodeDataChange, 觸發了哪個watcher呢? 跟進去查看
        //todo  setData      public Stat setData(String path, byte data[], int version, long zxid,              long time) throws KeeperException.NoNodeException {          Stat s = new Stat();          DataNode n = nodes.get(path);          if (n == null) {              throw new KeeperException.NoNodeException();          }          byte lastdata[] = null;          synchronized (n) {              // todo 修改記憶體的數據              lastdata = n.data;              n.data = data;              n.stat.setMtime(time);              n.stat.setMzxid(zxid);              n.stat.setVersion(version);              n.copyStat(s);          }          // now update if the path is in a quota subtree.          String lastPrefix;          if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {            this.updateBytes(lastPrefix, (data == null ? 0 : data.length)                - (lastdata == null ? 0 : lastdata.length));          }          // todo 終於 看到了   服務端 關於觸發NodeDataChanged的事件          dataWatches.triggerWatch(path, EventType.NodeDataChanged);          return s;      }

補充Watch & EventType 類圖


跟進去dataWatches.triggerWatch(path, EventType.NodeDataChanged);,源碼如下, 主要的邏輯就是取出存放在服務端的watch,然後逐個回調他們的processor函數,問題來了,到底是哪些watcher呢? 其實就是我們在客戶端啟動時添加getData()時存進去的wather,也就是ServerCnxn

   // todo 跟進去服務端的 觸發事件,  但是吧, 很納悶. 就是沒有往客戶端發送數據的邏輯      public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {          WatchedEvent e = new WatchedEvent(type,                  KeeperState.SyncConnected, path);          HashSet<Watcher> watchers;          synchronized (this) {              watchers = watchTable.remove(path);              if (watchers == null || watchers.isEmpty()) {                  if (LOG.isTraceEnabled()) {                      ZooTrace.logTraceMessage(LOG,                              ZooTrace.EVENT_DELIVERY_TRACE_MASK,                              "No watchers for " + path);                  }                  return null;              }              for (Watcher w : watchers) {                  HashSet<String> paths = watch2Paths.get(w);                  if (paths != null) {                      paths.remove(path);                  }              }          }          for (Watcher w : watchers) {              if (supress != null && supress.contains(w)) {                  continue;              }              // todo 繼續跟進去, 看它如何回調的              w.process(e);          }          return watchers;      }


來到ServerCnxn的實現類NIOServerCnxn, 確實很激動,看到了服務端在往客戶端發送事務型消息, 並且new ReplyHeader(-1, -1L, 0)第一個位置上的參數是-1, 這一點很重要,因為客戶端在接受到這個xid=-1的標記後,就會將這條響應交給EventThread處理

    @Override      synchronized public void process(WatchedEvent event) {          ReplyHeader h = new ReplyHeader(-1, -1L, 0);          if (LOG.isTraceEnabled()) {              ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,                                       "Deliver event " + event + " to 0x"                                       + Long.toHexString(this.sessionId)                                       + " through " + this);          }            // Convert WatchedEvent to a type that can be sent over the wire          WatcherEvent e = event.getWrapper();          // todo  往服務端發送了 e event類型消息          sendResponse(h, e, "notification");      }


進入到SendThread的讀就緒源碼部分,如下: 它根據header.xid=-1就知道了這是事務類型的響應

// todo 服務端拋出來的事件, 客戶端將把他存在EventThread的 watingEvents 隊列中  // todo 它的實現邏輯也是這樣, 會有另外一個執行緒不斷的消費這個隊列  if (replyHdr.getXid() == -1) {      // -1 means notification      if (LOG.isDebugEnabled()) {          LOG.debug("Got notification sessionid:0x"                  + Long.toHexString(sessionId));      }      // todo 創建watcherEvent 並將服務端發送回來的數據,反序列化進這個對象中      WatcherEvent event = new WatcherEvent();      event.deserialize(bbia, "response");        // convert from a server path to a client path      // todo 將server path 反轉成 client path      if (chrootPath != null) {          String serverPath = event.getPath();          if (serverPath.compareTo(chrootPath) == 0)              event.setPath("/");          else if (serverPath.length() > chrootPath.length())              event.setPath(serverPath.substring(chrootPath.length()));          else {              LOG.warn("Got server path " + event.getPath()                      + " which is too short for chroot path "                      + chrootPath);          }                WatchedEvent we = new WatchedEvent(event);                  if (LOG.isDebugEnabled()) {                      LOG.debug("Got " + we + " for sessionid 0x"                              + Long.toHexString(sessionId));                  }                  //todo 跟進去                  eventThread.queueEvent(we);                  return;              }      }  

在這個方法的最後,將這個相應添加進EventThread消費的隊列中,跟進 eventThread.queueEvent(we);

// todo  public void queueEvent(WatchedEvent event) {      // todo 如果事件的類型是 none, 或者sessionState =  直接返回      /**       *   todo 事件的類型被設計成 watcher 介面的枚舉       *   None (-1),       *   NodeCreated (1),       *   NodeDeleted (2),       *   NodeDataChanged (3),       *   NodeChildrenChanged (4);       */      if (event.getType() == EventType.None              && sessionState == event.getState()) {          return;      }      sessionState = event.getState();        // materialize the watchers based on the event      // todo 根據事件的具體類型,將觀察者具體化, 跟進去      // todo 這個類是ClientCnxn的輔助類,作用就是將watcher 和它觀察的事件封裝在一起      WatcherSetEventPair pair = new WatcherSetEventPair(              //todo 跟進這個 materialize方法. 其實就是從map中取出了和當前client關聯的全部 watcher set                watcher.materialize(event.getState(), event.getType(),                      event.getPath()),              event);      // queue the pair (watch set & event) for later processing      // todo 將watch集合 和 event 進行排隊(按順序添加到隊列里了), 以便後續處理 , 怎麼處理呢?  就在EventThread的run循環中消費      // todo watingEvent ==>  LinkedBlockingQueue<Object>      waitingEvents.add(pair);  }


  • 從map中取出和當前事件相關的全部watcher
  • 將watcher set 添加進 waitingEvents隊列中,等待EventThead的消費

跟進 watcher.materialize(event.getState(), event.getType(), 會追到下面的程式碼

case NodeDataChanged: // todo node中的data改變和 nodeCreate 都會來到下面的分支          case NodeCreated:              synchronized (dataWatches) {                  // todo dataWatches 就是剛才存放  path : watcher 的map                  // todo dataWatches.remove(clientPath) 移除並返回clientPath對應的watcher , 放入 result 中                  addTo(dataWatches.remove(clientPath), result);              }

上面的dataWatches 就是保存path+watcher set的map, 上面的操作是移除並返回指定的watcher,這也說明了,為什麼zk原生客戶端添加的watcher僅僅會回調一次


EventThread是一條守護執行緒, 因此它擁有自己的不斷在運行的run方法,它就是在這個run方法中對這個隊列進行消費的

          @Override          public void run() {              try {                  isRunning = true;                  // todo 同樣是無限的循環                  while (true) {                      // todo 從watingEvnets 中取出一個 WatcherSetEventPair                      Object event = waitingEvents.take();                      if (event == eventOfDeath) {                          wasKilled = true;                      } else {                          // todo 本類方法,處理這個事件,繼續進入,方法就在下面                          processEvent(event);                      }                      if (wasKilled)                          synchronized (waitingEvents) {                              if (waitingEvents.isEmpty()) {                                  isRunning = false;                                  break;                              }                          }                  }              } catc          




當客戶端啟動時添加watcher對某一個特定path上的node進行監聽時 , 客戶端的watcher被封裝進WatcherRegistion中再進一步發送的服務端

watcher不為空的packet達到服務端後會被巧妙的處理,將ServerCnxn當成watcher註冊添加到服務端維護的那份watcher map table中

當watcher關聯的node發生了NodeCreate,NodeDeleted ,NodeDataChannged,NodeChildrenChannged時,在最後一個處理器就會觸發發送事務類型事件的動作,其實就是回調ServerCnxn的process()方法


如果覺得對您有幫助,歡迎點個推薦, 如果有錯誤,歡迎指出