zookeeper實現分佈式鎖
在學習zk實現分佈鎖之前,我們應該需要了解一些zk的知識
- 1、持久節點:客戶端斷開連接zk不刪除persistent類型節點
- 2、臨時節點:客戶端斷開連接zk刪除ephemeral類型節點
- 3、順序節點:節點後面會自動生成類似0000001的數字表示順序
- 4、節點變化的通知:客戶端註冊了監聽節點變化的時候,會調用回調方法
一、zk實現的簡單的分佈式鎖
1、zk實現簡單的分佈式鎖的思路,主要是抓住一下三點
(1)當一個客戶端成功創建一個節點,另外一個客戶端是無法創建同名的節點(達到互斥的效果)
(2)我們註冊該節點的監聽時間,當節點刪除,會通知其他的客戶端,這個時候其他的客戶端可以重新去創建該節點(可以認為時拿到鎖的客戶端釋放鎖,其他的客戶端可以搶鎖)
(3)創建的節點應該時臨時節點,這樣保證我們在已經拿到鎖的客戶端掛掉了會自動釋放鎖

導入maven jar

<!--zookeeper-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
</dependency>
<!--客戶端-->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
View Code
抽象模版:
AbstractLock.java

package com.opendev.redssion.zookeeper; import org.I0Itec.zkclient.ZkClient; public abstract class AbstractLock { //zk地址和端口 public static final String ZK_ADDR = "192.168.0.230:2181"; //超時時間 public static final int SESSION_TIMEOUT = 10000; //創建zk protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT); /** * 可以認為是模板模式,兩個子類分別實現它的抽象方法 * 1,簡單的分佈式鎖 * 2,高性能分佈式鎖 */ public void getLock() { String threadName = Thread.currentThread().getName(); if (tryLock()) { System.out.println(threadName+"-獲取鎖成功"); }else { System.out.println(threadName+"-獲取鎖失敗,進行等待..."); waitLock(); //遞歸重新獲取鎖 getLock(); } } public abstract void releaseLock(); public abstract boolean tryLock(); public abstract void waitLock(); }
View Code
SimpleZkLock.java

package com.opendev.redssion.zookeeper; import org.I0Itec.zkclient.IZkDataListener; import java.util.concurrent.CountDownLatch; public class SimpleZkLock extends AbstractLock { private static final String NODE_NAME = "/test_simple_lock"; private CountDownLatch countDownLatch; @Override public void releaseLock() { if (null != zkClient) { //刪除節點 zkClient.delete(NODE_NAME); zkClient.close(); System.out.println(Thread.currentThread().getName() + "-釋放鎖成功"); } } //直接創建臨時節點,如果創建成功,則表示獲取了鎖,創建不成功則處理異常 @Override public boolean tryLock() { if (null == zkClient) { return false; } try { zkClient.createEphemeral(NODE_NAME); return true; } catch (Exception e) { return false; } } @Override public void waitLock() { //監聽器 IZkDataListener iZkDataListener = new IZkDataListener() { //節點被刪除回調 @Override public void handleDataDeleted(String dataPath) throws Exception { if (countDownLatch != null) { countDownLatch.countDown(); } } //節點改變被回調 @Override public void handleDataChange(String dataPath, Object data) throws Exception { // TODO Auto-generated method stub } }; zkClient.subscribeDataChanges(NODE_NAME, iZkDataListener); //如果存在則阻塞 if (zkClient.exists(NODE_NAME)) { countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); System.out.println(Thread.currentThread().getName() + " 等待獲取鎖..."); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //刪除監聽 zkClient.unsubscribeDataChanges(NODE_NAME, iZkDataListener); } }
View Code
LockTest.java

package com.opendev.redssion.zookeeper; public class LockTest { public static void main(String[] args) { //模擬多個10個客戶端 for (int i = 0; i < 10; i++) { Thread thread = new Thread(new LockRunnable()); thread.start(); } } static class LockRunnable implements Runnable { @Override public void run() { AbstractLock zkLock = new SimpleZkLock(); //AbstractLock zkLock = new HighPerformanceZkLock(); zkLock.getLock(); //模擬業務操作 try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } zkLock.releaseLock(); } } }
View Code

二、高性能分佈式鎖
上面使用zk實現的簡單的分佈式鎖,實現比較簡單,但是存在性能問題,從上面的打印的結果可以看出、每一次客戶端釋放鎖的時候,其他的客戶端都會去搶鎖,這就造成了不必要的浪費。那麼如果提升性能呢?
1、思路:客戶端在搶鎖的時候進行排隊,客戶端只要監聽它前一個節點的變化就行,如果前一個節點釋放了鎖,客戶端才去進行搶鎖操作,這個時候我們就需要創建順序節點了
2、圖解
(1)客戶端排隊
![]()
(2)獲取鎖的邏輯

HighPerformanceZkLock .java

package com.opendev.redssion.zookeeper; import org.I0Itec.zkclient.IZkDataListener; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; /** * 高性能分佈式鎖 * * @author hongtaolong */ public class HighPerformanceZkLock extends AbstractLock { private static final String PATH = "/highPerformance_zklock"; //當前節點路徑 private String currentPath; //前一個節點的路徑 private String beforePath; private CountDownLatch countDownLatch = null; public HighPerformanceZkLock() { //如果不存在這個節點,則創建持久節點 if (!zkClient.exists(PATH)) { zkClient.createPersistent(PATH); } } @Override public void releaseLock() { if (null != zkClient) { zkClient.delete(currentPath); zkClient.close(); } } @Override public boolean tryLock() { //如果currentPath為空則為第一次嘗試加鎖,第一次加鎖賦值currentPath if (null == currentPath || "".equals(currentPath)) { //在path下創建一個臨時的順序節點 currentPath = zkClient.createEphemeralSequential(PATH + "/", "lock"); } //獲取所有的臨時節點,並排序 List<String> childrens = zkClient.getChildren(PATH); Collections.sort(childrens); if (currentPath.equals(PATH + "/" + childrens.get(0))) { return true; } else {//如果當前節點不是排名第一,則獲取它前面的節點名稱,並賦值給beforePath int pathLength = PATH.length(); int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength + 1)); beforePath = PATH + "/" + childrens.get(wz - 1); } return false; } @Override public void waitLock() { IZkDataListener lIZkDataListener = new IZkDataListener() { //節點刪除的回調機制 @Override public void handleDataDeleted(String dataPath) throws Exception { if (null != countDownLatch) { countDownLatch.countDown(); } } //節點數據改變的回調機制 @Override public void handleDataChange(String dataPath, Object data) throws Exception { } }; //監聽前一個節點的變化 zkClient.subscribeDataChanges(beforePath, lIZkDataListener); if (zkClient.exists(beforePath)) { countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener); } }
View Code


