Zookeeper學習(二)

 一、Znode節點屬性

dataVersion 數據版本, 每次當 Znode 中的數據發生變化的時候, dataVersion
都會自增一下
cversion 節點版本, 每次當 Znode 的節點發生變化的時候, cversion 都會自增
aclVersion ACL(Access Control List) 的版本號, 當 Znode 的權限信息發生
變化的時候aclVersion會自增
zxid 事務ID
ctime 創建時間
mtime 最近一次更新的時間
ephemeralOwner 如果 Znode 為臨時節點, ephemeralOwner 表示與該節點關聯
的 SessionId

我們通過get 節點的目錄,可以得到節點的屬性

 

 

二、watch機制

對zookeeper里的某個節點設置個監聽,可以知道該節點是否進行了「增加」,「刪除」,「修改」

Watcher 的特點
一次性觸發 一個 Watcher 只會被觸發一次, 如果需要繼續監聽, 則需要再次添加
Watcher
事件封裝: Watcher 得到的事件是被封裝過的, 包括三個內容 keeperState,
eventType, path

 

設置監聽機制給/hello節點

 

另外再打開Hadoop101,修改/hello里的數據,看能否監聽到

 

 

 

 證明:可以監聽到hello節點的數據改變了

三、zookeeper的JAVAAPI操作

這裡操作Zookeeper的JavaAPI使用的是一套zookeeper客戶端框架 Curator ,解決了很多
Zookeeper客戶端非常底層的細節開發工作 。
Curator包含了幾個包:
curator-framework:對zookeeper的底層api的一些封裝
curator-recipes:封裝了一些高級特性,如:Cache事件監聽、選舉、分佈式鎖、分佈式
計數器等

創建maven工程,導入jar包

 

 

創建一個測試類,開始進行zookeeper的javaapi編程

節點操作

①創建永久節點

 @Test
    /*
    創建永久節點
     */
    public void createZnode() throws Exception {
        //1.定製一個重試策略
        /*
        parame1:重試間隔時間
        parame2:重試的最大次數
         */
        RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,1);

        //2.獲取一個客戶端對象
        /*
        parame1:要連接的zookeeper服務器列表
        parame2:會話的超時時間
        parame3:連接的超時時間
        param4:重試策略
         */
        String connectionStr="192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181";
        CuratorFramework client=CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
        //3.開啟客戶端
        client.start();
        //4.創建節點
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/hello2","world".getBytes());
        //5.關閉客戶端
        client.close();
    }

運行後,進入linux查看,創建成功

 

 

②創建臨時節點

和創建永久節點的區別是:臨時節點是:EPHEMERAL,而且要讓他休眠幾秒否則Linux看不到,因為是臨時的,會話結束就會消失。

 @Test
    /*
    創建臨時節點
     */
    public void createTmpZnode() throws Exception {
        //1.定製一個重試策略
        /*
        parame1:重試間隔時間
        parame2:重試的最大次數
         */
        RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,1);

        //2.獲取一個客戶端對象
        /*
        parame1:要連接的zookeeper服務器列表
        parame2:會話的超時時間
        parame3:連接的超時時間
        param4:重試策略
         */
        String connectionStr="192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181";
        CuratorFramework client=CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
        //3.開啟客戶端
        client.start();
        //4.創建節點
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/hello4","world".getBytes());
        Thread.sleep(5000);
        //5.關閉客戶端
        client.close();
    }

 

 這樣就看到了,不過等會話結束就沒了

 

③修改節點數據

使用的是:client.setData().forPath(節點路徑,要更新的數據(記得轉換成byte))

/*
    修改節點數據
     */
    @Test
    public void setZnodeData() throws Exception {
        RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,1);

        String connectionStr="192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181";
        CuratorFramework client=CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
        client.start();
        client.setData().forPath("/hello","zookeeper".getBytes());
        client.close();
    }

 

 

④獲取節點數據

用byte數組來獲取client里的數據

然後轉換成String輸出

/*
    獲取節點數據
     */
    @Test
    public void getZnodeData() throws Exception {
        RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,1);

        String connectionStr="192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181";
        CuratorFramework client=CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
        client.start();
        byte[] bytes= client.getData().forPath("/hello");
        System.out.println(new String(bytes));
        client.close();
    }

在打印處輸出:

 

 

⑤設置節點的watch機制

通過監聽hello3節點,對其進行創建,修改,刪除

/*
    節點的watch機制
     */
    @Test
    public void watchZnode() throws Exception {
        RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,1);
        String connectionStr="192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181";
        CuratorFramework client=CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
        client.start();
        TreeCache treeCache=new TreeCache(client,"/hello3");
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                ChildData data= treeCacheEvent.getData();
                if(data!=null){
                    switch (treeCacheEvent.getType()){
                        case NODE_ADDED:
                            System.out.println("監控到有新增節點!");
                            break;
                        case NODE_REMOVED:
                            System.out.println("監控到有節點被移除!");
                            break;
                        case NODE_UPDATED:
                            System.out.println("監控到有節點被更新!");
                            break;
                        default:
                            break;
                    }
                }
            }
        });
        treeCache.start();

        Thread.sleep(100000);

    }

 

 

打印台輸出:

 

 

 

Tags: