Zookeeper基礎入門

Zookeeper簡介

基本概念

Zookeeper是一個開源的分散式協調服務。其設計目標是將那些複雜的容易出錯的分散式一致性服務封裝起來,以簡單的介面提供給用戶使用。它是一個典型的分散式數據一致性的解決方案,分散式應用程式可以基於它實現如:發布/訂閱、負載均衡、集群管理、分散式鎖、分散式隊列等功能。

名詞概念

  1. 集群角色

Zookeeper集群中有三種角色:Leader、Follower、Observer。Leader提供讀和寫服務。Follower和Observer能提供讀服務。Observer和Follower的區別就在於Ovserver不參與Leader選舉,不參與寫操作的過半成功策略。

因此Observer可以在不影響寫性能的情況下,提升集群的性能。如果沒有Observer的話,一個Leader和N個Follower,如果Follower特別多的話,雖然讀性能提高了,但是寫性能和選舉的性能會受影響。

  1. 會話(session)

客戶端會話,一個客戶端連接是指客戶端和服務端之間的一個TCP長連接。客戶端啟動的時候,首先會和伺服器建立一個TCP長連接,從第一次建立連接開始,會話(session)的生命周期就開始了。通過這個連接,客戶端能夠通過心跳檢測與伺服器保持有效的會話,也能夠向Zookeeper伺服器發送請求,同時還能接受伺服器的Watch事件通知。

  1. 數據節點(Znode)

Zookeeper數據模型中的一個單元,我們稱之為數據節點。Zookeeper將所有數據存儲在記憶體中,數據模型是一棵樹,由斜杠進行分割的路徑,就是一個Znode。如/app。每個Znode都能保存自己的數據內容,還會保存屬性資訊。
4. 版本

每個Znode都有一個叫作Stat的數據結構,Stat里記錄了Znode的三個數據版本,分別是version(當前Znode的版本)、cversion(當前Znode子節點的版本)、aversion(當前Znode的ACL版本)
5. Watcher(事件監聽器)

Watcher是Zookeeper中一個很重要的特性,Zookeeper允許用戶在指定節點上註冊一些Watcher,並且在一些特定事件觸發的時候,Zookeeper服務端會將事件通知到感興趣的客戶端,該機制是Zookeeper實現分散式協調服務的重要特性。
6. ACL

Zookeeper採用ACL策略來進行許可權控制,它定義了五種許可權:

  • CREATE:創建子節點的許可權
  • READ:獲取節點數據和子節點列表的許可權
  • WRITE:更新節點數據的許可權
  • DELETE:刪除子節點的許可權
  • ADMIN:設置子節點ACL的許可權。

注意:CREATE和DELETE這兩種許可權是針對子節點的許可權控制

Zookeeper命令行

  1. 客戶端連接
 ./zkCli.sh   #連接本地的zookeeper伺服器
 ./zkCli.sh -server ip:port   # 連接遠程的伺服器

連接成功之後,系統會輸出Zookeeper的相關環境及配置資訊等資訊

  1. 創建節點
 create [-s] [-e] [-c] [-t ttl] path [data] [acl]
 
 #創建順序節點
 create -s /cc
 
 #創建臨時節點,客戶端會話結束後,節點會自動刪除
 create -e /temp
 
 #創建帶內容的節點
 create /hi nihao
  1. 讀取節點
#ls命令會列出path節點下所有的子節點
ls path

#get命令會查詢到path節點的數據內容,加上-s可以查詢更詳細的資訊
get [-s] path
  1. 更新節點
set [-s] [-v version] path data

#修改/abc節點的內容為hello,加上-s會返回更詳細的資訊
set /abc hello
  1. 刪除節點
#刪除,如果帶上了版本參數,那麼刪除的時候就會校驗版本是否正確,正確才進行刪除
delete [-v version] path

Zookeeper 客戶端API

官方原生API

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.14</version>
</dependency>

很少直接使用了,介面介紹省略

ZkClient API

ZkClient是github上一個開源的zookeeper客戶端,在原生API的基礎上進行了包裝,更加易用。同時還實現了如Session超時重連、Watcher反覆註冊等功能。

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.2</version>
</dependency>

直接上程式碼:


public class ClientDemo {

    private static final String connectStr="192.168.56.115:2181";

    public static void main(String[] args) throws InterruptedException {
//        create();
//        delete();
//        get();
        getData();

    }

    public static void create() throws InterruptedException {
        ZkClient zkClient=new ZkClient(connectStr);
        //第二個參數,true代表遞歸創建節點,沒有父節點先創建父節點
        zkClient.createPersistent("/test/node01",true);
        //持久有序的node
        zkClient.createPersistentSequential("/test/node02","data");
        //臨時node
        zkClient.createEphemeral("/ephemeral");
    }

    public static void delete(){
        ZkClient zkClient=new ZkClient(connectStr);
        //普通刪除
        zkClient.delete("/test");
        //遍歷刪除,先刪除該節點的所有子節點,再刪除它本身
        zkClient.deleteRecursive("/test");
    }


    public static void get() throws InterruptedException {
        ZkClient zkClient=new ZkClient(connectStr);
        List<String> children = zkClient.getChildren("/test");
        System.out.println("子節點:"+children);

        zkClient.subscribeChildChanges("/watch", new IZkChildListener() {
            @Override
            public void handleChildChange(String s, List<String> list) throws Exception {
                System.out.println(s + " child changed,list:" + list);
            }
        });

        zkClient.createPersistent("/watch");
        Thread.sleep(1000);
        zkClient.createPersistent("/watch/test");
        Thread.sleep(1000);
        zkClient.delete("/watch/test");
        Thread.sleep(1000);
        zkClient.delete("/watch");

    }

    //獲取數據
    public static void getData() throws InterruptedException {
        ZkClient zkClient=new ZkClient(connectStr);
        String path="/abc";
        boolean exists = zkClient.exists(path);
        System.out.println("節點是否存在:"+exists);
        if(!exists){
            zkClient.createEphemeral(path,"123");
        }

        //數據改變監聽
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
                System.out.println("節點:"+s+" 數據被改變:"+o);
            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                System.out.println("節點被刪除:"+s);
            }
        });
        //讀取數據
        Object o = zkClient.readData(path);
        System.out.println("讀取節點的數據:"+o);
        Thread.sleep(3000);

        //更新數據
        Stat stat = zkClient.writeData(path, "456");
        Thread.sleep(3000);

        //刪除數據
        zkClient.delete(path);
        Thread.sleep(3000);
    }
}

Curator API

Curator是Netflix公司開源的客戶端框架。它實現了連接重連、Watcher反覆註冊、重試策略和NodeExistsException異常解決等等。

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>

直接上程式碼:


public class CuratorDemo {

    private static final String connectStr="192.168.56.115:2181";
    private static String path ="/abc";


    public static void main(String[] args) throws Exception {
//        connect();
//        create();
//        delete();
//        get();
        update();
    }

    public static void connect(){
        //連接方式1
        /*
        構造器含有三個參數 ExponentialBackoffRetry(int
        baseSleepTimeMs, int maxRetries, int maxSleepMs)
        baseSleepTimeMs:初始的sleep時間,⽤於計算之後的每次重試的sleep時間,
        計算公式:當前sleep時間=baseSleepTimeMs*Math.max(1,
        random.nextInt(1<<(retryCount+1)))
        maxRetries:最⼤重試次數
        maxSleepMs:最⼤sleep時間,如果上述的當前sleep計算出來⽐這個⼤,那麼sleep⽤
        這個時間,默認的最⼤時間是Integer.MAX_VALUE毫秒。
         */
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectStr, 5000, 3000, retryPolicy);
        client.start();

        //連接方式2
        CuratorFramework Client = CuratorFrameworkFactory.builder()
                .connectString("server1:2181,server2:2181,server3:2181")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
    }


    public static void create() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectStr, 5000, 30000, retryPolicy);
        client.start();

        Thread.sleep(2000);
        //創建一個內容為空的節點,curator默認是創建持久節點
//        client.create().forPath(path);
        //創建一個有內容的節點
        client.create().forPath(path,"123".getBytes());
        //調用creatingParentsIfNeeded 介面,Curator 就能夠自動地遞歸創建所有需要的父節點
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/tt");
    }

    public static void delete() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectStr, 5000, 30000, retryPolicy);
        client.start();

        //刪除節點
        client.delete().forPath(path);
        //刪除節點,並遞歸刪除子節點
        client.delete().deletingChildrenIfNeeded().forPath(path);
        //指定版本刪除ls
        client.delete().withVersion(1).forPath(path);
        //強制刪除。只要客戶端會話有效,那麼Curator會在後台持續進⾏刪除操作,直到節點刪除成功。⽐如遇到⼀些⽹
        //絡異常的情況,此guaranteed的強制刪除就會很有效果
        client.delete().guaranteed().forPath(path);
    }


    public static void get() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectStr, 5000, 30000, retryPolicy);
        client.start();

        //普通查詢
        byte[] bytes = client.getData().forPath(path);
        System.out.println("節點內容:"+new String(bytes));
        // 包含狀態查詢
        Stat stat = new Stat();
        byte[] bytes1 = client.getData().storingStatIn(stat).forPath(path);
        System.out.println(stat.getVersion());
    }

    public static void update() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectStr, 5000, 30000, retryPolicy);
        client.start();

        // 普通更新
        client.setData().forPath(path,"新內容".getBytes());
        // 指定版本更新 當攜帶數據版本不⼀致時,無法完成更新操作
        // 異常資訊:Exception in thread "main" org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /abc
        client.setData().withVersion(1).forPath(path,"abcd".getBytes());
    }