Zookeeper學習(二)
一、Znode節點屬性
dataVersion 數據版本, 每次當 Znode 中的數據發生變化的時候, dataVersion
都會自增一下
cversion 節點版本, 每次當 Znode 的節點發生變化的時候, cversion 都會自增
aclVersion ACL(Access Control List) 的版本號, 當 Znode 的權限信息發生
變化的時候aclVersion會自增
zxid 事務ID
ctime 創建時間
mtime 最近一次更新的時間
ephemeralOwner 如果 Znode 為臨時節點, ephemeralOwner 表示與該節點關聯
的 SessionId
我們通過get 節點的目錄,可以得到節點的屬性
二、watch機制
對zookeeper里的某個節點設置個監聽,可以知道該節點是否進行了「增加」,「刪除」,「修改」
Watcher 的特點
一次性觸發 一個 Watcher 只會被觸發一次, 如果需要繼續監聽, 則需要再次添加
Watcher
事件封裝: Watcher 得到的事件是被封裝過的, 包括三個內容 keeperState,
eventType, path
設置監聽機制給/hello節點
另外再打開Hadoop101,修改/hello里的數據,看能否監聽到
證明:可以監聽到hello節點的數據改變了
三、zookeeper的JAVAAPI操作
這裡操作Zookeeper的JavaAPI使用的是一套zookeeper客戶端框架 Curator ,解決了很多
Zookeeper客戶端非常底層的細節開發工作 。
Curator包含了幾個包:
curator-framework:對zookeeper的底層api的一些封裝
curator-recipes:封裝了一些高級特性,如:Cache事件監聽、選舉、分佈式鎖、分佈式
計數器等
創建maven工程,導入jar包
創建一個測試類,開始進行zookeeper的javaapi編程
節點操作
①創建永久節點
@Test
/*
創建永久節點
*/
public void createZnode() throws Exception {
//1.定製一個重試策略
/*
parame1:重試間隔時間
parame2:重試的最大次數
*/
RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,1);
//2.獲取一個客戶端對象
/*
parame1:要連接的zookeeper服務器列表
parame2:會話的超時時間
parame3:連接的超時時間
param4:重試策略
*/
String connectionStr="192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181";
CuratorFramework client=CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
//3.開啟客戶端
client.start();
//4.創建節點
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/hello2","world".getBytes());
//5.關閉客戶端
client.close();
}
運行後,進入linux查看,創建成功
②創建臨時節點
和創建永久節點的區別是:臨時節點是:EPHEMERAL,而且要讓他休眠幾秒否則Linux看不到,因為是臨時的,會話結束就會消失。
@Test
/*
創建臨時節點
*/
public void createTmpZnode() throws Exception {
//1.定製一個重試策略
/*
parame1:重試間隔時間
parame2:重試的最大次數
*/
RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,1);
//2.獲取一個客戶端對象
/*
parame1:要連接的zookeeper服務器列表
parame2:會話的超時時間
parame3:連接的超時時間
param4:重試策略
*/
String connectionStr="192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181";
CuratorFramework client=CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
//3.開啟客戶端
client.start();
//4.創建節點
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/hello4","world".getBytes());
Thread.sleep(5000);
//5.關閉客戶端
client.close();
}
這樣就看到了,不過等會話結束就沒了
③修改節點數據
使用的是:client.setData().forPath(節點路徑,要更新的數據(記得轉換成byte))
/*
修改節點數據
*/
@Test
public void setZnodeData() throws Exception {
RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,1);
String connectionStr="192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181";
CuratorFramework client=CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
client.start();
client.setData().forPath("/hello","zookeeper".getBytes());
client.close();
}
④獲取節點數據
用byte數組來獲取client里的數據
然後轉換成String輸出
/*
獲取節點數據
*/
@Test
public void getZnodeData() throws Exception {
RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,1);
String connectionStr="192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181";
CuratorFramework client=CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
client.start();
byte[] bytes= client.getData().forPath("/hello");
System.out.println(new String(bytes));
client.close();
}
在打印處輸出:
⑤設置節點的watch機制
通過監聽hello3節點,對其進行創建,修改,刪除
/*
節點的watch機制
*/
@Test
public void watchZnode() throws Exception {
RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,1);
String connectionStr="192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181";
CuratorFramework client=CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
client.start();
TreeCache treeCache=new TreeCache(client,"/hello3");
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
ChildData data= treeCacheEvent.getData();
if(data!=null){
switch (treeCacheEvent.getType()){
case NODE_ADDED:
System.out.println("監控到有新增節點!");
break;
case NODE_REMOVED:
System.out.println("監控到有節點被移除!");
break;
case NODE_UPDATED:
System.out.println("監控到有節點被更新!");
break;
default:
break;
}
}
}
});
treeCache.start();
Thread.sleep(100000);
}
打印台輸出: