求你了,別再問我Zookeeper如何實現分散式鎖了!!!
- 2020 年 4 月 9 日
- 筆記
-
真是有人(
鎖
)的地方就有江湖(事務
),今天不談江湖,來撩撩人。 -
分散式鎖的概念、為什麼使用分散式鎖,想必大家已經很清楚了。前段時間作者寫過Redis是如何實現分散式鎖,今天這篇文章來談談Zookeeper是如何實現分散式鎖的。
-
陳某今天分別從如下幾個方面來詳細講講ZK如何實現分散式鎖:
-
ZK的四種節點
-
排它鎖的實現
-
讀寫鎖的實現
-
Curator實現分步式鎖
-
ZK的四種節點
-
持久性節點:節點創建後將會一直存在
-
臨時節點:臨時節點的生命周期和當前會話綁定,一旦當前會話斷開臨時節點也會刪除,當然可以主動刪除。
-
持久有序節點:節點創建一直存在,並且zk會自動為節點加上一個自增的後綴作為新的節點名稱。
-
臨時有序節點:保留臨時節點的特性,並且zk會自動為節點加上一個自增的後綴作為新的節點名稱。
排它鎖的實現
-
排他鎖的實現相對簡單一點,利用了zk的創建節點不能重名的特性。如下圖:
-
根據上圖分析大致分為如下步驟:
-
嘗試獲取鎖:創建
臨時節點
,zk會保證只有一個客戶端創建成功。 -
創建臨時節點成功,獲取鎖成功,執行業務邏輯,業務執行完成後刪除鎖。
-
創建臨時節點失敗,阻塞等待。
-
監聽刪除事件,一旦臨時節點刪除了,表示互斥操作完成了,可以再次嘗試獲取鎖。
-
遞歸:獲取鎖的過程是一個遞歸的操作,
獲取鎖->監聽->獲取鎖
。
-
-
如何避免死鎖:創建的是臨時節點,當服務宕機會話關閉後臨時節點將會被刪除,鎖自動釋放。
程式碼實現
-
作者參照JDK鎖的實現方式加上模板方法模式的封裝,封裝介面如下:
/** * @Description ZK分散式鎖的介面 * @Author 陳某 * @Date 2020/4/7 22:52 */ public interface ZKLock { /** * 獲取鎖 */ void lock() throws Exception; /** * 解鎖 */ void unlock() throws Exception; }
-
模板抽象類如下:
/** * @Description 排他鎖,模板類 * @Author 陳某 * @Date 2020/4/7 22:55 */ public abstract class AbstractZKLockMutex implements ZKLock { /** * 節點路徑 */ protected String lockPath; /** * zk客戶端 */ protected CuratorFramework zkClient; private AbstractZKLockMutex(){} public AbstractZKLockMutex(String lockPath,CuratorFramework client){ this.lockPath=lockPath; this.zkClient=client; } /** * 模板方法,搭建的獲取鎖的框架,具體邏輯交於子類實現 * @throws Exception */ @Override public final void lock() throws Exception { //獲取鎖成功 if (tryLock()){ System.out.println(Thread.currentThread().getName()+"獲取鎖成功"); }else{ //獲取鎖失敗 //阻塞一直等待 waitLock(); //遞歸,再次獲取鎖 lock(); } } /** * 嘗試獲取鎖,子類實現 */ protected abstract boolean tryLock() ; /** * 等待獲取鎖,子類實現 */ protected abstract void waitLock() throws Exception; /** * 解鎖:刪除節點或者直接斷開連接 */ @Override public abstract void unlock() throws Exception; }
-
排他鎖的具體實現類如下:
/** * @Description 排他鎖的實現類,繼承模板類 AbstractZKLockMutex * @Author 陳某 * @Date 2020/4/7 23:23 */ @Data public class ZKLockMutex extends AbstractZKLockMutex { /** * 用於實現執行緒阻塞 */ private CountDownLatch countDownLatch; public ZKLockMutex(String lockPath,CuratorFramework zkClient){ super(lockPath,zkClient); } /** * 嘗試獲取鎖:直接創建一個臨時節點,如果這個節點存在創建失敗拋出異常,表示已經互斥了, * 反之創建成功 * @throws Exception */ @Override protected boolean tryLock() { try { zkClient.create() //臨時節點 .withMode(CreateMode.EPHEMERAL) //許可權列表 world:anyone:crdwa .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(lockPath,"lock".getBytes()); return true; }catch (Exception ex){ return false; } } /** * 等待鎖,一直阻塞監聽 * @return 成功獲取鎖返回true,反之返回false */ @Override protected void waitLock() throws Exception { //監聽節點的新增、更新、刪除 final NodeCache nodeCache = new NodeCache(zkClient, lockPath); //啟動監聽 nodeCache.start(); ListenerContainer<NodeCacheListener> listenable = nodeCache.getListenable(); //監聽器 NodeCacheListener listener=()-> { //節點被刪除,此時獲取鎖 if (nodeCache.getCurrentData() == null) { //countDownLatch不為null,表示節點存在,此時監聽到節點刪除了,因此-1 if (countDownLatch != null) countDownLatch.countDown(); } }; //添加監聽器 listenable.addListener(listener); //判斷節點是否存在 Stat stat = zkClient.checkExists().forPath(lockPath); //節點存在 if (stat!=null){ countDownLatch=new CountDownLatch(1); //阻塞主執行緒,監聽 countDownLatch.await(); } //移除監聽器 listenable.removeListener(listener); } /** * 解鎖,直接刪除節點 * @throws Exception */ @Override public void unlock() throws Exception { zkClient.delete().forPath(lockPath); } }
可重入性排他鎖如何設計
-
可重入的邏輯很簡單,在本地保存一個
ConcurrentMap
,key
是當前執行緒,value
是定義的數據,結構如下:
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
-
重入的偽程式碼如下:
public boolean tryLock(){ //判斷當前執行緒是否在threadData保存過 //存在,直接return true //不存在執行獲取鎖的邏輯 //獲取成功保存在threadData中 }
讀寫鎖的實現
-
讀寫鎖分為讀鎖和寫鎖,區別如下:
-
讀鎖允許多個執行緒同時讀數據,但是在讀的同時不允許寫執行緒修改。
-
寫鎖在獲取後,不允許多個執行緒同時寫或者讀。
-
-
如何實現讀寫鎖?ZK中有一類節點叫臨時有序節點,上文有介紹。下面我們來利用臨時有序節點來實現讀寫鎖的功能。
讀鎖的設計
-
讀鎖允許多個執行緒同時進行讀,並且在讀的同時不允許執行緒進行寫操作,實現原理如下圖:
-
根據上圖,獲取一個讀鎖分為以下步驟:
-
創建臨時有序節點(當前執行緒擁有的
讀鎖
或稱作讀節點
)。 -
獲取路徑下所有的子節點,並進行
從小到大
排序 -
獲取當前節點前的臨近寫節點(寫鎖)。
-
如果不存在的臨近寫節點,則成功獲取讀鎖。
-
如果存在臨近寫節點,對其監聽刪除事件。
-
一旦監聽到刪除事件,重複2,3,4,5的步驟(遞歸)。
-
寫鎖的設計
-
執行緒一旦獲取了寫鎖,不允許其他執行緒讀和寫。實現原理如下:
-
從上圖可以看出唯一和寫鎖不同的就是監聽的節點,這裡是監聽臨近節點(讀節點或者寫節點),讀鎖只需要監聽寫節點,步驟如下:
-
創建臨時有序節點(當前執行緒擁有的
寫鎖
或稱作寫節點
)。 -
獲取路徑下的所有子節點,並進行
從小到大
排序。 -
獲取當前節點的臨近節點(讀節點和寫節點)。
-
如果不存在臨近節點,則成功獲取鎖。
-
如果存在臨近節點,對其進行監聽刪除事件。
-
一旦監聽到刪除事件,重複2,3,4,5的步驟(遞歸)。
-
如何監聽
-
無論是寫鎖還是讀鎖都需要監聽前面的節點,不同的是讀鎖只監聽臨近的寫節點,寫鎖是監聽臨近的所有節點,抽象出來看其實是一種鏈式的監聽,如下圖:
-
每一個節點都在監聽前面的臨近節點,一旦前面一個節點刪除了,再從新排序後監聽前面的節點,這樣遞歸下去。
程式碼實現
-
作者簡單的寫了讀寫鎖的實現,先造出來再優化,不建議用在生產環境。程式碼如下:
public class ZKLockRW { /** * 節點路徑 */ protected String lockPath; /** * zk客戶端 */ protected CuratorFramework zkClient; /** * 用於阻塞執行緒 */ private CountDownLatch countDownLatch=new CountDownLatch(1); private final static String WRITE_NAME="_W_LOCK"; private final static String READ_NAME="_R_LOCK"; public ZKLockRW(String lockPath, CuratorFramework client) { this.lockPath=lockPath; this.zkClient=client; } /** * 獲取鎖,如果獲取失敗一直阻塞 * @throws Exception */ public void lock() throws Exception { //創建節點 String node = createNode(); //阻塞等待獲取鎖 tryLock(node); countDownLatch.await(); } /** * 創建臨時有序節點 * @return * @throws Exception */ private String createNode() throws Exception { //創建臨時有序節點 return zkClient.create() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(lockPath); } /** * 獲取寫鎖 * @return */ public ZKLockRW writeLock(){ return new ZKLockRW(lockPath+WRITE_NAME,zkClient); } /** * 獲取讀鎖 * @return */ public ZKLockRW readLock(){ return new ZKLockRW(lockPath+READ_NAME,zkClient); } private void tryLock(String nodePath) throws Exception { //獲取所有的子節點 List<String> childPaths = zkClient.getChildren() .forPath("/") .stream().sorted().map(o->"/"+o).collect(Collectors.toList()); //第一個節點就是當前的鎖,直接獲取鎖。遞歸結束的條件 if (nodePath.equals(childPaths.get(0))){ countDownLatch.countDown(); return; } //1. 讀鎖:監聽最前面的寫鎖,寫鎖釋放了,自然能夠讀了 if (nodePath.contains(READ_NAME)){ //查找臨近的寫鎖 String preNode = getNearWriteNode(childPaths, childPaths.indexOf(nodePath)); if (preNode==null){ countDownLatch.countDown(); return; } NodeCache nodeCache=new NodeCache(zkClient,preNode); nodeCache.start(); ListenerContainer<NodeCacheListener> listenable = nodeCache.getListenable(); listenable.addListener(() -> { //節點刪除事件 if (nodeCache.getCurrentData()==null){ //繼續監聽前一個節點 String nearWriteNode = getNearWriteNode(childPaths, childPaths.indexOf(preNode)); if (nearWriteNode==null){ countDownLatch.countDown(); return; } tryLock(nearWriteNode); } }); } //如果是寫鎖,前面無論是什麼鎖都不能讀,直接循環監聽上一個節點即可,直到前面無鎖 if (nodePath.contains(WRITE_NAME)){ String preNode = childPaths.get(childPaths.indexOf(nodePath) - 1); NodeCache nodeCache=new NodeCache(zkClient,preNode); nodeCache.start(); ListenerContainer<NodeCacheListener> listenable = nodeCache.getListenable(); listenable.addListener(() -> { //節點刪除事件 if (nodeCache.getCurrentData()==null){ //繼續監聽前一個節點 tryLock(childPaths.get(childPaths.indexOf(preNode) - 1<0?0:childPaths.indexOf(preNode) - 1)); } }); } } /** * 查找臨近的寫節點 * @param childPath 全部的子節點 * @param index 右邊界 * @return */ private String getNearWriteNode(List<String> childPath,Integer index){ for (int i = 0; i < index; i++) { String node = childPath.get(i); if (node.contains(WRITE_NAME)) return node; } return null; } }
Curator實現分步式鎖
-
Curator是Netflix公司開源的一個Zookeeper客戶端,與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量。
-
Curator在分散式鎖方面已經為我們封裝好了,大致實現的思路就是按照作者上述的思路實現的。中小型互聯網公司還是建議直接使用框架封裝好的,畢竟穩定,有些大型的互聯公司都是手寫的,牛逼啊。
-
創建一個排他鎖很簡單,如下:
//arg1:CuratorFramework連接對象,arg2:節點路徑 lock=new InterProcessMutex(client,path); //獲取鎖 lock.acquire(); //釋放鎖 lock.release();
-
更多的API請參照官方文檔,不是此篇文章重點。
-
至此ZK實現分散式鎖就介紹完了,如有想要源碼的朋友,老規矩,關注微信公眾號【碼猿技術專欄】,回復關鍵詞
分散式鎖
獲取。
一點小福利
-
對於Zookeeper不太熟悉的朋友,陳某特地花費兩天時間總結了ZK的常用知識點,包括ZK常用shell命令、ZK許可權控制、Curator的基本操作API。目錄如下:
-
-
需要上面PDF文件的朋友,老規矩,關注微信公眾號【碼猿技術專欄】回復關鍵詞
ZK總結
。