zookeeper實現分佈式鎖

在學習zk實現分佈鎖之前,我們應該需要了解一些zk的知識

  • 1、持久節點:客戶端斷開連接zk不刪除persistent類型節點
  • 2、臨時節點:客戶端斷開連接zk刪除ephemeral類型節點
  • 3、順序節點:節點後面會自動生成類似0000001的數字表示順序
  • 4、節點變化的通知:客戶端註冊了監聽節點變化的時候,會調用回調方法

 

一、zk實現的簡單的分佈式鎖

1、zk實現簡單的分佈式鎖的思路,主要是抓住一下三點

(1)當一個客戶端成功創建一個節點,另外一個客戶端是無法創建同名的節點(達到互斥的效果)

(2)我們註冊該節點的監聽時間,當節點刪除,會通知其他的客戶端,這個時候其他的客戶端可以重新去創建該節點(可以認為時拿到鎖的客戶端釋放鎖,其他的客戶端可以搶鎖)

(3)創建的節點應該時臨時節點,這樣保證我們在已經拿到鎖的客戶端掛掉了會自動釋放鎖

       

導入maven jar

        <!--zookeeper-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.12</version>
        </dependency>
        <!--客戶端-->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>

View Code

 

抽象模版:

AbstractLock.java

package com.opendev.redssion.zookeeper;

import org.I0Itec.zkclient.ZkClient;

public abstract class AbstractLock {

    //zk地址和端口
    public static final String ZK_ADDR = "192.168.0.230:2181";
    //超時時間
    public static final int SESSION_TIMEOUT = 10000;
    //創建zk
    protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT);


    /**
     * 可以認為是模板模式,兩個子類分別實現它的抽象方法
     * 1,簡單的分佈式鎖
     * 2,高性能分佈式鎖
     */
    public void getLock() {
        String threadName = Thread.currentThread().getName();
        if (tryLock()) {
            System.out.println(threadName+"-獲取鎖成功");
        }else {
            System.out.println(threadName+"-獲取鎖失敗,進行等待...");
            waitLock();
            //遞歸重新獲取鎖
            getLock();
        }
    }

    public abstract void releaseLock();

    public abstract boolean tryLock();

    public abstract void waitLock();

}

View Code

SimpleZkLock.java

package com.opendev.redssion.zookeeper;

import org.I0Itec.zkclient.IZkDataListener;

import java.util.concurrent.CountDownLatch;

public class SimpleZkLock extends AbstractLock {

    private static final String NODE_NAME = "/test_simple_lock";

    private CountDownLatch countDownLatch;

    @Override
    public void releaseLock() {
        if (null != zkClient) {
            //刪除節點
            zkClient.delete(NODE_NAME);
            zkClient.close();
            System.out.println(Thread.currentThread().getName() + "-釋放鎖成功");
        }
    }

    //直接創建臨時節點,如果創建成功,則表示獲取了鎖,創建不成功則處理異常
    @Override
    public boolean tryLock() {
        if (null == zkClient) {
            return false;
        }
        try {
            zkClient.createEphemeral(NODE_NAME);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    public void waitLock() {
        //監聽器
        IZkDataListener iZkDataListener = new IZkDataListener() {
            //節點被刪除回調
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
            }

            //節點改變被回調
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                // TODO Auto-generated method stub

            }
        };
        zkClient.subscribeDataChanges(NODE_NAME, iZkDataListener);
        //如果存在則阻塞
        if (zkClient.exists(NODE_NAME)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() + " 等待獲取鎖...");
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        //刪除監聽
        zkClient.unsubscribeDataChanges(NODE_NAME, iZkDataListener);
    }
}

View Code

 

LockTest.java

package com.opendev.redssion.zookeeper;

public class LockTest {
    public static void main(String[] args) {
        //模擬多個10個客戶端
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(new LockRunnable());
            thread.start();
        }

    }

    static class LockRunnable implements Runnable {

        @Override
        public void run() {
            AbstractLock zkLock = new SimpleZkLock();
            //AbstractLock zkLock = new HighPerformanceZkLock();
            zkLock.getLock();
            //模擬業務操作
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            zkLock.releaseLock();
        }

    }
}

View Code

 

 

二、高性能分佈式鎖

上面使用zk實現的簡單的分佈式鎖,實現比較簡單,但是存在性能問題,從上面的打印的結果可以看出、每一次客戶端釋放鎖的時候,其他的客戶端都會去搶鎖,這就造成了不必要的浪費。那麼如果提升性能呢?

1、思路:客戶端在搶鎖的時候進行排隊,客戶端只要監聽它前一個節點的變化就行,如果前一個節點釋放了鎖,客戶端才去進行搶鎖操作,這個時候我們就需要創建順序節點了

2、圖解

(1)客戶端排隊

       

 

 (2)獲取鎖的邏輯

 

 HighPerformanceZkLock .java

package com.opendev.redssion.zookeeper;

import org.I0Itec.zkclient.IZkDataListener;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * 高性能分佈式鎖
 *
 * @author hongtaolong
 */
public class HighPerformanceZkLock extends AbstractLock {

    private static final String PATH = "/highPerformance_zklock";
    //當前節點路徑
    private String currentPath;
    //前一個節點的路徑
    private String beforePath;

    private CountDownLatch countDownLatch = null;

    public HighPerformanceZkLock() {
        //如果不存在這個節點,則創建持久節點
        if (!zkClient.exists(PATH)) {
            zkClient.createPersistent(PATH);
        }
    }

    @Override
    public void releaseLock() {
        if (null != zkClient) {
            zkClient.delete(currentPath);
            zkClient.close();
        }

    }

    @Override
    public boolean tryLock() {
        //如果currentPath為空則為第一次嘗試加鎖,第一次加鎖賦值currentPath
        if (null == currentPath || "".equals(currentPath)) {
            //在path下創建一個臨時的順序節點
            currentPath = zkClient.createEphemeralSequential(PATH + "/", "lock");
        }
        //獲取所有的臨時節點,並排序
        List<String> childrens = zkClient.getChildren(PATH);
        Collections.sort(childrens);
        if (currentPath.equals(PATH + "/" + childrens.get(0))) {
            return true;
        } else {//如果當前節點不是排名第一,則獲取它前面的節點名稱,並賦值給beforePath
            int pathLength = PATH.length();
            int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength + 1));
            beforePath = PATH + "/" + childrens.get(wz - 1);
        }
        return false;
    }

    @Override
    public void waitLock() {
        IZkDataListener lIZkDataListener = new IZkDataListener() {

            //節點刪除的回調機制
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                if (null != countDownLatch) {
                    countDownLatch.countDown();
                }
            }

            //節點數據改變的回調機制
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        };
        //監聽前一個節點的變化
        zkClient.subscribeDataChanges(beforePath, lIZkDataListener);
        if (zkClient.exists(beforePath)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener);
    }
}

View Code