ZK(ZooKeeper)分散式鎖實現
- 2021 年 10 月 24 日
- 筆記
- JAVA, zookeeper, ZooKeeper分散式鎖, 消息中間件
點贊再看,養成習慣,微信搜索【牧小農】關注我獲取更多資訊,風裡雨里,小農等你。
本文中案例都會在上傳到git上,請放心瀏覽
git地址://github.com/muxiaonong/ZooKeeper/tree/master/mxnzookeeper
準備
本文會使用到 三台 獨立伺服器,可以自行提前搭建好。
不知道如何搭建的,可以看我之前 ZooKeeper集群 搭建:Zookeeper 集群部署的那些事兒
關於ZooKeeper 一些基礎命令可以看這篇:Zookeeper入門看這篇就夠了
前言
在平時我們對鎖的使用,在針對單個服務,我們可以用 Java 自帶的一些鎖來實現,資源的順序訪問,但是隨著業務的發展,現在基本上公司的服務都是多個,單純的 Lock或者Synchronize 只能解決單個JVM執行緒的問題,那麼針對於單個服務的 Java 的鎖是無法滿足我們業務的需要的,為了解決多個服務跨服務訪問共享資源,於是就有了分布鎖,分散式鎖產生的原因就是集群。
正文
實現分散式鎖的方式有哪些呢?
- 分散式鎖的實現方式主要以(ZooKeeper、Reids、Mysql)這三種為主
今天我們主要講解的是使用 ZooKeeper來實現分散式鎖,ZooKeeper的應用場景主要包含這幾個方面:
- 服務註冊與訂閱(共用節點)
- 分散式通知(監聽ZNode)
- 服務命令(ZNode特性)
- 數據訂閱、發布(Watcher)
- 分散式鎖(臨時節點)
ZooKeeper實現分散式鎖,主要是得益於ZooKeeper 保證了數據的強一致性,鎖的服務可以分為兩大類:
-
保持獨佔
所有試圖來獲取當前鎖的客戶端,最終有且只有一個能夠成功得到當前鎖的鑰匙,通常我們會把 ZooKeeper 上的節點(ZNode)看做一把鎖,通過
create
臨時節點的方式來實現,當多個客戶端都去創建一把鎖的時候,那麼只有成功創建了那個客戶端才能擁有這把鎖 -
控制時序
所有試圖獲取鎖的客戶端,都是被順序執行,只是會有一個序號(zxid),我們會有一個節點,例如:
/testLock
,所有臨時節點都在這個下面去創建,ZK的父節點(/testLock) 維持了一個序號,這個是ZK自帶的屬性,他保證了子節點創建的時序性,從而也形成了每個客戶端的一個 全局時序
ZK鎖機制
在實現ZooKeeper 分散式鎖之前我們有必要了解一下,關於ZooKeeper分散式鎖機制的實現流程和原理,不然各位寶貝,出去面試的時候怎麼和面試官侃侃而談~
臨時順序節點
基於ZooKeeper的臨時順序節點 ,ZooKeeper比較適合來實現分散式鎖:
- 順序發號器:
ZooKeeper
的每一個節點,都是自帶順序生成器:在每個節點下面創建臨時節點,新的子節點後面,會添加一個次序編號,這個生成的編號,會在上一次的編號進行 +1 操作 - 有序遞增:
ZooKeeper
節點有序遞增,可以保證鎖的公平性,我們只需要在一個持久父節點下,創建對應的臨時順序節點,每個執行緒在嘗試佔用鎖之前,會調用watch,判斷自己當前的序號是不是在當前父節點最小,如果是,那麼獲取鎖 - Znode監聽: 每個執行緒在搶佔所之前,會創建屬於當前執行緒的ZNode節點,在釋放鎖的時候,會刪除創建的ZNode,當我們創建的序號不是最小的時候,會等待watch通知,也就是上一個ZNode的狀態通知,當前一個ZNode刪除的時候,會觸發回調機制,告訴下一個ZNode,你可以獲取鎖開始工作了
- 臨時節點自動刪除:
ZooKeeper
還有一個好處,當我們客戶端斷開連接之後,我們出創建的臨時節點會進行自動刪除操作,所以我們在使用分散式鎖的時候,一般都是會去創建臨時節點,這樣可以避免因為網路異常等原因,造成的死鎖。 - 羊群效應:
ZooKeeper
節點的順序訪問性,後面監聽前面的方式,可以有效的避免 羊群效應,什麼是羊群效應:當某一個節點掛掉了,所有的節點都要去監聽,然後做出回應,這樣會給伺服器帶來比較大壓力,如果有了臨時順序節點,當一個節點掛掉了,只有它後面的那一個節點才做出反應。
我們現在看一下下面一張圖:
在上圖中,ZooKeeper
裡面有一把鎖節點 testLock
,這個鎖就是ZooKeeper
的一個節點,當兩個客戶端來獲取這把鎖的時候,會對ZooKeeper
進行加鎖的請求,也就是我們所說的 臨時順序節點。
當我們在 /testLock
目錄下創建了一個順序臨時節點後,ZK會自動對這個臨時節點維護 一個節點序號,並且這個節點是遞增的,比如我們 clientA 創建了一個臨時順序節點,ZK內部會生成一個序號:/lock0000000001
,那麼 clientB 也生成了一個臨時順序節點,ZK會生成一個序號為 /lock0000000002
,在這裡數字都是依次遞增的,從1開始遞增,ZK內部會維護這個順序。
下圖所示:
,是否完成工作了,如果完成了,clientB才可以進行加鎖工作,寶貝,你往下看圖片:
clientA 加鎖成功後,會進行自己的業務處理,當 clientA 處理完工作後,說我完事了,下一個,那麼 clientA 是怎麼完事的呢,他多長時間?不是,具體流程是怎樣的?小農你不對勁,說什麼呢!!!真羞澀
上面我們不是說了,當 clientB 加鎖失敗後,會給前一個節點(clientA)加上一個監聽,當clientA被刪除以後,就表示有人釋放了鎖,這個時候就會通知 clientB重新去獲取鎖。
這個時候clientB重新獲取鎖的時候,發現自己就是當前父節點下面最小的那個,於是clientB就開始加鎖,開始工作等一系列操作,當clientB 完事以後,釋放鎖,也說了一句,下一個。
如下圖所示:
當然除了 clientA、clientB
還有C\D\E等,這字母看著好奇怪又好熟悉,原理都是一樣的,都是最小節點進行解鎖,如果不是,監聽前一個節點是否釋放,如果釋放了,再次嘗試加鎖。如果前一節節點釋放了,自己就是最小了,就排到前面去了,有點類似於 銀行取號 的操作。
程式碼實現
使用ZooKeeper 創建臨時順序節點來實現分散式鎖,大體的流程就是 先創建一個持久父節點,在當前節點下,創建臨時順序節點,找出最小的序列號,獲取分散式鎖,程式業務完成之後釋放鎖,通知下一個節點進行操作,使用的是watch來監控節點的變化,然後依次下一個最小序列節點進行操作。
首先我們需要創建一個持久父類節點:我這裡是 /mxn
WatchCallBack
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @program: mxnzookeeper
* @ClassName WatchCallBack
* @description:
* @author: 微信搜索:牧小農
* @create: 2021-10-23 10:48
* @Version 1.0
**/
public class WatchCallBack implements Watcher, AsyncCallback.StringCallback ,AsyncCallback.Children2Callback ,AsyncCallback.StatCallback {
ZooKeeper zk ;
String threadName;
CountDownLatch cc = new CountDownLatch(1);
String pathName;
public String getPathName() {
return pathName;
}
public void setPathName(String pathName) {
this.pathName = pathName;
}
public String getThreadName() {
return threadName;
}
public void setThreadName(String threadName) {
this.threadName = threadName;
}
public ZooKeeper getZk() {
return zk;
}
public void setZk(ZooKeeper zk) {
this.zk = zk;
}
/** @Author 牧小農
* @Description //TODO 嘗試加鎖方法
* @Date 16:14 2021/10/24
* @Param
* @return
**/
public void tryLock(){
try {
System.out.println(threadName + " 開始創建。。。。");
//創建一個順序臨時節點
zk.create("/lock",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"abc");
//阻塞當前,監聽前一個節點是否釋放鎖
cc.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/** @Author 牧小農
* @Description //TODO 解鎖方法
* @Date 16:14 2021/10/24
* @Param
* @return
**/
public void unLock(){
try {
//釋放鎖,刪除臨時節點
zk.delete(pathName,-1);
//結束工作
System.out.println(threadName + " 結束工作了....");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public void process(WatchedEvent event) {
//如果第一個節點釋放了鎖,那麼第二個就會收到回調
//告訴它前一個節點釋放了,你可以開始嘗試獲取鎖
switch (event.getType()) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
//當前節點重新獲取鎖
zk.getChildren("/",false,this ,"sdf");
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
}
}
@Override
public void processResult(int rc, String path, Object ctx, String name) {
if(name != null ){
System.out.println(threadName +" 執行緒創建了一個節點為 : " + name );
pathName = name ;
//監聽前一個節點
zk.getChildren("/",false,this ,"sdf");
}
}
//getChildren call back
@Override
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
//節點按照編號,升序排列
Collections.sort(children);
//對節點進行截取例如 /lock0000000022 截取後就是 lock0000000022
int i = children.indexOf(pathName.substring(1));
//是不是第一個,也就是說是不是最小的
if(i == 0){
//是第一個
System.out.println(threadName +" 現在我是最小的....");
try {
zk.setData("/",threadName.getBytes(),-1);
cc.countDown();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
//不是第一個
//監聽前一個節點 看它是不是完成了工作進行釋放鎖了
zk.exists("/"+children.get(i-1),this,this,"sdf");
}
}
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
//判斷是否失敗exists
}
}
TestLock
import com.mxn.zookeeper.config.ZKUtils;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @program: mxnzookeeper
* @ClassName TestLock
* @description:
* @author: 微信搜索:牧小農
* @create: 2021-10-23 10:45
* @Version 1.0
**/
public class TestLock {
ZooKeeper zk ;
@Before
public void conn (){
zk = ZKUtils.getZK();
}
@After
public void close (){
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void lock(){
//創建十個執行緒
for (int i = 0; i < 10; i++) {
new Thread(){
@Override
public void run() {
WatchCallBack watchCallBack = new WatchCallBack();
watchCallBack.setZk(zk);
String threadName = Thread.currentThread().getName();
watchCallBack.setThreadName(threadName);
//執行緒進行搶鎖操作
watchCallBack.tryLock();
try {
//進行業務邏輯處理
System.out.println(threadName+" 開始處理業務邏輯了...");
Thread.sleep(200);
}catch (Exception e){
e.printStackTrace();
}
//釋放鎖
watchCallBack.unLock();
}
}.start();
}
while(true){
}
}
}
運行結果:
Thread-1 執行緒創建了一個節點為 : /lock0000000112
Thread-5 執行緒創建了一個節點為 : /lock0000000113
Thread-2 執行緒創建了一個節點為 : /lock0000000114
Thread-6 執行緒創建了一個節點為 : /lock0000000115
Thread-9 執行緒創建了一個節點為 : /lock0000000116
Thread-4 執行緒創建了一個節點為 : /lock0000000117
Thread-7 執行緒創建了一個節點為 : /lock0000000118
Thread-3 執行緒創建了一個節點為 : /lock0000000119
Thread-8 執行緒創建了一個節點為 : /lock0000000120
Thread-0 執行緒創建了一個節點為 : /lock0000000121
Thread-1 現在我是最小的....
Thread-1 開始處理業務邏輯了...
Thread-1 結束工作了....
Thread-5 現在我是最小的....
Thread-5 開始處理業務邏輯了...
Thread-5 結束工作了....
Thread-2 現在我是最小的....
Thread-2 開始處理業務邏輯了...
Thread-2 結束工作了....
Thread-6 現在我是最小的....
Thread-6 開始處理業務邏輯了...
Thread-6 結束工作了....
Thread-9 現在我是最小的....
Thread-9 開始處理業務邏輯了...
Thread-9 結束工作了....
Thread-4 現在我是最小的....
Thread-4 開始處理業務邏輯了...
Thread-4 結束工作了....
Thread-7 現在我是最小的....
Thread-7 開始處理業務邏輯了...
Thread-7 結束工作了....
Thread-3 現在我是最小的....
Thread-3 開始處理業務邏輯了...
Thread-3 結束工作了....
Thread-8 現在我是最小的....
Thread-8 開始處理業務邏輯了...
Thread-8 結束工作了....
Thread-0 現在我是最小的....
Thread-0 開始處理業務邏輯了...
Thread-0 結束工作了....
總結
ZK分散式鎖,能夠有效的解決分散式、不可重入的問題,在上面的案例中我, 沒有實現可重入鎖,但是實現起來也不麻煩,只需要帶上執行緒資訊等唯一標識,判斷一下就可以了
ZK實現分散式鎖具有天然的優勢,臨時順序節點,可以有效的避免死鎖問題,讓客戶端斷開,那麼就會刪除當前臨時節點,讓下一個節點進行工作。
如果文中有錯誤或者不了解的地方,歡迎留言,小農看見了會第一時間回復大家,大家加油
我是牧小農,一個卑微的打工人,如果覺得文中的內容對你有幫助,記得一鍵三連啊,你們的三連是小農最大的動力。
我是牧小農,怕什麼真理無窮,進一步 有進一步的歡喜,大家加油~