­

分佈式協調組件Zookeeper之 選舉機制與ZAB協議

Zookeeper簡介:

Zookeeper是什麼:

  Zookeeper 是⼀個分佈式協調服務的開源框架。 主要⽤來解決分佈式集群中應⽤系統的⼀致性問題, 例如怎樣避免同時操作同⼀數據造成臟讀的問題。分佈式系統中數據存在⼀致性的問題!!

  • ZooKeeper 本質上是⼀個分佈式的⼩⽂件存儲系統。 提供基於類似於⽂件系統的⽬錄樹⽅式的數 據存儲,並且可以對樹中的節點進⾏有效管理。
  • ZooKeeper 提供給客戶端監控存儲在zk內部數據的功能,從⽽可以達到基於數據的集群管理。 諸 如: 統⼀命名服務(dubbo)、分佈式配置管理(solr的配置集中管理)、分佈式消息隊列 (sub/pub)、分佈式鎖、分佈式協調等功能。

架構組成:

 

 Leader

  • Zookeeper 集群⼯作的核⼼⻆⾊
  • 集群內部各個服務器的調度者。
  • 事務請求(寫操作) 的唯⼀調度和處理者,保證集群事務處理的順序性;對於 create, setData, delete 等有寫操作的請求,則需要統⼀轉發給leader 處理, leader 需要決定編號、執 ⾏操作,這個過程稱為⼀個事務。

Follower

  • 處理客戶端⾮事務(讀操作) 請求,
  • 轉發事務請求給 Leader;
  • 參與集群 Leader 選舉投票 2n-1台可以做集群投票。

此外,針對訪問量⽐較⼤的 zookeeper 集群, 還可新增觀察者⻆⾊。

Observer

  • 觀察者⻆⾊,觀察 Zookeeper 集群的最新狀態變化並將這些狀態同步過來,其對於⾮事務請求可 以進⾏獨⽴處理,對於事務請求,則會轉發給 Leader服務器進⾏處理。
  • 不會參與任何形式的投票只提供⾮事務服務,通常⽤於在不影響集群事務處理能⼒的前提下提升集 群的⾮事務處理能⼒。增加了集群增加並發的讀請求

ZK也是Master/slave架構,但是與之前不同的是zk集群中的Leader不是指定⽽來,⽽是通過選舉產⽣。

Zookeeper 特點:

  • 1.Zookeeper:⼀個領導者(leader:⽼⼤),多個跟隨者(follower:⼩弟)組成的集群。
  • 2. Leader負責進⾏投票的發起和決議,更新系統狀態(內部原理)
  • 3. Follower⽤於接收客戶請求並向客戶端返回結果,在選舉Leader過程中參與投票
  • 4. 集群中只要有半數以上節點存活,Zookeeper集群就能正常服務。
  • 5. 全局數據⼀致:每個server保存⼀份相同的數據副本,Client⽆論連接到哪個server,數據都是⼀ 致的。
  • 6. 更新請求順序進⾏(內部原理)
  • 7. 數據更新原⼦性,⼀次數據更新要麼成功,要麼失敗

Zookeeper 節點 之 ZNode 類型:

Zookeeper 節點類型可以分為三⼤類:

  • 持久性節點(Persistent)
  • 臨時性節點(Ephemeral)
  • 順序性節點(Sequential)

在開發中在創建節點的時候通過組合可以⽣成以下四種節點類型:持久節點、持久順序節點、臨時節 點、臨時順序節點。不同類型的節點則會有不同的⽣命周期

持久節點:是Zookeeper中最常⻅的⼀種節點類型,所謂持久節點,就是指節點被創建後會⼀直存在服 務器,直到刪除操作主動清除

持久順序節點:就是有順序的持久節點,節點特性和持久節點是⼀樣的,只是額外特性表現在順序上。 順序特性實質是在創建節點的時候,會在節點名後⾯加上⼀個數字後綴,來表示其順序。

臨時節點:就是會被⾃動清理掉的節點,它的⽣命周期和客戶端會話綁在⼀起,客戶端會話結束,節點 會被刪除掉。與持久性節點不同的是,臨時節點不能創建⼦節點。

臨時順序節點:就是有順序的臨時節點,和持久順序節點相同,在其創建的時候會在名字後⾯加上數字 後綴

事務ID:

  在ZooKeeper中,事務是指能夠改變ZooKeeper服務器狀態的操作,我們也稱之為事務操作或更新 操作,⼀般包括數據節點創建與刪除、數據節點內容更新等操作。對於每⼀個事務請求,ZooKeeper都 會為其分配⼀個全局唯⼀的事務ID,⽤ ZXID 來表示,通常是⼀個 64 位的數字。每⼀個 ZXID 對應⼀次 更新操作,從這些ZXID中可以間接地識別出ZooKeeper處理這些更新操作請求的全局順序 zk中的事務指的是對zk服務器狀態改變的操作(create,update data,更新位元組點);zk對這些事務操作都 會編號,這個編號是⾃增⻓的被稱為ZXID。

ZNode 的狀態信息:

#使⽤bin/zkCli.sh 連接到zk集群
[zk: localhost:2181(CONNECTED) 2] get /zookeeper
cZxid = 0x0
ctime = Wed Dec 31 19:00:00 EST 1969
mZxid = 0x0
mtime = Wed Dec 31 19:00:00 EST 1969
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1

含義:

 cZxid 就是 Create ZXID,表示節點被創建時的事務ID。
 ctime 就是 Create Time,表示節點創建時間。
 mZxid 就是 Modified ZXID,表示節點最後⼀次被修改時的事務ID。
 mtime 就是 Modified Time,表示節點最後⼀次被修改的時間。
 pZxid 表示該節點的⼦節點列表最後⼀次被修改時的事務 ID。只有⼦節點列表變更才會更新 pZxid,
⼦節點內容變更不會更新。
 cversion 表示⼦節點的版本號。
 dataVersion 表示內容版本號。
 aclVersion 標識acl版本
 ephemeralOwner 表示創建該臨時節點時的會話 sessionID,如果是持久性節點那麼值為 0
 dataLength 表示數據⻓度。
 numChildren 表示直系⼦節點數。

Watcher 機制:

  在 ZooKeeper 中,引⼊了 Watcher 機制來實現這種分佈式的通知功能。ZooKeeper 允許客戶端向服務 端註冊⼀個 Watcher 監聽,當服務端的⼀些指定事件觸發了這個 Watcher,那麼Zk就會向指定客戶端 發送⼀個事件通知來實現分佈式的通知功能。

 Zookeeper的Watcher機制主要包括客戶端線程、客戶端WatcherManager、Zookeeper服務器三部 分。

具體⼯作流程為:

  • 客戶端在向Zookeeper服務器註冊的同時,會將Watcher對象存儲在客戶端的WatcherManager當 中
  • 當Zookeeper服務器觸發Watcher事件後,會向客戶端發送通知
  • 客戶端線程從WatcherManager中取出對應的Watcher對象來執⾏回調邏輯

註: 客戶端負責watch的註冊 和回調,zk服務器負責處理watch。

命令行使用:

   創建順序節點:create -s /zk-test 123
  創建臨時節點:create -e /zk-temp 123
  創建永久節點:create /zk-permanent 123
  讀取節點:ls path   其中,path表示的是指定數據節點的節點路徑
  獲取內容:get path
  更新節點:set path data
  刪除節點: delete path

JAVA 操作Zookeeper:

<dependency>
 <groupId>org.apache.zookeeper</groupId>
 <artifactId>zookeeper</artifactId>
 <version>3.4.14</version>
</dependency> 
<dependency>
 <groupId>com.101tec</groupId>
 <artifactId>zkclient</artifactId>
 <version>0.2</version>
 </dependency>

創建會話:

package com.hust.grid.leesf.zkclient.examples;
import java.io.IOException;
import org.I0Itec.zkclient.ZkClient;
public class CreateSession {
/*
 創建⼀個zkClient實例來進⾏連接
 */
 public static void main(String[] args) {
   ZkClient zkClient = new ZkClient("127.0.0.1:2181");
   System.out.println("ZooKeeper session created.");
 }
}

創建節點:

package com.hust.grid.leesf.zkclient.examples;
import org.I0Itec.zkclient.ZkClient;
public class Create_Node_Sample {
 public static void main(String[] args) {
 ZkClient zkClient = new ZkClient("127.0.0.1:2181");
 System.out.println("ZooKeeper session established.");
 //createParents的值設置為true,可以遞歸創建節點
 zkClient.createPersistent("/lg-zkClient/lg-c1",true);
 System.out.println("success create znode.");
 }
}

刪除節點:

package com.hust.grid.leesf.zkclient.examples;
import org.I0Itec.zkclient.ZkClient;
public class Del_Data_Sample {
 public static void main(String[] args) throws Exception {
 String path = "/lg-zkClient/lg-c1";
 ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
 zkClient.deleteRecursive(path);
 System.out.println("success delete znode.");
 }
}

監聽節點變化:

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import java.util.List;
/*
演示zkClient如何使⽤監聽器
*/
public class Get_Child_Change {
 public static void main(String[] args) throws InterruptedException {
 //獲取到zkClient
 final ZkClient zkClient = new ZkClient("linux121:2181");
 //zkClient對指定⽬錄進⾏監聽(不存在⽬錄:/lg-client),指定收到通知之後的邏輯
 //對/lag-client註冊了監聽器,監聽器是⼀直監聽
 zkClient.subscribeChildChanges("/lg-client", new IZkChildListener() {
 //該⽅法是接收到通知之後的執⾏邏輯定義
 public void handleChildChange(String path, List<String> childs)
throws Exception {
 //打印節點信息
 System.out.println(path + " childs changes ,current childs " +
childs);
 }
 });
 //使⽤zkClient創建節點,刪除節點,驗證監聽器是否運⾏
 zkClient.createPersistent("/lg-client");
 Thread.sleep(1000); //只是為了⽅便觀察結果數據
 zkClient.createPersistent("/lg-client/c1");
 Thread.sleep(1000);
 zkClient.delete("/lg-client/c1");
 Thread.sleep(1000);
 zkClient.delete("/lg-client");
 Thread.sleep(Integer.MAX_VALUE);
 /*
 1 監聽器可以對不存在的⽬錄進⾏監聽
 2 監聽⽬錄下⼦節點發⽣改變,可以接收到通知,攜帶數據有⼦節點列表
 3 監聽⽬錄創建和刪除本身也會被監聽到
 */
 }
}

執行結果:

/lg-zkClient 's child changed, currentChilds:[]
/lg-zkClient 's child changed, currentChilds:[c1]
/lg-zkClient 's child changed, currentChilds:[]
/lg-zkClient 's child changed, currentChilds:null

註:

  • 客戶端可以對⼀個不存在的節點進⾏⼦節點變更的監聽。
  • ⼀旦客戶端對⼀個節點註冊了⼦節點列表變更監聽之後,那麼當該節點的⼦節點列表發⽣變更時,服務 端都會通知客戶端,並將最新的⼦節點列表發送給客戶端  
  • 該節點本身的創建或刪除也會通知到客戶端。

監聽節點數據變化:

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
//使⽤監聽器監聽節點數據的變化
public class Get_Data_Change {
 public static void main(String[] args) throws InterruptedException {
 // 獲取zkClient對象
 final ZkClient zkClient = new ZkClient("linux121:2181");
 //設置⾃定義的序列化類型,否則會報錯!!
 zkClient.setZkSerializer(new ZkStrSerializer());
 //判斷節點是否存在,不存在創建節點並賦值
 final boolean exists = zkClient.exists("/lg-client1");
 if (!exists) {
 zkClient.createEphemeral("/lg-client1", "123");
 }
 //註冊監聽器,節點數據改變的類型,接收通知後的處理邏輯定義
 zkClient.subscribeDataChanges("/lg-client1", new IZkDataListener() {
 public void handleDataChange(String path, Object data) throws
Exception {
 //定義接收通知之後的處理邏輯
 System.out.println(path + " data is changed ,new data " +
data);
 }
 //數據刪除--》節點刪除
 public void handleDataDeleted(String path) throws Exception {
 System.out.println(path + " is deleted!!");
 }
 });
 //更新節點的數據,刪除節點,驗證監聽器是否正常運⾏
 final Object o = zkClient.readData("/lg-client1");
 System.out.println(o);
 zkClient.writeData("/lg-client1", "new data");
 Thread.sleep(1000);
 //刪除節點
 zkClient.delete("/lg-client1");
 Thread.sleep(Integer.MAX_VALUE);
 }
}

zk 自定義字符串序列化:

import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
public class ZkStrSerializer implements ZkSerializer {
 //序列化,數據--》byte[]
 public byte[] serialize(Object o) throws ZkMarshallingError {
  return String.valueOf(o).getBytes();
 }
 //反序列化,byte[]--->數據
 public Object deserialize(byte[] bytes) throws ZkMarshallingError {
 return new String(bytes);
 }
}

 

Leader選舉:

選舉機制:

  • 半數機制:集群中半數以上機器存活,集群可⽤。所以Zookeeper適合安裝奇數台服務器。
  • Zookeeper雖然在配置⽂件中並沒有指定Master和Slave。但是,Zookeeper⼯作時,是有⼀個節 點為Leader,其它為Follower,Leader是通過內部的選舉機制產⽣的。
  • 只有當機器減少或集群初次啟動才會選舉leader。

 

 

 詳細步驟:

  • (1)服務器1啟動,此時只有它⼀台服務器啟動了,它發出去的報⽂沒有任何響應,所以它的選舉狀態 ⼀直是LOOKING狀態。
  • (2)服務器2啟動,它與最開始啟動的服務器1進⾏通信,互相交換⾃⼰的選舉結果,由於兩者都沒有 歷史數據,所以id值較⼤的服務器2勝出,但是由於沒有達到超過半數以上的服務器都同意選舉它(這個 例⼦中的半數以上是3),所以服務器1、2還是繼續保持LOOKING狀態。 return String.valueOf(o).getBytes(); } //反序列化,byte[]—>數據 public Object deserialize(byte[] bytes) throws ZkMarshallingError { return new String(bytes); } } 123 /lg-client1 data is changed ,new data new data /lg-client1 is deleted!!
  • (3)服務器3啟動,根據前⾯的理論分析,服務器3成為服務器1、2、3中的⽼⼤,⽽與上⾯不同的 是,此時有三台服務器選舉了它,所以它成為了這次選舉的Leader。
  • (4)服務器4啟動,根據前⾯的分析,理論上服務器4應該是服務器1、2、3、4中最⼤的,但是由於前 ⾯已經有半數以上的服務器選舉了服務器3,所以它只能接收當⼩弟的命了。
  • (5)服務器5啟動,同4⼀樣稱為follower

集群首次啟動:

  半數前選myid 最大的機器。

非首次啟動:

  優先選擇zxid值⼤的節點稱為Leader!!

ZAB⼀致性協議:

  ZAB 協議是為分佈式協調服務 Zookeeper 專⻔設計的⼀種⽀持崩潰恢復和原⼦⼴播協議

原子廣播:

  

 

 

 

 具體流程:

 

 

 

 

 

 

 

總結: 第一步發送提議,如果提議獲得半數以上機器的ack,然後發送commit給follower,同時自己commit。

崩潰恢復:

Leader宕機後,被選舉的新Leader需要解決的問題:

  • ZAB 協議確保那些已經在 Leader 提交的事務最終會被所有服務器提交。
  • ZAB 協議確保丟棄那些只在 Leader 提出/複製,但沒有提交的事務。

選舉算法的關鍵點:保證選舉出的新Leader擁有集群中所有節點最⼤編號(ZXID)的事務!!

總結:leader崩潰,新的leader必須擁有最大的事務id,這樣才能保證數據最新。