zookeeper從小白到精通
- 2022 年 3 月 27 日
- 筆記
1.介紹
1.1概念
zookeeper作用:用於維護配置資訊、命名、提供分散式同步和提供組服務
zookeeper主要是文件系統和通知機制
文件系統主要是用來存儲數據
通知機制主要是伺服器或者客戶端進行通知,並且監督
基於觀察者模式設計的分散式服務管理框架,開源的分散式框架
1.2特點
1):一個leader,多個follower的集群
2):集群只要有半數以上包括半數就可正常服務,一般安裝奇數台伺服器
3):全局數據一致,每個伺服器都保存同樣的數據,實時更新
4):更新的請求順序保持順序(來自同一個伺服器)
5):數據更新的原子性,數據要麼成功要麼失敗(大事務)
6):數據實時更新性很快(因為數據量很小)
1.3主要的集群步驟
1):服務端啟動時去註冊資訊(創建都是臨時節點)
2):獲取到當前在線伺服器列表,並且註冊監聽
3):伺服器節點下線
4):伺服器節點上下線事件通知
5):process(){重新再去獲取伺服器列表,並註冊監聽}
1.4數據結構
與 Unix 文件系統很類似,可看成樹形結構,每個節點稱做一個ZNode。每一個ZNode默認能夠存儲 1MB 的數據。也就是只能存儲小數據(一般配置資訊,註冊資訊等)
1.5應用場景
1):統一命名服務(域名服務)
在分散式環境下,經常需要對應用/服務進行統一命名,便於識別,eg:ip不容易記住,而域名容易記住
2):統一配置管理(一個集群中的所有配置都一致,且也要實時更新同步)
3):將配置資訊寫入ZooKeeper上的一個Znode,各個客戶端伺服器監聽這個Znode。一旦Znode中的數據被修改,ZooKeeper將通知各個客戶端伺服器
4):統一集群管理(掌握實時狀態)
5):將節點資訊寫入ZooKeeper上的一個ZNode。監聽ZNode獲取實時狀態變化
6):伺服器節點動態上下線
7):軟負載均衡(根據每個節點的訪問數,讓訪問數最少的伺服器處理最新的數據需求)
2.本地安裝
2.1安裝jdk
1.查看是否已安裝JDK
yum list installed |grep java
2.卸載CentOS系統Java環境
# yum -y remove java-1.8.0-openjdk* *表示卸載所有openjdk相關文件輸入
# yum -y remove tzdata-java.noarch 卸載tzdata-java
3.查看JDK軟體包版本
# yum -y list java* 或者使用# yum searchjava | grep -i --color JDK
4.安裝
yum install java-1.8.0-openjdk* //安裝java1.8.0所有程式
java -version //查看java版本資訊
注意:使用yum安裝環境變數自動就配好了
2.2下載安裝
官網://zookeeper.apache.org/
下載3.5.7穩定版本
下載://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/
wget //archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz
解壓:tar -xf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
改名:mv apache-zookeeper-3.5.7-bin zookeeper-3.5.7
配置文件改名:mv zoo_sample.cfg zoo.cfg
bin目錄 框架啟動停止,客戶端和服務端的
conf 配置文件資訊
docs文檔
lib 配置文檔的依賴
2.3配置文件修改
配置文件:/opt/module/zookeeper-3.5.7/conf/zoo.cfg
修改:
dataDir = /opt/module/zookeeper-3.5.7/zkData // 通常修改的路徑
2.4啟動服務端
cd /opt/module/zookeeper-3.5.7/bin
[root@sg-15 bin]# ./zkServer.sh start // 啟動服務端
//查看進程
[root@sg-15 bin]# jps -l
21172 sun.tools.jps.Jps
21110 org.apache.zookeeper.server.quorum.QuorumPeerMain
2.5啟動客戶端
[root@sg-15 bin]# ./zkCli.sh // 啟動客戶端
[zk: localhost:2181(CONNECTED) 4] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 5] quit //退出客戶端
[root@sg-15 bin]# ./zkServer.sh status // 查看zookeeper狀態
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: standalone
2.6zookeeper常用命令
[root@sg-15 bin]# ./zkServer.sh start // 啟動服務端
[root@sg-15 bin]# ./zkServer.sh stop // 停止服務端
[root@sg-15 bin]# jps -l // 查看進程
[root@sg-15 bin]# ./zkCli.sh // 啟動客戶端
[zk: localhost:2181(CONNECTED) 5] quit // 退出客戶端
[root@sg-15 bin]# ./zkServer.sh status // 查看zookeeper狀態
2.7配置文件解讀
配置文件的5大參數:
tickTime = 2000 //通訊心跳時間,Zookeeper伺服器與客戶端心跳時間,單位毫秒
initLimit = 10 //Leader和Follower初始連接時能容忍的最多心跳數(tickTime的數量)
syncLimit = 5 //Leader和Follower之間通訊時間如果超過5個心跳時間,Leader認為Follwer死掉,從伺服器列表中刪除Follwer。
dataDir =/tmp/zookeeper //保存zookeeper的數據,這是默認值,會定時被系統清除
dataDir保存zookeeper的數據,默認是temp會被系統定期清除,通常改為自己的路徑
dataDir = /opt/module/zookeeper-3.5.7/zkData // 通常修改的路徑
clientPort = 2181 //客戶端的連接埠,一般不需要修改
3.集群安裝
3.1集群規劃
sg15,sg16,sg17三台機器部署Zookeeper
3.2安裝
解壓:tar -xf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
改名:mv apache-zookeeper-3.5.7-bin zookeeper-3.5.7
配置文件改名:mv zoo_sample.cfg zoo.cfg
3.3配置
[root@sg-15 zookeeper-3.5.7]# mkdir zkData // 創建目錄zkData
[root@sg-15 zkData]# vi myid // 創建一個 myid 的文件
在文件中添加與 server 對應的編號(注意:上下不要有空行,左右不要有空格)
5 // 192.168.0.215,這裡設置伺服器尾號,其他的不重複就行
[root@sg-15 conf]# mv zoo_sample.cfg zoo.cfg // 配置文件改名
[root@sg-15 conf]# vi zoo.cfg //修改配置文件
dataDir = /opt/module/zookeeper-3.5.7/zkData // 通常修改的路徑
// 添加配置
#######################cluster##########################
server.5=192.168.0.215:2888:3888
server.6=192.168.0.216:2888:3888
server.7=192.168.0.217:2888:3888
##配置參數解讀
server.A=B:C:D
A 是一個數字,表示這個是第幾號伺服器,就是myid中的值
B 是這個伺服器的地址
C 是這個伺服器Follower 與集群中的 Leader 伺服器交換資訊的埠;
D 是萬一集群中的 Leader 伺服器掛了,需要一個埠來重新進行選舉,選出一個新的
Leader,而這個埠就是用來執行選舉時伺服器相互通訊的埠。
注意:一台機器弄好之後,打包發送到其他機器。其他機器修改myid值
tar -cf module.tar ./module // 打包
scp -r module.tar [email protected]:/opt/ //發送
tar -xf module.tar // 解包
3.4啟動zookeeper集群
注意:集群只要有半數以上包括半數就可正常服務。
三台機器啟動:
[root@sg-15 bin]# ./zkServer.sh start //啟動
[root@sg-15 bin]# ./zkServer.sh restart //重啟
[root@sg-17 bin]# ./zkServer.sh status //檢查狀態
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader //本機器為leader
[root@sg-15 bin]# ./zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower //本機器為follower
4.選舉機制
選舉誰當leader
介紹:
SID:伺服器ID。用來唯一標識一台ZooKeeper集群中的機器,每台機器不能重複,和myid一致。
ZXID:事務ID。ZXID是一個事務ID,用來標識一次伺服器狀態的變更。在某一時刻, 集群中的每台機器的ZXID值不一定完全一樣
Epoch:每個Leader任期的代號。沒有Leader時同一輪投票過程中的邏輯時鐘值是
4.1觸發選舉時機
1.伺服器剛啟動
2.伺服器運行期間無法和leader保持連接
當一台機器進入leader選舉流程時,當前集群出現兩種狀態:
1.集群中已經存在一個leader(此機器和leader建立連接,並狀態同步)
2.集群中不存在leader(觸發選舉),looking狀態
4.2zookeeper選舉機制—第一次啟動
伺服器1:myid=1
伺服器2:myid=2
伺服器3:myid=3
(1) 伺服器1啟動,發起一次選舉。伺服器1投自己一票。此時伺服器1票數一票,不夠半數以上(3票),選舉無法完成,伺服器1狀態保持為LOOKING;
(2) 伺服器2啟動,再發起一次選舉。伺服器1和2分別投自己一票並交換選票資訊:伺服器1和伺服器2比較誰的myid大,更改選票為推舉伺服器myid大的。此時伺服器1票數0票,伺服器2票數2票,大於半數以上結果,選舉完成。伺服器1為follower,2狀態為leader
(3) 伺服器3啟動,發起一次選舉。此時伺服器1,2已經不是LOOKING狀態,不會更改選票資訊。交換選票資訊結果:伺服器3為1票,此時伺服器3服從多數,更改選票資訊為伺服器2,並更改狀態為FOLLOWING;
4.2zookeeper選舉機制—非第一次啟動
伺服器1:myid=1
伺服器2:myid=2
伺服器3:myid=3
伺服器運行期間無法和leader保持連接觸發重新選舉:
假設zookeeper由5台伺服器組成,SID分別為1,2,3,4,5,ZXID分別為8,8,8,7,7,並且此時SID為3的伺服器此時時leader。某時刻3和5出現故障,因此開始進行leader選舉:
SID為1的機器:(1,8,1)
SID為2的機器:(1,8,2)
SID為4的機器:(1,7,4)
選舉規則:
優先比較SID,再比較ZXID,其次比較Epoch。大的直接勝出當選leader
4.3選舉機制總結
半數機制,超過半數的投票通過,即通過。
第一次啟動選舉規則:投票過半數時,伺服器 myid 大的勝出當leader
第二次啟動選舉規則:①EPOCH大的直接勝出 ②EPOCH相同,事務id大的勝出 ③事務id相同,任期代號id大的勝出
5.客戶端命令行操作
5.1常用命令整合
登陸客戶端操作:很多命令和linux命令相似,比如ls,history等
jps : 查看zookeeper運行的進程
help :顯示所有操作命令
ls / :查看當前znode中包含的內容
ls -s / :查看當前節點詳細數據
create : 創建普通節點
create -s :創建帶序號節點
create -e :創建臨時普通節點
create -e -s :創建臨時有序節點
delete:刪除節點
create /wangzhe "this is wangzhe" //新建節點(永久節點,不帶序號)
create /wangzhe/fashi "this is fashi" // //新建節點(永久節點,不帶序號)
get /wangzhe // this is wangzhe 取值
get -s /wangzhe //節點詳情
set set /wangzhe "this is wangzherongyao" //修改值
get -w :監聽值
ls -w /wangzhe :監聽數量
監聽註冊一個生效一次
5.2啟動客戶端
[root@sg-15 bin]# ./zkCli.sh -server 192.168.0.215:2181
quit //退出
5.3znode節點數據資訊
命令:ls -s /
[zk: 192.168.0.215:2181(CONNECTED) 3] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
##################################
1):czxid:創建節點的事務id zxid
每次修改ZooKeeper 狀態都會產生一個ZooKeeper 事務 ID。事務 ID 是ZooKeeper 中所有修改總的次序。每次修改都有唯一的 zxid,如果 zxid1 小於 zxid2,那麼zxid1 在 zxid2 之前發生。
2):ctime:znode 被創建的毫秒數(從 1970 年開始)
3):mzxid:znode 最後更新的事務zxid
4):mtime:znode 最後更新的毫秒數(從 1970 年開始)
5):pZxid:znode 最後更新的子節點zxid
6):cversion:znode 子節點變化號,znode 子節點修改次數
7):dataversion:znode 數據變化號
8):aclVersion:znode 訪問控制列表的變化號
9):ephemeralOwner:如果是臨時節點,這個是 znode 擁有者的 session id。如果不是臨時節點則是 0。
10):dataLength:znode 的數據長度
11):numChildren:znode 子節點數量
5.4節點類型(持久/短暫/有序號/無序號)
持久:客戶端和服務端端開連接後,創建的節點不刪除
序號:在分散式系統中,順序號可以被用於所有事件排序,這樣客戶端可以通過順序號推斷事件的順序
{
持久有序號:客戶端和服務端端開連接後,創建的節點不刪除。且節點名稱順序編號
持久無序號:客戶端和服務端端開連接後,創建的節點不刪除。無序號
}
短暫:客戶端和服務端端開連接後,創建的節點自己刪除
{
短暫有序號:客戶端和服務端端開連接後,創建的節點自己刪除。且節點名稱順序編號
短暫無序號:客戶端和服務端端開連接後,創建的節點自己刪除。無序號
}
5.5節點操作
5.5.1創建/刪除節點
create : 創建普通節點
create -s :創建帶序號節點
create -e :創建臨時普通節點
create -e -s :創建臨時有序節點
delete:刪除一個節點(如果這個節點下有子節點,刪除失敗)
deleteall:遞歸刪除所有節點(如果這個節點下有子節點,全部刪除)
eg:delete /wangzhe/fashi //只刪除fashi節點
deleteall /wangzhe //遞歸刪除wangzhe所有節點
//1.創建節點:普通永久節點
[zk: 192.168.0.215:2181(CONNECTED) 27] create /wangzhe "this is wangzhe"
Created /wangzhe
[zk: 192.168.0.215:2181(CONNECTED) 40] create /wangzhe/fashi "this is fashi"
Created /wangzhe/fashi
//2.創建節點:有序永久節點
[zk: 192.168.0.215:2181(CONNECTED) 66] create -s /wangzhe/fashi/daji "daji"
Created /wangzhe/fashi/daji0000000000
[zk: 192.168.0.215:2181(CONNECTED) 70] create -s /wangzhe/fashi/daji "daji"
Created /wangzhe/fashi/daji0000000001 //再次創建daji,序號+1
//3.創建節點:普通臨時節點
[zk: 192.168.0.215:2181(CONNECTED) 71] create -e /wangzhe/fashi/ganjiang "ganjiang"
Created /wangzhe/fashi/ganjiang
//4.創建節點:有序臨時節點
[zk: 192.168.0.215:2181(CONNECTED) 73] create -e -s /wangzhe/fashi/anqila "anqila"
Created /wangzhe/fashi/anqila0000000003
//查看
[zk: 192.168.0.215:2181(CONNECTED) 74] ls /wangzhe/fashi
[anqila0000000003, daji0000000000, daji0000000001, ganjiang]
//delete刪除
[zk: 192.168.0.215:2181(CONNECTED) 19] delete /wangzhe/fashi/xiaoqiao
xiaoqiao xiaoqiao0000000007
5.5.2獲取/查看的值
注意:get獲取有序節點的值時:必須獲取最後一個序號的值,比如獲取下面ganjiang0000000004報錯,獲取ganjiang0000000005正常。
ls
get
get -s
//查看
[zk: 192.168.0.215:2181(CONNECTED) 10] ls /wangzhe/fashi
[daji0000000000, daji0000000001, ganjiang0000000004, ganjiang0000000005, xiaoqiao]
//獲取值
[zk: 192.168.0.215:2181(CONNECTED) 12] get /wangzhe/fashi/xiaoqiao0000000007
xiaoqiao
// 獲取詳細
[zk: 192.168.0.215:2181(CONNECTED) 13] get -s /wangzhe/fashi/xiaoqiao0000000007
xiaoqiao
cZxid = 0x30000002f
ctime = Sat Mar 26 18:37:35 CST 2022
mZxid = 0x30000002f
mtime = Sat Mar 26 18:37:35 CST 2022
pZxid = 0x30000002f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 0
[zk: 192.168.0.215:2181(CONNECTED) 14] get -s /wangzhe/fashi
this is fashi
cZxid = 0x300000020
ctime = Sat Mar 26 17:31:12 CST 2022
mZxid = 0x300000020
mtime = Sat Mar 26 17:31:12 CST 2022
pZxid = 0x30000002f
cversion = 10
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 13
numChildren = 6
5.6節點監聽
get -w :監聽值
ls -w /wangzhe :監聽數量
監聽註冊一個生效一次
5.6.1監聽節點值改變
[zk: 192.168.0.215:2181(CONNECTED) 25] get -w /wangzhe //開啟一次監聽
this is wangzhe
[zk: 192.168.0.215:2181(CONNECTED) 26] set /wangzhe "jianting:this is wangzhe" //修改值
WATCHER:: //監聽值發生變化
WatchedEvent state:SyncConnected type:NodeDataChanged path:/wangzhe
5.6.2監聽節點下子節點數量改變
[zk: 192.168.0.215:2181(CONNECTED) 27] ls -w /wangzhe ////開啟一次監聽
[fashi, fashi0000000001]
[zk: 192.168.0.215:2181(CONNECTED) 28] delete /wangzhe/fashi0000000001 //刪除一個節點
WATCHER:: //監聽數量發生變化
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/wangzhe
6.goang操作zookeeper
6.1創建節點create(增)
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
// 任意一個ip都可以,但是建議放主節點ip
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
panic(err)
}
defer conn.Close()
// 1.創建的普通永久節點
path, err := conn.Create("/hello", []byte("world"), 0, zk.WorldACL(zk.PermAll))
if err != nil {
fmt.Println("err:",err)
}
fmt.Println("創建的普通永久節點:", path)
// 2.創建的普通臨時節點,創建此節點的會話結束後立即清除此節點
ephemeral, err := conn.Create("/ephemeral", []byte("world"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {
fmt.Println("err:",err)
}
fmt.Println("創建的普通臨時節點:", ephemeral)
// 3.創建的有序永久節點
sequence, err := conn.Create("/sequence", []byte("world"), zk.FlagSequence, zk.WorldACL(zk.PermAll))
if err != nil {
panic(err)
}
fmt.Println("創建的有序永久節點:", sequence)
// 4.創建的有序臨時節點,創建此節點的會話結束後立即清除此節點
ephemeralSequence, err := conn.Create("/ephemeralSequence", []byte("world"), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
if err != nil {
panic(err)
}
fmt.Println("創建的有序臨時節點:", ephemeralSequence)
}
6.2查看節點get(查)
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
result, state, err := conn.Get("/wangzhe/fashi")
if err != nil {
fmt.Println("err:",err)
return
}
fmt.Println("result: ", string(result)) // this is fashi
fmt.Printf("%#v",state)
//狀態結果:&zk.Stat{Czxid:12884901920, Mzxid:12884901920, Ctime:1648287072539, Mtime:1648287072539, Version:0, Cversion:11, Aversion:0, EphemeralOwner:0, DataLength:13, NumChildren:5, Pzxid:12884901936}
}
6.3修改節點set(改)
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
path := "/wangzhe/fashi"
_, state, _ := conn.Get(path) // 先查詢,拿到當前版本
state, err = conn.Set(path, []byte("hello"), state.Version)
if err != nil {
fmt.Println("err:",err)
return
}
fmt.Printf("%#v",state)
//結果:&zk.Stat{Czxid:12884901920, Mzxid:12884902012, Ctime:1648287072539, Mtime:1648305221598, Version:1, Cversion:11, Aversion:0, EphemeralOwner:0, DataLength:5, NumChildren:5, Pzxid:12884901936}2022/03/26 22:33:39 recv loop
}
6.4刪除節點delete(刪)
注意:此方法不能遞歸刪除節點
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
path := "/hello"
exists, state, err := conn.Exists(path) //判斷是否存在和查詢版本
if exists{
err = conn.Delete(path, state.Version)
if err != nil {
fmt.Println("err:",err)
return
}
fmt.Println("節點刪除成功!!!")
}else {
fmt.Println("節點不存在!!!")
}
}
6.5查看節點的子節點Children
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
childrenList, state, err := conn.Children("/wangzhe/fashi")
if err != nil {
fmt.Println("err:",err)
return
}
fmt.Println(childrenList)
fmt.Printf("%#v",state)
}
6.6遍歷節點Children後再get
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
childrenList, _, err := conn.Children("/wangzhe/fashi")
if err != nil {
fmt.Println("err:",err)
return
}
for _,children:=range childrenList{
childrenPath:= fmt.Sprintf("/wangzhe/fashi/%s",children)
result, state, err := conn.Get(childrenPath)
if err != nil {
fmt.Println("err:",err)
return
}
fmt.Println(string(result))
fmt.Println(state)
}
}
6.7判斷節點是否存在-conn.Exists
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
path := "/hello"
exists, _, err := conn.Exists(path)
if err != nil {
fmt.Println("err:",err)
return
}
if exists{
fmt.Println("節點存在")
}else{
fmt.Println("節點不存在")
}
}
7.監聽/watch
7.1監聽節點-全局監聽
將監聽器放到Connect
函數中,如果有監聽事件發生,會一直執行監聽器的回調函數。監聽執行了一次之後要重新註冊監聽。
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func callback(e zk.Event) {
fmt.Println("========================")
fmt.Println("path:", e.Path)
fmt.Println("type:", e.Type.String())
fmt.Println("state:", e.State.String())
}
func main() {
eventCallbackOption := zk.WithEventCallback(callback)
// 經過測試,在連接時會執行3次回調函數
conn, _, err := zk.Connect([]string{"192.168.0.215","192.168.0.216","192.168.0.217"}, time.Second,eventCallbackOption)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
// 註冊一個 watch
exists, state, _, err := conn.ExistsW("/watch")
if err != nil {
fmt.Println("err:",err)
return
}
fmt.Println("exists:",exists)
if !exists {
// 創建 /watch 時,觸發監聽事件,watch 失效
_, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {
fmt.Println("err:",err)
return
}
// 再註冊一個 watch
_, state, _, err = conn.ExistsW("/watch")
if err != nil {
fmt.Println("err:",err)
return
}
}
// 刪除 /watch 時,觸發監聽事件,watch 失效
err = conn.Delete("/watch", state.Version)
if err != nil {
fmt.Println("err:",err)
return
}
}
結果:
========================
path:
type: EventSession
state: StateConnecting
========================
path:
type: EventSession
state: StateConnected
2022/03/27 11:09:27 connected to 192.168.0.215:2181
========================
path:
type: EventSession
state: StateHasSession
2022/03/27 11:09:27 authenticated: id=360292329056632906, timeout=4000
2022/03/27 11:09:27 re-submitting `0` credentials after reconnect
exists: false
========================
path: /watch
type: EventNodeCreated
state: Unknown
========================
path: /watch
type: EventNodeDeleted
state: Unknown
7.2監聽部分事件
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215","192.168.0.216","192.168.0.217"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
// 註冊一個 watch
exists, _, eventChannel, err := conn.ExistsW("/watch")
if err != nil {
fmt.Println("err:",err)
return
}
go func() {
// 從事件 channel 中取出事件
e := <-eventChannel
fmt.Println("========================")
fmt.Println("path:", e.Path)
fmt.Println("type:", e.Type.String())
fmt.Println("state:", e.State.String())
}()
if !exists {
// 創建 臨時節點/watch 時,觸發監聽事件,watch 失效
_, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {
fmt.Println("err:",err)
return
}
}
}
8.微服務動態上下線監聽(服務註冊/發現)
8.1需求實現
微服務分散式系統中,主節點可以有多台,可以動態上下線,任意一台客戶端都能實時感知到主節點伺服器的上下線。
需求實現:
服務端:服務端啟動時,在zookeeper中創建臨時有序節點,服務關閉時,臨時節點自動刪除了(zookeeper臨時節點機制)
客戶端:監聽節點的變化
8.2服務端創建程式碼-(註冊服務)
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
//創建的有序臨時節點,創建此節點的會話結束後立即清除此節點 create -e -s
ephemeralSequence, err := conn.Create("/servers/bikesvc", []byte("bikesvc"), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
if err != nil {
fmt.Println("err:",err)
return
}
fmt.Println("創建的有序臨時節點:", ephemeralSequence)
time.Sleep(time.Second*10)
}
8.3客戶端監聽程式碼-(服務發現)
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func mirror(conn *zk.Conn, path string) (chan []string, chan error) {
snapshots := make(chan []string)
errors := make(chan error)
go func() {
for {
snapshot, _, events, err := conn.ChildrenW(path)
if err != nil {
errors <- err
return
}
snapshots <- snapshot
evt := <-events
if evt.Err != nil {
errors <- evt.Err
return
}
}
}()
return snapshots, errors
}
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181", "192.168.0.216:2181", "192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:", err)
return
}
defer conn.Close()
snapshots, errors := mirror(conn, "/servers") //監控的根節點,根節點不能刪除
go func() {
for {
select {
case snapshot := <-snapshots:
fmt.Println("監控變化:", snapshot)
case err := <-errors:
fmt.Println("err:", err)
}
}
}()
for {
}
}
結果:
服務端:
創建的有序臨時節點: /servers/bikesvc0000000010
客戶端:
監控變化: []
監控變化: [bikesvc0000000009]
監控變化: []
9.分散式鎖
加鎖進行資源保護
go-zookeeper 添加分散式鎖的方法為NewLock(c *Conn, path string, acl []ACL)。
鎖的結構體為:
type Lock struct {
c *Conn
path string
acl []ACL
lockPath string
seq int
}
這個結構體實現了三個方法:Lock(),LockWithData(data []byte)和Unlock()
9.1分散式鎖案例
根節點「/root」判斷是否存在,不存在則創建
package main
import (
"fmt"
"sync"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
lock := zk.NewLock(conn, "/root/lock", zk.WorldACL(zk.PermAll)) //加鎖
err = lock.LockWithData([]byte("it is a lock"))
if err != nil {
panic(err)
}
fmt.Println("第", n, "個 goroutine 獲取到了鎖")
time.Sleep(time.Second*1) // 1 秒後釋放鎖
lock.Unlock() //解鎖
}(i)
}
wg.Wait()
}
這裡給了兩個進程搶鎖,ls查看一下鎖:
解釋:把所有進程按有序排列,當成節點放入lock節點中,按照最小的序號執行。解鎖一個刪除一個。直到節點為空,進程執行完畢。
[zk: localhost:2181(CONNECTED) 32] ls /root/lock
[_c_1dbbc1ec75b285ef10a6d6154627335c-lock-0000000153, _c_793a837ded040d01608395e5eac65979-lock-0000000152]
先執行152,再執行153
9.2監控鎖案例
監控鎖節點變化
監控程式碼
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func mirror(conn *zk.Conn, path string) (chan []string, chan error) {
snapshots := make(chan []string)
errors := make(chan error)
go func() {
for {
snapshot, _, events, err := conn.ChildrenW(path)
if err != nil {
errors <- err
return
}
snapshots <- snapshot
evt := <-events
if evt.Err != nil {
errors <- evt.Err
return
}
}
}()
return snapshots, errors
}
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181", "192.168.0.216:2181", "192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:", err)
return
}
defer conn.Close()
snapshots, errors := mirror(conn, "/root/lock") //監控的根節點,根節點不能刪除
go func() {
for {
select {
case snapshot := <-snapshots:
fmt.Println("監控變化:", snapshot)
case err := <-errors:
fmt.Println("err:", err)
}
}
}()
for {
}
}
分散式鎖程式碼
package main
import (
"fmt"
"sync"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
lock := zk.NewLock(conn, "/root/lock", zk.WorldACL(zk.PermAll)) //加鎖
err = lock.LockWithData([]byte("it is a lock"))
if err != nil {
panic(err)
}
fmt.Println("第", n, "個 goroutine 獲取到了鎖")
time.Sleep(time.Second*1) // 1 秒後釋放鎖
lock.Unlock() //解鎖
}(i)
}
wg.Wait()
}
結果: