ZooKeeper單機服務端的啟動源碼閱讀
- 2019 年 10 月 3 日
- 筆記
程式的入口QuorumPeerMain
public static void main(String[] args) { // QuorumPeerMain main = new QuorumPeerMain(); try { // 初始化服務端,並運行服務端 // todo 跟進去看他如何處理 服務端的配置文件,以及根據服務端的配置文件做出來那些動作 main.initializeAndRun(args);
初始化和啟動總覽
跟進initializeAndRun()
方法 , 這個方法中主要做了如下三件事
- 從
args[0]
解析出配置文件的位置,創建QuorumPeerConfig
配置類對象(可以把這個對象理解成單個ZK server的配置對象),然後將配置文件中的內容載入進記憶體,並完成對java配置類的屬性的賦值 - 開啟,啟動並清除計劃任務的邏輯
- 根據從記憶體中讀取配置文件實例化好的配置類,啟動ZKserver
protected void initializeAndRun(String[] args) throws ConfigException, IOException { // todo 這個類是關聯配置文件的類, 我們在配置文件中輸入的各種配置都是他的屬性 QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { // todo config.parse(args[0]); } // Start and schedule the the purge task // todo 啟動並清除計劃任務 DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config .getDataDir(), config.getDataLogDir(), config .getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); // todo config.servers.size() > 0 說明添加了關於集群的配置 if (args.length == 1 && config.servers.size() > 0) { // todo 根據配置啟動伺服器, 跟進去, 就在下面 runFromConfig(config); } else { // todo 沒添加集群的配置 LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode"); // there is only server in the quorum -- run as standalone // todo 啟動單機 ZooKeeperServerMain.main(args); } }
讀取配置文件
下面跟進parse
, 這個方法的目的是將磁碟上的配置資訊讀取到文件中,完成對QuorumPeerConfig
的初始化主要做了如下兩件事
- 因為ZK的配置文件是
.properties
結尾的,因此呢選擇了Properties.java
(格式是 key=value)來解析讀取配置文件 parseProperties()
方法,對解析出來的配置文件進行進一步的處理
public void parse(String path) throws ConfigException { File configFile = new File(path); LOG.info("Reading configuration from: " + configFile); try { if (!configFile.exists()) { throw new IllegalArgumentException(configFile.toString() + " file is missing"); } Properties cfg = new Properties(); FileInputStream in = new FileInputStream(configFile); try { // todo 使用 Properties 按行讀取出配置文件內容 cfg.load(in); } finally { in.close(); } // todo 將按行讀取處理出來的進行分隔處理, 對當前的配置類進行賦值 parseProperties(cfg); } catch (IOException e) { throw new ConfigException("Error processing " + path, e); } catch (IllegalArgumentException e) { throw new ConfigException("Error processing " + path, e); } }
解析配置文件
看一看,他是如何處理已經被載入到記憶體的配置文件的,
- 首先看一下上圖中我截取的配置文件的截圖,可以看到通過下面的if-else分支語句將配置文件的中的資訊一對一的讀取出來,完成對當前配置類的初始化
- if (value.toLowerCase().equals("observer")) {..}這個分支就是判斷當前的配置文件是不是Observer的配置文件,比較推薦的observer的配置,就是添加一條配置寫
peerType=observer
,但是這是為了人們查看方便設計的,換句話說,一個普通的Follower的配置文件,即便是添加上了這條配置文件,它同樣不是observer,後續還會有進一步的檢驗,因為zk集群的配置文件大同小異,一開始即便是我們不添加這個配置,observer角色的server依然會成為observer,但是對於人們來說,就不用點開dataDir中的myid文件查看究竟當前的server是不是Observer了 else if (key.startsWith("server."))
標記著配置文件中有關集群的配置資訊開始了,它根據不同的配置資訊,將不同身份的server存放進兩個map中,就像下面那樣,如果是Observer類型的,就存放在observers
中,如果是Follower類型的就添加進servers
map中- 它這樣做是為了下一步實現ZAB協議,過半檢查. 而設計的, 什麼是過半檢查機制呢? 首先是集群中的server存在一半以上健康時,集群才可用
- 其次是,Leader發起的決議,需要有一半的Follower同意決議才能通過,注意這裡是Follower,而不是OBserver+Follower,因為OBserver不參加投票,因此在這個半數協議中,它不作數, 所以再看他現在的做法,就是創建過半檢查機制封裝類
QuorumVerifer
時,使用servers
的容量
- 合併servers和observers, 雖然後者不參加決議投票,但是它同樣需要提供服務
- 讀取myid文件,最終確定不同的server的身份劃分,哪個是myid配置文件呢? 它是我們在配置集群資訊時在dataDir中創建的, 裡面僅僅存放一個數據,這個數字不是亂寫的,對應的是配置文件的server.n中的n, 啟動時會讀取這個文件,拿到裡面的數據與 zoo.cfg 裡面的配置資訊比較從而判斷到底是那個server,只是一個標識作用。
public void parseProperties(Properties zkProp) throws IOException, ConfigException { int clientPort = 0; String clientPortAddress = null; for (Entry<Object, Object> entry : zkProp.entrySet()) { String key = entry.getKey().toString().trim(); String value = entry.getValue().toString().trim(); if (key.equals("dataDir")) { dataDir = value; } else if (key.equals("dataLogDir")) { dataLogDir = value; } else if (key.equals("clientPort")) { clientPort = Integer.parseInt(value); } else if (key.equals("clientPortAddress")) { clientPortAddress = value.trim(); } else if (key.equals("tickTime")) { . . . . } else if (key.equals("peerType")) { if (value.toLowerCase().equals("observer")) { // todo 這是推薦配置做法在 observer 的配置文件中配置上添加 peerType=observer //todo 但是如果給一台不是observer的機器加上了這個配置, 它也不會是observer. 在這個函數的最後會有校驗 peerType = LearnerType.OBSERVER; } else if (value.toLowerCase().equals("participant")) { peerType = LearnerType.PARTICIPANT; } else { throw new ConfigException("Unrecognised peertype: " + value); } . . . } else if (key.startsWith("server.")) { // todo 全部以server.開頭的配置全部放到了 servers int dot = key.indexOf('.'); long sid = Long.parseLong(key.substring(dot + 1)); String parts[] = splitWithLeadingHostname(value); if ((parts.length != 2) && (parts.length != 3) && (parts.length !=4)) { LOG.error(value + " does not have the form host:port or host:port:port " + " or host:por . . . // todo 不論是普通節點,還是觀察者節點,都是 QuorumServer, 只不過添加進到不同的容器 if (type == LearnerType.OBSERVER){ // todo 如果不是觀察者的話,就不會放在 servers, // todo server.1=localhost:2181:3887 // todo server.2=localhost:2182:3888 // todo server.3=localhost:2183:3889 // todo port是對外提供服務的埠 electionPort是用於選舉的port // todo 查看zk的數據一致性我們使用的埠是 port observers.put(Long.valueOf(sid), new QuorumServer(sid, hostname, port, electionPort, type)); } else { // todo 其他的普通節點放在 servers servers.put(Long.valueOf(sid), new QuorumServer(sid, hostname, port, electionPort, type)); } . . . . /* * Default of quorum config is majority */ if(serverGroup.size() > 0){ if(servers.size() != serverGroup.size()) throw new ConfigException("Every server must be in exactly one group"); /* * The deafult weight of a server is 1 */ for(QuorumServer s : servers.values()){ if(!serverWeight.containsKey(s.id)) serverWeight.put(s.id, (long) 1); } /* * Set the quorumVerifier to be QuorumHierarchical */ quorumVerifier = new QuorumHierarchical(numGroups, serverWeight, serverGroup); } else { /* * The default QuorumVerifier is QuorumMaj */ // todo 默認的仲裁方式, 過半機制中,是不包含 observer 的數量的 LOG.info("Defaulting to majority quorums"); quorumVerifier = new QuorumMaj(servers.size()); } // Now add observers to servers, once the quorums have been // figured out // todo 最後還是將 Observers 添加進了 servers servers.putAll(observers); /** * todo 當時搭建偽集群時,在每一個節點的dataDir文件中都添加進去了一個 myid文件 * 分別在zk、zk2、zk3、的dataDir中新建myid文件, 寫入一個數字, 該數字表示這是第幾號server. * 該數字必須和zoo.cfg文件中的server.X中的X一一對應. * myid的值是zoo.cfg文件里定義的server.A項A的值, * Zookeeper 啟動時會讀取這個文件,拿到裡面的數據與 zoo.cfg 裡面的配置資訊比較從而判斷到底是那個server,只是一個標識作用。 * */ // todo 找到當前節點的dataDir 下面的 myid文件 File myIdFile = new File(dataDir, "myid"); if (!myIdFile.exists()) { throw new IllegalArgumentException(myIdFile.toString() + " file is missing"); } BufferedReader br = new BufferedReader(new FileReader(myIdFile)); String myIdString; try { // todo 讀取出myid裡面的內容 myIdString = br.readLine(); } finally { br.close(); } try { // todo myid文件中存到的數據就是 配置文件中server.N 中的 N這個數字 serverId = Long.parseLong(myIdString); MDC.put("myid", myIdString); } catch (NumberFormatException e) { throw new IllegalArgumentException("serverid " + myIdString + " is not a number"); } // todo 通過檢查上面的Observers map 中是否存在 serverId, 這個serverId其實就是myid, 對應上了後,就將它的 // Warn about inconsistent peer type LearnerType roleByServersList = observers.containsKey(serverId) ? LearnerType.OBSERVER : LearnerType.PARTICIPANT; if (roleByServersList != peerType) { LOG.warn("Peer type from servers list (" + roleByServersList + ") doesn't match peerType (" + peerType + "). Defaulting to servers list."); peerType = roleByServersList; }
根據配置文件啟動ZKServer
在一開始的QuorumPeerMain.java
類中的Initializer()
方法中,存在如下的邏輯,判斷是單機版本啟動還是集群的啟動
if (args.length == 1 && config.servers.size() > 0) { // todo 根據配置啟動伺服器, 跟進去, 就在下面 runFromConfig(config); } else { // todo 沒添加集群的配置 LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode"); // there is only server in the quorum -- run as standalone // todo 啟動單機 ZooKeeperServerMain.main(args); }
如果是單機版本的話,會進入else塊從此構建ZookeeperServerMain
對象, 可以把這個ZooKeeperServerMain
理解成一個輔助類,經過它,初始化並啟動一個ZooKeeperServer.java的對象
繼續跟進
public static void main(String[] args) { // todo 使用無參的構造方法實例化服務端, 單機模式 ZooKeeperServerMain main = new ZooKeeperServerMain(); try { // todo 跟進去看他如何解析配置文件 main.initializeAndRun(args);
繼續跟進
protected void initializeAndRun(String[] args) throws ConfigException, IOException { try { ManagedUtil.registerLog4jMBeans(); } catch (JMException e) { LOG.warn("Unable to register log4j JMX control", e); } // todo 這個配置類, 對應著單機模式的配置類 , 裡面的配置資訊很少 ServerConfig config = new ServerConfig(); if (args.length == 1) { config.parse(args[0]); } else { // todo 單機版本 config.parse(args); } // todo 讀取配置,啟動單機節點 runFromConfig(config); }
啟動單節點
這次再進入這個方法,我們直接跳過它是如果從配置文件中讀取出配置資訊了,然後直接看它的啟動方法
runFromConfig方法
主要做了如下幾件事
- 創建
ZooKeeperServer
它是單機ZK服務端的實例
如下的ZooKeeperServer相關的屬性 private FileTxnSnapLog txnLogFactory = null; private ZKDatabase zkDb; protected RequestProcessor firstProcessor 以及它可以構建DataTree
- 創建
ZooKeeperServerShutdownHandler
監控ZkServer關閉狀態的處理器 - 創建
FileTxnSnapLog
文件快照相關的工具類 - 給ZKServer綁定上
單位時間trickTime
(節點心跳交流的時間) - 初始化 ZKServer
處理事務,快照相關的工具類
- 創建上下文的工廠
- 通過工廠,啟動上下文
public void runFromConfig(ServerConfig config) throws IOException { LOG.info("Starting server"); FileTxnSnapLog txnLog = null; try { // Note that this thread isn't going to be doing anything else, // so rather than spawning another thread, we will just call run() in this thread. // todo 請注意,當前執行緒不會做其他任何事情,因此我們只在當前執行緒中調用Run方法,而不是開啟新執行緒 // create a file logger url from the command line args // todo 根據命令中的args 創建一個logger文件 final ZooKeeperServer zkServer = new ZooKeeperServer(); // Registers shutdown handler which will be used to know the server error or shutdown state changes. // todo 註冊一個shutdown handler, 通過他了解server發生的error或者了解shutdown 狀態的更改 final CountDownLatch shutdownLatch = new CountDownLatch(1); zkServer.registerServerShutdownHandler( new ZooKeeperServerShutdownHandler(shutdownLatch)); // todo FileTxnSnapLog工具類, 與 文件快照相關 txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(config.dataDir)); txnLog.setServerStats(zkServer.serverStats()); zkServer.setTxnLogFactory(txnLog); zkServer.setTickTime(config.tickTime); zkServer.setMinSessionTimeout(config.minSessionTimeout); zkServer.setMaxSessionTimeout(config.maxSessionTimeout); // todo 創建Server上下文的工廠,工廠方法模式 // todo ServerCnxnFactory是個抽象類,他有不同是實現, NIO版本的 Netty版本的 cnxnFactory = ServerCnxnFactory.createFactory(); // todo 建立socket,默認是NIOServerCnxnFactory(是一個執行緒) cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); // todo 跟進這個方法 cnxnFactory.startup(zkServer);
- 看一下如何創建處理事務,快照日誌相關的數據文件的邏輯,可以看到,直接去關聯我們配置的dataDir,snapDir,對應著日誌存儲的目錄已經快照存儲的目錄, 然後封裝進
FileSnap
和FileTxnLog
對象中
public FileTxnSnapLog(File dataDir, File snapDir) throws IOException { LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir); // todo 關聯上指定數據文件和日誌文件 // todo 給FileTxnSnapLog賦值 this.dataDir = new File(dataDir, version + VERSION); this.snapDir = new File(snapDir, version + VERSION); if (!this.dataDir.exists()) { ... . . // todo 將這兩個文件封裝進 FileTxnLog 給當前類維護的兩種事務快照( TnxnSnap ) 賦值 txnLog = new FileTxnLog(this.dataDir); snapLog = new FileSnap(this.snapDir);
- 上下文工廠
如上圖,將ServerCnxnFactory.java
的繼承圖,不同的上下文工廠的實現可以創建出不同的上下文,通過這個圖可以看到,netty不僅支援傳統的NIO,還有一套Netty的實現,當前我選擇的是原生的實現NIOServerCnxnFactory的實現,那麼由他創建出來的就是NIOServerCnxn
啟動流程如下圖
上下文工廠實例化服務端的NIOSocket
在這個方法中創建了ZooKeeperThread
,這個類ZK中設計的執行緒類,幾乎全部的執行緒都由此類完成,當前方法中的做法是將創建的Thread賦值給了當前的類的引用,實際上約等於當前類就是執行緒類,還有需要注意的地方就是雖然進行了初始化,但是並沒有開啟
此處看到的就是java原生的NIO Socket編程, 當前執行緒類被設置成守護執行緒
Thread thread; @Override public void configure(InetSocketAddress addr, int maxcc) throws IOException { configureSaslLogin(); // todo 把當前類作為執行緒 thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr); //todo 所以這裡的這個執行緒是為了和JVM生命周期綁定,只剩下這個執行緒時已經沒有意義了,應該關閉掉。 thread.setDaemon(true); maxClientCnxns = maxcc; // todo 看到了NIO原生的程式碼,使用打開服務端的 Channel, 綁定埠,設置為非阻塞,註冊上感興趣的事件是 accept 連接事件 this.ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true); LOG.info("binding to port " + addr); ss.socket().bind(addr); ss.configureBlocking(false); ss.register(selector, SelectionKey.OP_ACCEPT); }
- 由上下文工廠實例化的
NIOServerCnxn
下面是它的屬性,可以看到其實這個上下文涵蓋的很全面,甚至服務端的ZK都被他維護著,
NIOServerCnxnFactory factory; final SocketChannel sock; protected final SelectionKey sk; boolean initialized; ByteBuffer lenBuffer = ByteBuffer.allocate(4); ByteBuffer incomingBuffer = lenBuffer; LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>(); int sessionTimeout; protected final ZooKeeperServer zkServer;
上下文工廠(ServerFactoryCnxn)啟動
看完了ZooKeeperServerMain
中runFromConfig
方法中的創建ZKServer,FileTxnSnapLog
等重要對象的邏輯,下面,上下文啟動, 直接點擊去查看這個方法,肯定直接進入ServerFactoryCnxn
,我們選擇的是它的實現了NIOServerCnxnFactory
public void runFromConfig(ServerConfig config) throws IOException { . . . cnxnFactory.startup(zkServer);
下面是NIOServerCnxnFactory
的實現,它做的第一件事就是開啟上面實例化的所說的執行緒類,這條執行緒的開啟標記著,服務端從此可以接收客戶端發送的請求了
這個方法還做了如下三件事
- 將ZooKeeperServer交給上下文維護
- 因為這個是啟動,所以從磁碟中完成數據的恢復
- 繼續運行
- 創建計時器
- 開啟計時器
- 開啟三條處理器
- 註冊JMX
- 修改運行的狀態
- 喚醒全部執行緒
public void startup(ZooKeeperServer zks) throws IOException, InterruptedException { // todo start(); ==> run() 開啟執行緒 start(); //todo 實現在上面, 到目前為止服務端已經可以接受客戶端的請求了 // todo 將ZKS 交給NIOServerCnxnFactory管理,意味著NIOServerCnxnFactory是目前來說,服務端功能最多的對象 setZooKeeperServer(zks); // todo 因為是服務端剛剛啟動,需要從從disk將數據恢復到記憶體 zks.startdata(); // todo 繼續跟進 zks.startup(); }
完成數據的恢復
跟進startData()方法
, 看到先創建ZKDatabase
,這個對象就是存在於記憶體中的對象,對磁碟中數據可視化描述
// todo 將數據載入進快取中 public void startdata() throws IOException, InterruptedException { //check to see if zkDb is not null if (zkDb == null) { // todo 如果沒初始化的話就初始化 zkDb = new ZKDatabase(this.txnLogFactory); } if (!zkDb.isInitialized()) { // todo 恢複數據 loadData(); } }
跟進創建ZKDataBase的邏輯, 最直觀的可以看見,這個DB維護了DataTree和SnapLog
public ZKDatabase(FileTxnSnapLog snapLog) { // todo 創建了DataTree 數據樹的空對象 dataTree = new DataTree(); sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>(); //todo 用初始化好了的存有關於系統事務日誌將snaplog初始化 this.snapLog = snapLog; }
loaddata()
public void loadData() throws IOException, InterruptedException { // todo zkDatabase 已經初始化了 if(zkDb.isInitialized()){ // todo zxid = 最近的一次znode的事務id setZxid(zkDb.getDataTreeLastProcessedZxid()); } else { //todo zkDB 沒有初始化就使用 zkDb.loadDataBase() , 跟進去看, 他從快照中獲取數據 setZxid(zkDb.loadDataBase()); } // Clean up dead sessions LinkedList<Long> deadSessions = new LinkedList<Long>(); for (Long session : zkDb.getSessions()) { if (zkDb.getSessionWithTimeOuts().get(session) == null) { deadSessions.add(session); } } zkDb.setDataTreeInit(true); for (long session : deadSessions) { // XXX: Is lastProcessedZxid really the best thing to use? killSession(session, zkDb.getDataTreeLastProcessedZxid()); } }
- 繼續啟動
zks.startup();
它的源碼在下面,其中的計時器類也是一個執行緒類
// todo 繼續啟動, 服務端和客戶端建立連接後會保留一個session, 其中這個sessiion的生命周期倒計時就在下面的 createSessionTracker(); public synchronized void startup() { if (sessionTracker == null) { // todo 創建session計時器 createSessionTracker(); } // todo 開啟計時器 startSessionTracker(); // todo 設置請求處理器, zookeeper中存在不同的請求處理器, 就在下面 setupRequestProcessors(); //todo 是一個為應用程式、設備、系統等植入管理功能的框架。 //todo JMX可以跨越一系列異構作業系統平台、系統體系結構和網路傳輸協議,靈活的開發無縫集成的系統、網路和服務管理應用 registerJMX(); // todo 修改狀態 --> running setState(State.RUNNING); // todo 喚醒所有執行緒, 因為前面有一個執行緒等待處理器 睡了一秒 notifyAll(); }
設置請求處理器
著重看一下它的setupRequestProcessors()
添加請求處理器,單機模式下僅僅存在三個處理器,除了最後一個不是執行緒類之外,其他兩個都是執行緒類
- PrepRequestProcessor
- 校驗許可權
- 修改請求的狀態
- SyncRequestProcessor
- 將request持久化日誌文件
- 打快照
- FinalRequestProcessor
- 響應客戶端的請求
protected void setupRequestProcessors() { // todo 下面的三個處理器的第二個參數是在指定 下一個處理器是誰 RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); // todo 在服務端, 數據的處理 socket -> packet -> request -> queue // todo 然後由下面的requestprocessor 鏈 進行下一步處理request // todo 開啟新執行緒, 服務端接收的客戶端的請求都放在了 隊列中,用處理器非同步處理 ((SyncRequestProcessor)syncProcessor).start(); //todo 第一個處理器 , 下一個處理器是 syncProcessor 最後一個處理器 finalProcessor firstProcessor = new PrepRequestProcessor(this, syncProcessor); // todo 開啟新執行緒 服務端接收的客戶端的請求都放在了 隊列中,用處理器非同步處理 ((PrepRequestProcessor)firstProcessor).start(); }
重理思路
程式碼看到這裡,重新調整一下思路接著往下看,首先作為服務端我們看到了上面的NIOServerCnxnFactory.java
類中的開啟了本類維護的新執行緒,讓服務端有了接收新連接的能力
既然是執行緒類,就存有Run方法,ZK的設計思路就是在NIOServerCnxnFactory.java
的run()方法中檢測客戶端有感興趣的事件時,就進入DoIO()
從bytebuffer中將用戶的請求解析出來,然後交由最後面的三個處理器排隊處理
NIOServerCnxnFactory.java
的run方法部分程式碼如下
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { // todo 接收數據,這裡會間歇性的接收到客戶端ping NIOServerCnxn c = (NIOServerCnxn) k.attachment(); // todo 跟進去, 和客戶端的那一套很相似了 c.doIO(k); } else {
繼續跟進readPayload()
–>readRequest()
–>zkServer.processPacket(this, incomingBuffer)
, 如下是processPacket()
方法的部分源碼
else { // todo 將上面的資訊包裝成 request Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); si.setOwner(ServerCnxn.me); // todo 提交request, 其實就是提交給服務端的 process處理器進行處理 submitRequest(si); }
繼續跟進submitRequest()
,終於可以看到它嘗試將這個request交給第一個處理器處理,但是因為這是在伺服器啟動的過程中,服務端並不確定伺服器的第一個處理器執行緒到底有沒有開啟,因此它先驗證,甚至會等一秒,直到處理器執行緒完成了啟動的邏輯
// todo 交由伺服器做出request的處理動作 public void submitRequest(Request si) { // todo 如果 firstProcessor 不存在,就報錯了 if (firstProcessor == null) { synchronized (this) { try { while (state == State.INITIAL) { wait(1000); } } catch (InterruptedException e) { LOG.warn("Unexpected interruption", e); } if (firstProcessor == null || state != State.RUNNING) { throw new RuntimeException("Not started"); } } } try { touch(si.cnxn); // todo 驗證合法性 boolean validpacket = Request.isValid(si.type); if (validpacket) { // todo request合法的化,交給firstProcessor (實際是PrepRequestProcessor)處理 跟進去 firstProcessor.processRequest(si); if (si.cnxn != null) { incInProcess(); }
經過上面的閱讀,不難發現,最終來自於客戶端的request都將會流經服務端的三個處理器,下面就看看它們到底做了哪些事
PrepRequestProcessor(執行緒類)
因為他本身就是執行緒類,我們直接看他的run()
,最直接的可以看到,它將請求交給了pRequest(req)
處理
public void run() { try { while (true) { // todo 取出請求 Request request = submittedRequests.take(); long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK; //todo 處理請求 if (request.type == OpCode.ping) { traceMask = ZooTrace.CLIENT_PING_TRACE_MASK; } if (LOG.isTraceEnabled()) { ZooTrace.logRequest(LOG, traceMask, 'P', request, ""); } if (Request.requestOfDeath == request) { break; } // todo 著重看這裡, 跟進去 pRequest(request); }
下面跟進它的pRequest()
,下面是它的源碼,通過switch分支針對不同類型的請求做出不同的處理,下面用create類型的請求舉例
protected void pRequest(Request request) throws RequestProcessorException { // LOG.info("Prep>>> cxid = " + request.cxid + " type = " + // request.type + " id = 0x" + Long.toHexString(request.sessionId)); request.hdr = null; request.txn = null; // todo 下面的不同類型的資訊, 對應這不同的處理器方式 try { switch (request.type) { case OpCode.create: // todo 創建每條記錄對應的bean , 現在還是空的, 在面的pRequest2Txn 完成賦值 CreateRequest createRequest = new CreateRequest(); // todo 跟進這個方法, 再從這個方法出來,往下運行,可以看到調用了下一個處理器 pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true); break; . . . request.zxid = zks.getZxid(); // todo 調用下一個處理器處理器請求 SyncRequestProcessor nextProcessor.processRequest(request);
總覽思路,現在當前的處理器進行狀態的相關處理,處理完之後移交給下一個處理器
跟進pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
依然是用create類型距離, 它在下面的方法中做了如下幾件事
- 因為create是事務類型的請求,它在一開始就給request構建了事務頭 txnHeader
- 將request中的屬性反序列化進
CreateRequest
類中 - 校驗一下許可權,檢查一下訪問時是否需要訪問許可權,如果需要,當前訪問者有沒有足夠的許可權
- 根據用戶想create新node而輸入的string,進行截取取出它的父級路徑,因為創建新節點時,需在修改父路徑對應節點的相關資訊
- 校驗父節點是否是臨時節點
- 修改父節點是屬性
- 更新zxid(創建znode事務id)
- childCount++
- 更新cversion(針對當前子節點修改的次數)
- 將這條記錄添加到
outstandingChanges
集合中
// todo 第二個參數位置上的 record 是上一步new 出來的空對象--> protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException { // todo 使用request的相關屬性,創建出 事務Header request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type); switch (type) { case OpCode.create: // todo 校驗session的情況 zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); CreateRequest createRequest = (CreateRequest)record; if(deserialize) // todo 反序列化 ByteBufferInputStream.byteBuffer2Record(request.request, createRequest); // todo 獲取出request中的path String path = createRequest.getPath(); int lastSlash = path.lastIndexOf('/'); if (lastSlash == -1 || path.indexOf('