zk系列三:zookeeper實戰之分散式鎖實現
一、分散式鎖的通用實現思路
分散式鎖的概念以及常規解決方案可以參考之前的部落格:聊聊分散式鎖的解決方案;今天我們先分析下分散式鎖的實現思路;
- 首先,需要保證唯一性,即某一時點只能有一個執行緒訪問某一資源;比方說待辦簡訊通知功能,每天早上九點簡訊提醒所有工單的處理人處理工單,假設服務部署了20個容器,那麼早上九點的時候會有20個執行緒啟動準備發送簡訊,此時我們只能讓一個執行緒執行簡訊發送,否則用戶會收到20條相同的簡訊;
- 其次,需要考慮下何時應該釋放鎖?這又分三種情況,一是拿到鎖的執行緒正常結束,另一種是獲取鎖的執行緒異常退出,還有種是獲取鎖的執行緒一直阻塞;第一種情況直接釋放即可,第二種情況可以通過定義下鎖的過期時間然後通過定時任務去釋放鎖;zk的話直接通過臨時節點即可;最後一種阻塞的情況也可以通過定時任務來釋放,但是需要根據業務來綜合判斷,如果業務本身就是長時間耗時的操作那麼鎖的過期時間就得設置的久一點
- 最後,當拿到鎖的執行緒釋放鎖的時候,如何通知其他執行緒可以搶鎖了呢
這裡簡單介紹兩種解決方案,一種是所有需要鎖的執行緒主動輪詢,固定時間去訪問下看鎖是否釋放,但是這種方案無端增加伺服器壓力並且時效性無法保證;另一種就是zk的watch,監聽鎖所在的目錄,一有變化立馬得到通知
二、ZK實現分散式鎖的思路
- zk通過每個執行緒在同一父目錄下創建臨時有序節點,然後通過比較節點的id大小來實現分散式鎖功能;再通過zk的watch機制實時獲取節點的狀態,如果被刪除立即重新爭搶鎖;具體流程見線圖:

提示:需要關注下圖裡判斷自身不是最小節點時的監聽情況,為什麼不監聽父節點?原因圖裡已有描述,這裡就不再贅述
三、ZK實現分散式鎖的編碼實現
1、核心工具類實現
通過不斷的調試,我封裝了一個ZkLockHelper類,裡面封裝了上鎖和釋放鎖的方法,為了方便我將zk的一些監聽和回調機智也融合到一起了,並沒有抽出來,下面貼上該類的全部程式碼
package com.darling.service.zookeeper.lock;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.platform.commons.util.StringUtils;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
/**
* @description:
* @author: dll
* @date: Created in 2022/11/4 8:41
* @version:
* @modified By:
*/
@Data
@Slf4j
public class ZkLockHelper implements AsyncCallback.StringCallback, AsyncCallback.StatCallback,Watcher, AsyncCallback.ChildrenCallback {
private final String lockPath = "/lockItem";
ZooKeeper zkClient;
String threadName;
CountDownLatch cd = new CountDownLatch(1);
private String pathName;
/**
* 上鎖
*/
public void tryLock() {
try {
log.info("執行緒:{}正在創建節點",threadName);
zkClient.create(lockPath,(threadName).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"AAA");
log.info("執行緒:{}正在阻塞......",threadName);
// 由於上面是非同步創建所以這裡需要阻塞住當前執行緒
cd.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 釋放鎖
*/
public void unLock() {
try {
zkClient.delete(pathName,-1);
System.out.println(threadName + " 工作結束....");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* create方法的回調,創建成功後在此處獲取/DCSLock的子目錄,比較節點ID是否最小,是則拿到鎖。。。
* @param rc 狀態碼
* @param path create方法的path入參
* @param ctx create方法的上下文入參
* @param name 創建成功的臨時有序節點的名稱,即在path的後面加上了zk維護的自增ID;
* 注意如果創建的不是有序節點,那麼此處的name和path的內容一致
*/
@Override
public void processResult(int rc, String path, Object ctx, String name) {
log.info(">>>>>>>>>>>>>>>>>processResult,rx:{},path:{},ctx:{},name:{}",rc,path,ctx.toString(),name);
if (StringUtils.isNotBlank(name)) {
try {
pathName = name ;
// 此處path需注意要寫/
zkClient.getChildren("/", false,this,"123");
// List<String> children = zkClient.getChildren("/", false);
// log.info(">>>>>threadName:{},children:{}",threadName,children);
// // 給children排序
// Collections.sort(children);
// int i = children.indexOf(pathName.substring(1));
// // 判斷自身是否第一個
// if (Objects.equals(i,0)) {
// // 是第一個則表示搶到了鎖
// log.info("執行緒{}搶到了鎖",threadName);
// cd.countDown();
// }else {
// // 表示沒搶到鎖
// log.info("執行緒{}搶鎖失敗,重新註冊監聽器",threadName);
// zkClient.exists("/"+children.get(i-1),this,this,"AAA");
// }
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* exists方法的回調,此處暫不做處理
* @param rc
* @param path
* @param ctx
* @param stat
*/
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
}
/**
* exists的watch監聽
* @param event
*/
@Override
public void process(WatchedEvent event) {
//如果第一個執行緒鎖釋放了,等價於第一個執行緒刪除了節點,此時只有第二個執行緒會監控的到
switch (event.getType()) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
zkClient.getChildren("/", false,this,"123");
// // 此處path需注意要寫"/"
// List<String> children = null;
// try {
// children = zkClient.getChildren("/", false);
// } catch (KeeperException e) {
// e.printStackTrace();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// log.info(">>>>>threadName:{},children:{}",threadName,children);
// // 給children排序
// Collections.sort(children);
// int i = children.indexOf(pathName.substring(1));
// // 判斷自身是否第一個
// if (Objects.equals(i,0)) {
// // 是第一個則表示搶到了鎖
// log.info("執行緒{}搶到了鎖",threadName);
// cd.countDown();
// }else {
// /**
// * 表示沒搶到鎖;需要判斷前置節點存不存在,其實這裡並不是特別關心前置節點存不存在,所以其回調可以不處理;
// * 但是這裡關注的前置節點的監聽,當前置節點監聽到被刪除時就是其他執行緒搶鎖之時
// */
// zkClient.exists("/"+children.get(i-1),this,this,"AAA");
// }
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
}
}
/**
* getChildren方法的回調
* @param rc
* @param path
* @param ctx
* @param children
*/
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
try {
log.info(">>>>>threadName:{},children:{}", threadName, children);
if (Objects.isNull(children)) {
return;
}
// 給children排序
Collections.sort(children);
int i = children.indexOf(pathName.substring(1));
// 判斷自身是否第一個
if (Objects.equals(i, 0)) {
// 是第一個則表示搶到了鎖
log.info("執行緒{}搶到了鎖", threadName);
cd.countDown();
} else {
// 表示沒搶到鎖
log.info("執行緒{}搶鎖失敗,重新註冊監聽器", threadName);
/**
* 表示沒搶到鎖;需要判斷前置節點存不存在,其實這裡並不是特別關心前置節點存不存在,所以其回調可以不處理;
* 但是這裡關注的前置節點的監聽,當前置節點監聽到被刪除時就是其他執行緒搶鎖之時
*/
zkClient.exists("/" + children.get(i - 1), this, this, "AAA");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
提示:程式碼中注釋的程式碼塊可以關注下,原本是直接阻塞式編程,將獲取所有子節點並釋放鎖的操作直接寫在getChildren方法的回調里,後來發現當節點被刪除時我們還要重新搶鎖,那麼程式碼就冗餘了,於是結合響應式編程的思想,將這段核心程式碼放到
getChildren方法的回調里,這樣程式碼簡潔了並且可以讓業務更只關注於getChildren這件事了
2、測試程式碼編寫
執行緒安全問題復現
package com.darling.service.zookeeper.lock;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
/**
* @description: 開啟是個執行緒給i做遞減操作,未加鎖的情況下會有執行緒安全問題
* @author: dll
* @date: Created in 2022/11/8 8:32
* @version:
* @modified By:
*/
@Slf4j
public class ZkLockTest02 {
private int i = 10;
@Test
public void test() throws InterruptedException {
for (int n = 0; n < 10; n++) {
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
Thread.sleep(100);
incre();
}
}).start();
}
Thread.sleep(5000);
log.info("i = {}",i);
}
/**
* i遞減 執行緒不安全
*/
public void incre(){
// i.incrementAndGet();
log.info("當前執行緒:{},i = {}",Thread.currentThread().getName(),i--);
}
}
- 上面程式碼運行結果如下:

使用上面封裝的ZkLockHelper實現的分散式鎖
package com.darling.service.zookeeper.lock;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @description: 使用zk實現的分散式鎖解決執行緒安全問題
* @author: dll
* @date: Created in 2022/11/8 8:32
* @version:
* @modified By:
*/
@Slf4j
public class ZkLockTest03 {
ZooKeeper zkClient;
@Before
public void conn (){
zkClient = ZkUtil.getZkClient();
}
@After
public void close (){
try {
zkClient.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private int i = 10;
@Test
public void test() throws InterruptedException {
for (int n = 0; n < 10; n++) {
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
Thread.sleep(100);
ZkLockHelper zkHelper = new ZkLockHelper();
// 這裡給zkHelper設置threadName是為了後續調試的時候日誌列印,便於觀察存在的問題
String threadName = Thread.currentThread().getName();
zkHelper.setThreadName(threadName);
zkHelper.setZkClient(zkClient);
// tryLock上鎖
zkHelper.tryLock();
incre();
log.info("執行緒{}正在執行業務程式碼...",threadName);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 釋放鎖
zkHelper.unLock();
}
}).start();
}
while (true) {
}
}
/**
* i遞減 執行緒不安全
*/
public void incre(){
// i.incrementAndGet();
log.info("☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆當前執行緒:{},i = {}",Thread.currentThread().getName(),i--);
}
}
- 運行結果如下:

由於日誌中摻雜著zk的日誌所有此處並未截全,但是也能看到i是在按規律遞減的,不會出現通過執行緒拿到相同值的情況
四、zk實現分散式鎖的優缺點
優點
- 集群部署不存在單點故障問題
- 統一視圖
zk集群每個節點對外提供的數據是一致的,數據一致性有所報障 - 臨時有序節點
zk提供臨時有序節點,這樣當客戶端失去連接時會自動釋放鎖,不用像其他方案一樣當拿到鎖的實例服務不可用時,需要定時任務去刪除鎖;臨時節點的特性就是當客戶端失去連接會自動刪除 - watch能力加持
當獲取不到鎖時,無需客戶端定期輪詢爭搶,只需watch前一節點即可,當有變化時會及時通知,比普通方案即及時又高效;注意這裡最好只watch前一節點,如果watch整個父目錄的話,當客戶端並發較大時會不斷有請求進出zk,給zk性能帶來壓力
缺點
- 與單機版redis比較的話性能肯定較差,但是當客戶端集群足夠龐大且業務量足夠多時肯定還是集群更加穩定
好了,zk實現分散式鎖的編碼實現就到這了,後續有時間再寫偏redis的,其實思路縷清了,編碼實現還是很簡單的





