ZooKeeper客戶端與服務端的watcher回調源碼閱讀
- 2019 年 10 月 3 日
- 筆記
watcher存在的必要性
舉個特容易懂的例子: 假如我的項目是基於dubbo+zookeeper搭建的分散式項目, 我有三個功能相同的服務提供者,用zookeeper當成註冊中心,我的三個項目得註冊進zookeeper才能對外暴露服務,但是問題來了,寫的java程式碼怎麼才能註冊進zookeeper呢?當然加入依賴,寫好配置文件再啟動就成了,這時,這三個服務體提供者就是zookeeper的客戶端了,zookeeper的客戶端不止一個,我選擇了哪個依賴,就是哪個客戶端,光有服務提供者不成啊,對外提供服務,我得需要服務消費者啊,於是用同樣的方式,把消費者也註冊進zookeeper,zookeeper中就存在了4個node,也就是4個客戶端,服務消費者訂閱zookeeper,向它拉取服務提供者的address,然後把地址快取在本地, 進而可以遠程調用服務消費者,那麼問題又來了,萬一哪一台服務提供者掛了,怎麼辦呢?zookeeper是不是得通知消費者呢? 萬一哪一天服務提供者的address變了,是不是也得通知消費者? 這就是watcher存在的意義,它解決了這件事
watcher的類型
keeperState | EventType | 觸發條件 | 說明 |
---|---|---|---|
SyncConnected | None(-1) | 客戶端與服務端建立連接 | 客戶端與服務端處於連接狀態 |
SyncConnected | NodeCreate(1) | watcher監聽的數據節點被創建 | 客戶端與服務端處於連接狀態 |
SyncConnected | NodeDeleted(2) | Watcher監聽的數據節點被刪除 | 客戶端與服務端處於連接狀態 |
SyncConnected | NodeDataChanged(3) | watcher監聽的node數據內容發生改變 | 客戶端與服務端處於連接狀態 |
SyncConnected | NodeChildrenChange(4) | 被監聽的數據節點的節點列表發生變更 | 客戶端與服務端處於連接狀態 |
Disconnect | None(-1) | 客戶端與服務端斷開連接 | 客戶端與服務端斷開連接 |
Expired (-112) | None(-1) | 會話超時 | session過期,收到異常SessionExpiredException |
AuthFailed | None(-1) | 1.使用了錯誤的scheme 2,SALS許可權驗證失敗了 | 收到異常AuthFailedException |
實驗場景:
假設我們已經成功啟動了zookeeper的服務端和客戶端,並且預先添加了watcher,然後使用控制台動態的修改下node的data,我們會發現watcher回調的現象
添加的鉤子函數程式碼如下:
public class ZookepperClientTest { public static void main(String[] args) throws Exception { ZooKeeper client = new ZooKeeper("localhost", 5000, new Watcher() { @Override public void process(WatchedEvent event) { System.err.println("連接,觸發"); } }); Stat stat = new Stat(); // todo 下面添加的事件監聽器可是實現事件的消費訂閱 String content = new String(client.getData("/node1", new Watcher() { @Override public void process(WatchedEvent event) { // todo 任何連接上這個節點的客戶端修改了這個節點的 data數據,都會引起process函數的回調 // todo 特點1: watch只能使用1次 if (event.getType().equals(Event.EventType.NodeDataChanged)){ System.err.println("當前節點數據發生了改變"); } } }, stat));
看如上的程式碼, 添加了一個自己的watcher
也就是client.getData("/node1", new Watcher() {}
這是個回調的鉤子函數,執行時不會運行,當滿足的某個條件時才會執行, 比如: node1被刪除了, node1的data被修改了
getData做了哪些事情?
源碼如下: getdata,顧名思義,返回服務端的node的data+stat, 當然是當服務端的node發生了變化後調用的
主要主流如下幾個工作
- 創建
WatchRegistration wcb= new DataWatchRegistration(watcher, clientPath);
- 其實就是一個簡單的內部類,將path 和 watch 封裝進了一個對象
- 創建一個request,並且初始化這個
request.head=getData=4
- 調用ClientCnxn.submitRequest(…) , 將現存的這些資訊進一步封裝
- request.setWatch(watcher != null);說明他並沒有將watcher封裝進去,而是僅僅做了個有沒有watcher的標記
public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { // todo 校驗path final String clientPath = path; PathUtils.validatePath(clientPath); // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { // todo DataWatchRegistration 繼承了 WatchRegistration // todo DataWatchRegistration 其實就是一個簡單的內部類,將path 和 watch 封裝進了一個對象 wcb = new DataWatchRegistration(watcher, clientPath); } final String serverPath = prependChroot(clientPath); // todo 創建一個請求頭 RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); // todo 創建了一個GetDataRequest GetDataRequest request = new GetDataRequest(); // todo 給這個請求初始化,path 是傳遞進來的path,但是 watcher不是!!! 如果我們給定了watcher , 這裡面的條件就是 true request.setPath(serverPath); request.setWatch(watcher != null); // todo 可看看看服務端接收到請求是怎麼辦的 GetDataResponse response = new GetDataResponse(); // todo 同樣由 clientCnxn 上下文進行提交請求, 這個操作應該同樣是阻塞的 // todo EventThread 和 SendThread 同時使用一份 clientCnxn的 submitRequest() ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (stat != null) { DataTree.copyStat(response.getStat(), stat); } return response.getData(); }
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
的源碼我卸載下面, 這裡來到這個方法中,一眼能看到,它依然是阻塞的式的,並且requet被進一步封裝進packet
更重要的是 queuePacket()
方法的最後一個參數,存在我們剛剛創建的path+watcher的封裝類
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException { ReplyHeader r = new ReplyHeader(); // todo 來到這個 queuePacket() 方法在下面, 這個方法就是將 用戶輸入-> string ->>> request ->>> packet 的過程 Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration); // todo 使用同步程式碼塊,在下面的進行 同步阻塞等待, 直到有了Response響應才會跳出這個循環, 這個finished狀態就是在客戶端接受到服務端的 // todo 的響應後, 將服務端的響應解析出來,然後放置到 pendingqueue里時,設置上去的 synchronized (packet) { while (!packet.finished) { // todo 這個等待是需要喚醒的 packet.wait(); } } // todo 直到上面的程式碼塊被喚醒,才會這個方法才會返回 return r; }
同樣,在queuePacket()方法中將packet提交到outgoingQueue中,最終被seadThread消費發送到服務端
服務端如何處理watchRegistration不為空的packet
後續我準備用一整篇部落格詳解單機模式下服務端處理請求的流程,所以這篇部落格只說結論
在服務端,用戶的請求最終會按順序流向三個Processor
,分別是
- PrepRequestProcessor
- 負責進行一些狀態的修改
- SyncRequestProcessor
- 將事務日誌同步到磁碟
- FinalRequestProcessor
- 相應用戶的請求
我們直接去看FinalRequestProcessor
的public void processRequest(Request request) {}方法
,看他針對getData()
方式的請求做出了哪些動作.下面來了個小高潮,zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);
跟進watcher的有無給服務端添加不同的Watcher
真的得劃重點了,當我發現這一點時,我的心情是超級激動的,就像發現了新大陸一樣
case OpCode.getData: { lastOp = "GETD"; GetDataRequest getDataRequest = new GetDataRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest); DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath()); if (n == null) { throw new KeeperException.NoNodeException(); } PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo); Stat stat = new Stat(); // todo 這裡的操作 getDataRequest.getWatch() ? cnxn : null 對應可客戶端的 跟進watcher有沒有而決定往服務端傳遞 true 還是false 相關 // todo 跟進去 getData() byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null); //todo cnxn的Processor()被回調, 往客戶端發送數據 , 什麼時候觸發呢? 就是上面的 處理事務時的回調 第127行 // todo 構建了一個 rsp ,在本類的最後面將rsp 響應給client rsp = new GetDataResponse(b, stat); break; }
繼續跟進這個getData()
在服務端維護了一份path+watcher的map
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException { DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } synchronized (n) { n.copyStat(stat); if (watcher != null) { // todo 將path 和 watcher 綁定在一起 dataWatches.addWatch(path, watcher); } return n.data; } }
客戶端打開命令行,修改服務端node的狀態
書接上回,當客戶單的程式碼去創建ClientCnxn
時,有下面的邏輯 , 它開啟了兩條守護執行緒, sendThread負責向服務端發送心跳,已經和服務端進行用戶相關的IO交流, EventThread就負責和txn事務相關的處理邏輯,級別上升到針對node
// todo start就是啟動了在構造方法中創建的執行緒 public void start() { sendThread.start(); eventThread.start(); }
到目前為止,客戶端就有如下三條執行緒了
- 負責處理用戶在控制台輸入命令的主執行緒
- 守護執行緒1: seadThread
- 守護執行緒2: eventThread
跟進主執行緒的處理用戶輸入部分的邏輯程式碼如下:
下面的程式碼的主要邏輯就是處理用戶輸入的命令,當通過if-else選擇分支判斷用戶到底輸入的啥命令
按照我們的假定的場景,用戶輸入的命令是這樣的 set /path newValue
所以,毫無疑問,經過解析後程式碼會去執行下面的stat = zk.setData(path, args[2].getBytes(),
部分
// todo zookeeper客戶端, 處理用戶輸入命令的具體邏輯 // todo 用大白話講,下面其實就是把 從控制台獲取的用戶的輸入資訊轉換成指定的字元, 然後發送到服務端 // todo MyCommandOptions 是處理命令行選項和shell腳本的工具類 protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException { // todo 在這個方法中可以看到很多的命令行所支援的命令 Stat stat = new Stat(); // todo 獲取命令行輸入中 0 1 2 3 ... 位置的內容, 比如 0 位置是命令 1 2 3 位置可能就是不同的參數 String[] args = co.getArgArray(); String cmd = co.getCommand(); if (args.length < 1) { usage(); return false; } if (!commandMap.containsKey(cmd)) { usage(); return false; } boolean watch = args.length > 2; String path = null; List<ACL> acl = Ids.OPEN_ACL_UNSAFE; LOG.debug("Processing " + cmd); if (cmd.equals("quit")) { System.out.println("Quitting..."); zk.close(); System.exit(0); } else if (cmd.equals("set") && args.length >= 3) { path = args[1]; stat = zk.setData(path, args[2].getBytes(), args.length > 3 ? Integer.parseInt(args[3]) : -1); printStat(stat);
繼續跟進stat = zk.setData(path, args[2].getBytes(),
下面的邏輯也很簡單,就是將用戶的輸入封裝進來request中,通過ClientCnxn
類的submit方法提交到一個隊列中,等待著sendThread去消費
這次有目的的看一下submitRequest的最後一個參數為null, 這個參數是WatchRegistration的位置,一開始置為null
public Stat setData(final String path, byte data[], int version) throws KeeperException, InterruptedException { final String clientPath = path; PathUtils.validatePath(clientPath); final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.setData); SetDataRequest request = new SetDataRequest(); request.setPath(serverPath); request.setData(data); request.setVersion(version); SetDataResponse response = new SetDataResponse(); ReplyHeader r = cnxn.submitRequest(h, request, response, null); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } return response.getStat(); }
跟進這個submitRequest()方法, 源碼如下,不處所料的是,它同樣被阻塞住了,直到服務端給了它響應
當前程式碼的主要邏輯就是將request封裝進packet,然後將packet添加到ClintCnxn維護的outgoingQueue隊列中等待sendThread的消費
這次來到這個方法是因為我們在控制台輸入的set 命令而觸發的,比較重要的是本次packet攜帶的WatchRegistration==null, 毫無疑問,這次服務端在FinalRequestProcessor中再處理時取出的watcher==null, 也就不會將path+watcher保存進maptable中
重要:發送事務消息
在FinalRequestProcessor
的public void processRequest(Request request) {}
方法中,有如下程式碼
//todo 請求頭不為空 if (request.hdr != null) { // 獲取請求頭 TxnHeader hdr = request.hdr; // 獲取事務 Record txn = request.txn; // todo 跟進這個方法-----<--!!!!!!-----處理事務的邏輯,在這裡面有向客戶端發送事件的邏輯, 回調客戶端的watcher----!!!!!!--> rc = zks.processTxn(hdr, txn); }
繼續跟進去
// todo 處理事物日誌 public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { ProcessTxnResult rc; int opCode = hdr.getType(); long sessionId = hdr.getClientId(); // todo 繼續跟進去!!!!!!!!! // todo 跟進 processTxn(hdr, txn) rc = getZKDatabase().processTxn(hdr, txn);
跟進ZkDatabase.java
中的processTxn(hdr, txn)方法
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { // todo 跟進 processTxn return dataTree.processTxn(hdr, txn); }
跟進到DataTree.java
public ProcessTxnResult processTxn(TxnHeader header, Record txn) { ProcessTxnResult rc = new ProcessTxnResult(); try { rc.clientId = header.getClientId(); rc.cxid = header.getCxid(); rc.zxid = header.getZxid(); rc.type = header.getType(); rc.err = 0; rc.multiResult = null; switch (header.getType()) { // todo 根據客客戶端發送過來的type進行switch, case OpCode.create: CreateTxn createTxn = (CreateTxn) txn; rc.path = createTxn.getPath(); // todo 跟進這個創建節點的方法 createNode( createTxn.getPath(),
根據請求頭的值,進而判斷出走到那個switch的分支,當前我們在控制台觸發,進入到setData分支如下:跟進這個方法中可以看到它主要做了如下幾件事
- 使用傳遞進來的新值替代舊data
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
觸髮指定的事件watch,什麼事件呢? NodeDataChange, 觸發了哪個watcher呢? 跟進去查看
//todo setData public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException { Stat s = new Stat(); DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } byte lastdata[] = null; synchronized (n) { // todo 修改記憶體的數據 lastdata = n.data; n.data = data; n.stat.setMtime(time); n.stat.setMzxid(zxid); n.stat.setVersion(version); n.copyStat(s); } // now update if the path is in a quota subtree. String lastPrefix; if((lastPrefix = getMaxPrefixWithQuota(path)) != null) { this.updateBytes(lastPrefix, (data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)); } // todo 終於 看到了 服務端 關於觸發NodeDataChanged的事件 dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; }
補充Watch & EventType 類圖
跟進去dataWatches.triggerWatch(path, EventType.NodeDataChanged);
,源碼如下, 主要的邏輯就是取出存放在服務端的watch,然後逐個回調他們的processor函數,問題來了,到底是哪些watcher呢? 其實就是我們在客戶端啟動時添加getData()時存進去的wather,也就是ServerCnxn
// todo 跟進去服務端的 觸發事件, 但是吧, 很納悶. 就是沒有往客戶端發送數據的邏輯 public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); HashSet<Watcher> watchers; synchronized (this) { watchers = watchTable.remove(path); if (watchers == null || watchers.isEmpty()) { if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path); } return null; } for (Watcher w : watchers) { HashSet<String> paths = watch2Paths.get(w); if (paths != null) { paths.remove(path); } } } for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } // todo 繼續跟進去, 看它如何回調的 w.process(e); } return watchers; }
懷著激動的心情去看看ServerCnxn的process()方法做了什麼事?
來到ServerCnxn的實現類NIOServerCnxn, 確實很激動,看到了服務端在往客戶端發送事務型消息, 並且new ReplyHeader(-1, -1L, 0)第一個位置上的參數是-1, 這一點很重要,因為客戶端在接受到這個xid=-1的標記後,就會將這條響應交給EventThread處理
@Override synchronized public void process(WatchedEvent event) { ReplyHeader h = new ReplyHeader(-1, -1L, 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this); } // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); // todo 往服務端發送了 e event類型消息 sendResponse(h, e, "notification"); }
處理回調回調watch使用的響應
進入到SendThread
的讀就緒源碼部分,如下: 它根據header.xid=-1就知道了這是事務類型的響應
// todo 服務端拋出來的事件, 客戶端將把他存在EventThread的 watingEvents 隊列中 // todo 它的實現邏輯也是這樣, 會有另外一個執行緒不斷的消費這個隊列 if (replyHdr.getXid() == -1) { // -1 means notification if (LOG.isDebugEnabled()) { LOG.debug("Got notification sessionid:0x" + Long.toHexString(sessionId)); } // todo 創建watcherEvent 並將服務端發送回來的數據,反序列化進這個對象中 WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); // convert from a server path to a client path // todo 將server path 反轉成 client path if (chrootPath != null) { String serverPath = event.getPath(); if (serverPath.compareTo(chrootPath) == 0) event.setPath("/"); else if (serverPath.length() > chrootPath.length()) event.setPath(serverPath.substring(chrootPath.length())); else { LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + chrootPath); } WatchedEvent we = new WatchedEvent(event); if (LOG.isDebugEnabled()) { LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId)); } //todo 跟進去 eventThread.queueEvent(we); return; } }
在這個方法的最後,將這個相應添加進EventThread
消費的隊列中,跟進 eventThread.queueEvent(we);
// todo public void queueEvent(WatchedEvent event) { // todo 如果事件的類型是 none, 或者sessionState = 直接返回 /** * todo 事件的類型被設計成 watcher 介面的枚舉 * None (-1), * NodeCreated (1), * NodeDeleted (2), * NodeDataChanged (3), * NodeChildrenChanged (4); */ if (event.getType() == EventType.None && sessionState == event.getState()) { return; } sessionState = event.getState(); // materialize the watchers based on the event // todo 根據事件的具體類型,將觀察者具體化, 跟進去 // todo 這個類是ClientCnxn的輔助類,作用就是將watcher 和它觀察的事件封裝在一起 WatcherSetEventPair pair = new WatcherSetEventPair( //todo 跟進這個 materialize方法. 其實就是從map中取出了和當前client關聯的全部 watcher set watcher.materialize(event.getState(), event.getType(), event.getPath()), event); // queue the pair (watch set & event) for later processing // todo 將watch集合 和 event 進行排隊(按順序添加到隊列里了), 以便後續處理 , 怎麼處理呢? 就在EventThread的run循環中消費 // todo watingEvent ==> LinkedBlockingQueue<Object> waitingEvents.add(pair); }
上面的程式碼主要做了如下幾件事:
- 從map中取出和當前事件相關的全部watcher
- 將watcher set 添加進 waitingEvents隊列中,等待EventThead的消費
跟進 watcher.materialize(event.getState(), event.getType(),
會追到下面的程式碼
case NodeDataChanged: // todo node中的data改變和 nodeCreate 都會來到下面的分支 case NodeCreated: synchronized (dataWatches) { // todo dataWatches 就是剛才存放 path : watcher 的map // todo dataWatches.remove(clientPath) 移除並返回clientPath對應的watcher , 放入 result 中 addTo(dataWatches.remove(clientPath), result); }
上面的dataWatches 就是保存path+watcher set的map, 上面的操作是移除並返回指定的watcher,這也說明了,為什麼zk原生客戶端添加的watcher僅僅會回調一次
EventThread是如何消費waitingEvents的
EventThread是一條守護執行緒, 因此它擁有自己的不斷在運行的run方法,它就是在這個run方法中對這個隊列進行消費的
@Override public void run() { try { isRunning = true; // todo 同樣是無限的循環 while (true) { // todo 從watingEvnets 中取出一個 WatcherSetEventPair Object event = waitingEvents.take(); if (event == eventOfDeath) { wasKilled = true; } else { // todo 本類方法,處理這個事件,繼續進入,方法就在下面 processEvent(event); } if (wasKilled) synchronized (waitingEvents) { if (waitingEvents.isEmpty()) { isRunning = false; break; } } } } catc
繼續跟進它的processEvent(event)
,最終會在這個方法中調用下面的程式碼,這裡的watcher就是我在本篇部落格的開始位置添加進去的watcher,至此打完收工
watcher.process(pair.event);
總結:
當客戶端啟動時添加watcher對某一個特定path上的node進行監聽時 , 客戶端的watcher被封裝進WatcherRegistion中再進一步發送的服務端
watcher不為空的packet達到服務端後會被巧妙的處理,將ServerCnxn當成watcher註冊添加到服務端維護的那份watcher map table中
當watcher關聯的node發生了NodeCreate,NodeDeleted ,NodeDataChannged,NodeChildrenChannged時,在最後一個處理器就會觸發發送事務類型事件的動作,其實就是回調ServerCnxn的process()方法
事務類型的響應返回到客戶端,跟進xid區分出到底是哪種響應,如-1是NodeDataChanged,最終會把這個事務事件提交到EventThread消費的waitingEvents等待EventThread消費它,回調客戶端的watcher的process()方法
如果覺得對您有幫助,歡迎點個推薦, 如果有錯誤,歡迎指出