ZK(ZooKeeper)分散式鎖實現

點贊再看,養成習慣,微信搜索【牧小農】關注我獲取更多資訊,風裡雨里,小農等你。
本文中案例都會在上傳到git上,請放心瀏覽
git地址://github.com/muxiaonong/ZooKeeper/tree/master/mxnzookeeper

準備

本文會使用到 三台 獨立伺服器,可以自行提前搭建好。

不知道如何搭建的,可以看我之前 ZooKeeper集群 搭建:Zookeeper 集群部署的那些事兒

關於ZooKeeper 一些基礎命令可以看這篇:Zookeeper入門看這篇就夠了

前言

在平時我們對鎖的使用,在針對單個服務,我們可以用 Java 自帶的一些鎖來實現,資源的順序訪問,但是隨著業務的發展,現在基本上公司的服務都是多個,單純的 Lock或者Synchronize 只能解決單個JVM執行緒的問題,那麼針對於單個服務的 Java 的鎖是無法滿足我們業務的需要的,為了解決多個服務跨服務訪問共享資源,於是就有了分布鎖,分散式鎖產生的原因就是集群
在這裡插入圖片描述

正文

實現分散式鎖的方式有哪些呢?

  • 分散式鎖的實現方式主要以(ZooKeeper、Reids、Mysql)這三種為主

今天我們主要講解的是使用 ZooKeeper來實現分散式鎖,ZooKeeper的應用場景主要包含這幾個方面:

  1. 服務註冊與訂閱(共用節點)
  2. 分散式通知(監聽ZNode)
  3. 服務命令(ZNode特性)
  4. 數據訂閱、發布(Watcher)
  5. 分散式鎖(臨時節點)

ZooKeeper實現分散式鎖,主要是得益於ZooKeeper 保證了數據的強一致性,鎖的服務可以分為兩大類:

  • 保持獨佔

    所有試圖來獲取當前鎖的客戶端,最終有且只有一個能夠成功得到當前鎖的鑰匙,通常我們會把 ZooKeeper 上的節點(ZNode)看做一把鎖,通過 create 臨時節點的方式來實現,當多個客戶端都去創建一把鎖的時候,那麼只有成功創建了那個客戶端才能擁有這把鎖

  • 控制時序

    所有試圖獲取鎖的客戶端,都是被順序執行,只是會有一個序號(zxid),我們會有一個節點,例如:/testLock,所有臨時節點都在這個下面去創建,ZK的父節點(/testLock) 維持了一個序號,這個是ZK自帶的屬性,他保證了子節點創建的時序性,從而也形成了每個客戶端的一個 全局時序

ZK鎖機制

在實現ZooKeeper 分散式鎖之前我們有必要了解一下,關於ZooKeeper分散式鎖機制的實現流程和原理,不然各位寶貝,出去面試的時候怎麼和面試官侃侃而談~

臨時順序節點

基於ZooKeeper的臨時順序節點 ,ZooKeeper比較適合來實現分散式鎖:

  • 順序發號器: ZooKeeper的每一個節點,都是自帶順序生成器:在每個節點下面創建臨時節點,新的子節點後面,會添加一個次序編號,這個生成的編號,會在上一次的編號進行 +1 操作
  • 有序遞增: ZooKeeper節點有序遞增,可以保證鎖的公平性,我們只需要在一個持久父節點下,創建對應的臨時順序節點,每個執行緒在嘗試佔用鎖之前,會調用watch,判斷自己當前的序號是不是在當前父節點最小,如果是,那麼獲取鎖
  • Znode監聽: 每個執行緒在搶佔所之前,會創建屬於當前執行緒的ZNode節點,在釋放鎖的時候,會刪除創建的ZNode,當我們創建的序號不是最小的時候,會等待watch通知,也就是上一個ZNode的狀態通知,當前一個ZNode刪除的時候,會觸發回調機制,告訴下一個ZNode,你可以獲取鎖開始工作了
  • 臨時節點自動刪除:ZooKeeper還有一個好處,當我們客戶端斷開連接之後,我們出創建的臨時節點會進行自動刪除操作,所以我們在使用分散式鎖的時候,一般都是會去創建臨時節點,這樣可以避免因為網路異常等原因,造成的死鎖。
  • 羊群效應: ZooKeeper節點的順序訪問性,後面監聽前面的方式,可以有效的避免 羊群效應,什麼是羊群效應:當某一個節點掛掉了,所有的節點都要去監聽,然後做出回應,這樣會給伺服器帶來比較大壓力,如果有了臨時順序節點,當一個節點掛掉了,只有它後面的那一個節點才做出反應。

我們現在看一下下面一張圖:

在這裡插入圖片描述
在上圖中,ZooKeeper裡面有一把鎖節點 testLock,這個鎖就是ZooKeeper的一個節點,當兩個客戶端來獲取這把鎖的時候,會對ZooKeeper進行加鎖的請求,也就是我們所說的 臨時順序節點

當我們在 /testLock目錄下創建了一個順序臨時節點後,ZK會自動對這個臨時節點維護 一個節點序號,並且這個節點是遞增的,比如我們 clientA 創建了一個臨時順序節點,ZK內部會生成一個序號:/lock0000000001,那麼 clientB 也生成了一個臨時順序節點,ZK會生成一個序號為 /lock0000000002,在這裡數字都是依次遞增的,從1開始遞增,ZK內部會維護這個順序。

下圖所示:

![在這裡插入圖片描述](//img-blog.csdnimg.cn/c54606a5bcdd4866b528b0edc9c87a1b.png

這時候,ClientA會進行監聽判斷,在父節點下,我是不是最小的,如果是的話,那麼俺就可以加鎖了,因為我是最小的,其他的都比我大。我自己可以進行加鎖,你已經是一個成熟的臨時節點了,要學會自己加鎖。咳,那麼ZK是怎麼進行判斷的呢?寶貝,您往下看:

在這裡插入圖片描述
這個是cleintA已經加鎖完成了,這個時候clientB也要過來加鎖,那麼他也要在/testLock,創建一個屬於自己的臨時節點,那麼這個時候他的序號就會變成/lock0000000002,如下圖所示:
在這裡插入圖片描述

這個時候就會出現我們前面所講的,clientB 在加鎖的時候會判斷,自己是不是最小的,一看在當前父節點下不是最小的,啊~我還挺大的,還有比我小的!!!

加鎖失敗呀,咳咳,這個時候呢,clientB 就會去偷窺clientA,氣氛逐漸曖昧起來,啊不是,是按照順序去監聽前一個節點(clientA),是否完成工作了,如果完成了,clientB才可以進行加鎖工作,寶貝,你往下看圖片:

在這裡插入圖片描述

clientA 加鎖成功後,會進行自己的業務處理,當 clientA 處理完工作後,說我完事了,下一個,那麼 clientA 是怎麼完事的呢,他多長時間?不是,具體流程是怎樣的?小農你不對勁,說什麼呢!!!真羞澀

上面我們不是說了,當 clientB 加鎖失敗後,會給前一個節點(clientA)加上一個監聽,當clientA被刪除以後,就表示有人釋放了鎖,這個時候就會通知 clientB重新去獲取鎖。

在這裡插入圖片描述

這個時候clientB重新獲取鎖的時候,發現自己就是當前父節點下面最小的那個,於是clientB就開始加鎖,開始工作等一系列操作,當clientB 完事以後,釋放鎖,也說了一句,下一個。

如下圖所示:

在這裡插入圖片描述

當然除了 clientA、clientB還有C\D\E等,這字母看著好奇怪又好熟悉,原理都是一樣的,都是最小節點進行解鎖,如果不是,監聽前一個節點是否釋放,如果釋放了,再次嘗試加鎖。如果前一節節點釋放了,自己就是最小了,就排到前面去了,有點類似於 銀行取號 的操作。

程式碼實現

使用ZooKeeper 創建臨時順序節點來實現分散式鎖,大體的流程就是 先創建一個持久父節點,在當前節點下,創建臨時順序節點,找出最小的序列號,獲取分散式鎖,程式業務完成之後釋放鎖,通知下一個節點進行操作,使用的是watch來監控節點的變化,然後依次下一個最小序列節點進行操作。

首先我們需要創建一個持久父類節點:我這裡是 /mxn
在這裡插入圖片描述

WatchCallBack

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;


/**
 * @program: mxnzookeeper
 * @ClassName WatchCallBack
 * @description:
 * @author: 微信搜索:牧小農
 * @create: 2021-10-23 10:48
 * @Version 1.0
 **/
public class WatchCallBack  implements Watcher, AsyncCallback.StringCallback ,AsyncCallback.Children2Callback ,AsyncCallback.StatCallback {

    ZooKeeper zk ;
    String threadName;
    CountDownLatch cc = new CountDownLatch(1);
    String pathName;

    public String getPathName() {
        return pathName;
    }

    public void setPathName(String pathName) {
        this.pathName = pathName;
    }

    public String getThreadName() {
        return threadName;
    }

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public ZooKeeper getZk() {
        return zk;
    }

    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }

    /** @Author 牧小農
     * @Description //TODO 嘗試加鎖方法
     * @Date 16:14 2021/10/24
     * @Param 
     * @return 
     **/
    public void tryLock(){
        try {

            System.out.println(threadName + " 開始創建。。。。");
            //創建一個順序臨時節點
            zk.create("/lock",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"abc");
            //阻塞當前,監聽前一個節點是否釋放鎖
            cc.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** @Author 牧小農
     * @Description //TODO 解鎖方法
     * @Date 16:14 2021/10/24
     * @Param 
     * @return 
     **/
    public void unLock(){
        try {
            //釋放鎖,刪除臨時節點
            zk.delete(pathName,-1);
            //結束工作
            System.out.println(threadName + "         結束工作了....");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }


    @Override
    public void process(WatchedEvent event) {

        //如果第一個節點釋放了鎖,那麼第二個就會收到回調
        //告訴它前一個節點釋放了,你可以開始嘗試獲取鎖
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                //當前節點重新獲取鎖
                zk.getChildren("/",false,this ,"sdf");
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
        }

    }

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if(name != null ){
            System.out.println(threadName  +" 執行緒創建了一個節點為 : " +  name );
            pathName =  name ;
            //監聽前一個節點
            zk.getChildren("/",false,this ,"sdf");
        }

    }

    //getChildren  call back
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {

        //節點按照編號,升序排列
        Collections.sort(children);
        //對節點進行截取例如  /lock0000000022 截取後就是  lock0000000022
        int i = children.indexOf(pathName.substring(1));


        //是不是第一個,也就是說是不是最小的
        if(i == 0){
            //是第一個
            System.out.println(threadName +" 現在我是最小的....");
            try {
                zk.setData("/",threadName.getBytes(),-1);
                cc.countDown();

            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else{
            //不是第一個
            //監聽前一個節點 看它是不是完成了工作進行釋放鎖了
            zk.exists("/"+children.get(i-1),this,this,"sdf");
        }

    }

    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        //判斷是否失敗exists
    }
}

TestLock

import com.mxn.zookeeper.config.ZKUtils;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;


/**
 * @program: mxnzookeeper
 * @ClassName TestLock
 * @description:
 * @author: 微信搜索:牧小農
 * @create: 2021-10-23 10:45
 * @Version 1.0
 **/
public class TestLock {


    ZooKeeper zk ;

    @Before
    public void conn (){
        zk  = ZKUtils.getZK();
    }

    @After
    public void close (){
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void lock(){

        //創建十個執行緒
        for (int i = 0; i < 10; i++) {
            new Thread(){
                @Override
                public void run() {
                    WatchCallBack watchCallBack = new WatchCallBack();
                    watchCallBack.setZk(zk);
                    String threadName = Thread.currentThread().getName();
                    watchCallBack.setThreadName(threadName);
                    //執行緒進行搶鎖操作
                    watchCallBack.tryLock();
                    try {
                        //進行業務邏輯處理
                        System.out.println(threadName+"         開始處理業務邏輯了...");
                        Thread.sleep(200);
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                    //釋放鎖
                    watchCallBack.unLock();


                }
            }.start();
        }


        while(true){

        }

    }

}

運行結果:

Thread-1 執行緒創建了一個節點為 : /lock0000000112
Thread-5 執行緒創建了一個節點為 : /lock0000000113
Thread-2 執行緒創建了一個節點為 : /lock0000000114
Thread-6 執行緒創建了一個節點為 : /lock0000000115
Thread-9 執行緒創建了一個節點為 : /lock0000000116
Thread-4 執行緒創建了一個節點為 : /lock0000000117
Thread-7 執行緒創建了一個節點為 : /lock0000000118
Thread-3 執行緒創建了一個節點為 : /lock0000000119
Thread-8 執行緒創建了一個節點為 : /lock0000000120
Thread-0 執行緒創建了一個節點為 : /lock0000000121
Thread-1 現在我是最小的....
Thread-1         開始處理業務邏輯了...
Thread-1         結束工作了....
Thread-5 現在我是最小的....
Thread-5         開始處理業務邏輯了...
Thread-5         結束工作了....
Thread-2 現在我是最小的....
Thread-2         開始處理業務邏輯了...
Thread-2         結束工作了....
Thread-6 現在我是最小的....
Thread-6         開始處理業務邏輯了...
Thread-6         結束工作了....
Thread-9 現在我是最小的....
Thread-9         開始處理業務邏輯了...
Thread-9         結束工作了....
Thread-4 現在我是最小的....
Thread-4         開始處理業務邏輯了...
Thread-4         結束工作了....
Thread-7 現在我是最小的....
Thread-7         開始處理業務邏輯了...
Thread-7         結束工作了....
Thread-3 現在我是最小的....
Thread-3         開始處理業務邏輯了...
Thread-3         結束工作了....
Thread-8 現在我是最小的....
Thread-8         開始處理業務邏輯了...
Thread-8         結束工作了....
Thread-0 現在我是最小的....
Thread-0         開始處理業務邏輯了...
Thread-0         結束工作了....

總結

ZK分散式鎖,能夠有效的解決分散式、不可重入的問題,在上面的案例中我, 沒有實現可重入鎖,但是實現起來也不麻煩,只需要帶上執行緒資訊等唯一標識,判斷一下就可以了

ZK實現分散式鎖具有天然的優勢,臨時順序節點,可以有效的避免死鎖問題,讓客戶端斷開,那麼就會刪除當前臨時節點,讓下一個節點進行工作。

如果文中有錯誤或者不了解的地方,歡迎留言,小農看見了會第一時間回復大家,大家加油

我是牧小農,一個卑微的打工人,如果覺得文中的內容對你有幫助,記得一鍵三連啊,你們的三連是小農最大的動力。

我是牧小農,怕什麼真理無窮,進一步 有進一步的歡喜,大家加油~