【分散式鎖】07-Zookeeper實現分散式鎖:Semaphore、讀寫鎖實現原理
- 2020 年 3 月 31 日
- 筆記
前言
前面已經講解了Zookeeper可重入鎖的實現原理,自己對分散式鎖也有了更深的認知。
我在公眾號中發了一個疑問,相比於Redis來說,Zookeeper的實現方式要更好一些,即便Redis作者實現了RedLock演算法來解決Redis集群模式下分散式鎖的弊端,但Redis實現的分散式鎖仍然不是那麼完美。
比如有5台Redis集群,按照n/2 + 1代表獲取鎖成功,如果客戶端A此時獲取鎖,Redis集群(1,2,3)返回成功,客戶端A獲取鎖成功。
此時Redis 1 master宕機,切換到slave,而slave並未來得及同步客戶端A加鎖成功的資訊到slave。
客戶端B獲取鎖,Redis集群(1,4,5)返回成功,客戶端B仍然可以成功獲取鎖。
即使如此,為何在實際生產項目中分散式鎖大多還是由Redis來完成?
這一點我仍然有些疑惑,我接觸過的公司和項目都普遍用Redis來實現分散式鎖。
這裡就不再糾結了,接著繼續學習Zookeeper剩下幾個實現分散式鎖的組件吧。
Semaphore實現原理
前面已經講過Redisson中Semaphore的實現原理(【分散式鎖】05-使用Redisson中Semaphore和CountDownLatch原理),現在學習下ZK中Semaphore是如何實現的
Semaphore 使用案例
使用示例很簡單,Curator官網上有對應程式碼,使用InterProcessSemaphoreV2
類即可,程式碼如下:
/** * Zookeeper分散式鎖測試程式碼 * * @author wangmeng * @date 2020/03/30 18:59 */ public class Application { /** Zookeeper info */ private static final String ZK_ADDRESS = "YourZkIP:2181"; private static final String ZK_LOCK_PATH = "/locks/lock_01"; private static final String ZK_SEMAPHORE_LOCK_PATH = "/semaphore/semaphore_01"; public static void main(String[] args) throws InterruptedException { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); Thread t1 = new Thread(() -> { testSemaphore(client); }, "t1"); Thread t2 = new Thread(() -> { testSemaphore(client); }, "t2"); Thread t3 = new Thread(() -> { testSemaphore(client); }, "t3"); t1.start(); t2.start(); t3.start(); } /** * 測試Semaphore */ private static void testSemaphore(CuratorFramework client) { InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, ZK_SEMAPHORE_LOCK_PATH, 2); try { Lease lease = semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " hold lock"); Thread.sleep(5000L); semaphore.returnLease(lease); System.out.println(Thread.currentThread().getName() + " release lock"); } catch (Exception e) { e.printStackTrace(); } } }
列印結果為:
image.png
因為設置的只允許最多2個客戶端同時獲取鎖。
從效果上看t3和t2同時獲取到了鎖,接著t3釋放了鎖後t1才獲取鎖。
Semaphore加鎖源碼解析
源碼面前出真知,我們直接看下源碼:
public class InterProcessSemaphoreV2 { private static final String LOCK_PARENT = "locks"; private static final String LEASE_PARENT = "leases"; private static final String LEASE_BASE_NAME = "lease-"; public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception { long startMs = System.currentTimeMillis(); boolean hasWait = (unit != null); long waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0; Preconditions.checkArgument(qty > 0, "qty cannot be 0"); ImmutableList.Builder<Lease> builder = ImmutableList.builder(); boolean success = false; try { while ( qty-- > 0 ) { int retryCount = 0; long startMillis = System.currentTimeMillis(); boolean isDone = false; while ( !isDone ) { switch ( internalAcquire1Lease(builder, startMs, hasWait, waitMs) ) { case CONTINUE: { isDone = true; break; } // 省略其他分支邏輯 } } } success = true; } finally { if ( !success ) { returnAll(builder.build()); } } return builder.build(); } private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception { if ( client.getState() != CuratorFrameworkState.STARTED ) { return InternalAcquireResult.RETURN_NULL; } if ( hasWait ) { long thisWaitMs = getThisWaitMs(startMs, waitMs); if ( !lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS) ) { return InternalAcquireResult.RETURN_NULL; } } else { lock.acquire(); } try { PathAndBytesable<String> createBuilder = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL); String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME)); String nodeName = ZKPaths.getNodeFromPath(path); builder.add(makeLease(path)); synchronized(this) { for(;;) { List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath); if ( !children.contains(nodeName) ) { log.error("Sequential path not found: " + path); return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE; } if ( children.size() <= maxLeases ) { break; } if ( hasWait ) { long thisWaitMs = getThisWaitMs(startMs, waitMs); if ( thisWaitMs <= 0 ) { return InternalAcquireResult.RETURN_NULL; } wait(thisWaitMs); } else { wait(); } } } } finally { lock.release(); } return InternalAcquireResult.CONTINUE; } }
程式碼有點長,我們一點點分析,我們以客戶端A、B、C同時進入獲取鎖邏輯來舉例,這裡Semaphore最大可允許2個客戶端同時獲取鎖。
- 三個客戶端同時進入switch邏輯,執行
internalAcquire1Lease()
方法 - 在
internalAcquire1Lease()
方法中,先使用lock.acquire()
執行加鎖邏輯,這個lock是我們上一章講的可重入鎖邏輯,不再贅述 - 這個
lock
是哪裡初始化的呢?在InterProcessSemaphoreV2
構造函數中:lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));
this.maxLeases = (count != null) ? count.getCount() : maxLeases;
- 注意lock的path為:
/semaphore/semaphore_01/locks
, maxLeases為傳入的3 - 此時客戶端A、B、C執行
lock.acquire()
只會有一個可以成功獲取鎖,其他兩個客戶端會wait()
到了這裡,Zookeeper中就會有三條類似於:/semaphores/semaphore_01/locks/_c_a9302e20-de9c-4356-923a-274664d7676c-lock-0000000001
的數據
接著客戶端A繼續往下執行,具體邏輯如圖:
- 首先是客戶端A創建一個/locks/lock-xxxx01節點,獲取鎖成功過
- 接著創建臨時順序節點/leases/lease-xxxx01
- 判斷/leases目錄下節點數量(數量為1)是否小於等於maxLeases(maxLeases=2)
- 如果成功則退出循環,釋放/locks加的鎖,返回
InternalAcquireResult.CONTINUE
,狀態,執行lock.release()
通知客戶端B、C爭搶/locks
節點下的鎖 - 此時如果客戶端B搶到鎖,然後同樣創建/leases/lease-xxxx02,
- 判斷/leases目錄下節點數量(數量為2)是否小於等於maxLeases(maxLeases=2)
- 客戶端B也退出循環,返回
InternalAcquireResult.CONTINUE
,接著客戶端C來獲取鎖 - 客戶端C執行時,判斷/leases目錄下節點數量(數量為3)是否小於等於maxLeases(maxLeases=2)
此時客戶端C會進入到wait()
方法,直到客戶端A或者客戶端B釋放leases
節點下鎖時才會重試獲取鎖。
返回InternalAcquireResult.CONTINUE
後,就標誌獲取鎖成功。
Semaphore釋放鎖源碼分析
我們直接看程式碼,釋放鎖程式碼很簡單:
/** * Convenience method. Closes the lease * * @param lease lease to close */ public void returnLease(Lease lease) { Closeables.closeQuietly(lease); }
一路跟下去,可以看到closeQuietly
實現方法:
image.png
最後用到Lease
中的close()
方法,刪除創建的/leases/lease-xxxx
節點數據,然後通知其他節點客戶端,使用notifyAll()
ZK-Semaphore總結
一張圖總結下:
05_Zookeeper中Semaphore實現原理 _1_.jpg
Zookeeper 非重入鎖實現原理
之前聽小夥伴說過一個面試題,請說出你所知道的非重入鎖?
在腦子中搜索JDK中非重入鎖?好像沒有?
Zookeeper中提供了一個非重入鎖的實現方式,實現原理使用Semaphore,最大允許1個客戶端獲取鎖
按理說JDK中的Semaphore也可以實現此功能,哈哈,感覺自己被忽悠了,接著還是勉為其難的看下ZK中”非重入鎖”的實現方式吧:
使用示例
/** * 測試非重入鎖 */ private static void testSemaphoreMutex(CuratorFramework client) { InterProcessSemaphoreMutex semaphoreMutex = new InterProcessSemaphoreMutex(client, ZK_SEMAPHORE_LOCK_PATH); try { semaphoreMutex.acquire(); Thread.sleep(5000L); semaphoreMutex.release(); } catch (Exception e) { e.printStackTrace(); } }
源碼分析
image.png
實際上就是設置maxLeases
為1,原理同上面的Semaphore
源碼分析
Zookeeper讀寫鎖原理
之前在Redisson中已經見過它對讀寫鎖的實現,分別舉例了讀讀、寫寫、讀寫、寫讀這幾種場景鎖的互斥性以及可重入性,這裡也採用類似的場景分析。
讀寫鎖使用案例
直接看案例,可以針對案例修改幾種場景進行測試:
/** * @author wangmeng * @date 2020/03/30 18:59 */ public class Application { /** Zookeeper info */ private static final String ZK_ADDRESS = "yourZkIP:2181"; private static final String ZK_LOCK_PATH = "/locks/lock_01"; private static final String ZK_SEMAPHORE_LOCK_PATH = "/semaphore/semaphore_01"; private static final String ZK_READ_WRITE_LOCK_PATH = "/readwrite/readwrite_01"; public static void main(String[] args) throws InterruptedException { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); Thread t1 = new Thread(() -> { testReadWriteLock(client); }, "t1"); Thread t2 = new Thread(() -> { testReadWriteLock(client); }, "t2"); t1.start(); t2.start(); } /** * 測試讀寫鎖 */ private static void testReadWriteLock(CuratorFramework client) { InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, ZK_READ_WRITE_LOCK_PATH); try { // 獲取讀鎖 InterProcessMutex readLock = readWriteLock.readLock(); readLock.acquire(); System.out.println(Thread.currentThread().getName() + " hold read lock"); Thread.sleep(5000); readLock.release(); System.out.println(Thread.currentThread().getName() + " release read lock"); // 獲取寫鎖 InterProcessMutex writeLock = readWriteLock.writeLock(); writeLock.acquire(); System.out.println(Thread.currentThread().getName() + " hold write lock"); Thread.sleep(5000); writeLock.release(); System.out.println(Thread.currentThread().getName() + " release write lock"); } catch (Exception e) { e.printStackTrace(); } } }
運行後結果:
image.png
從結果可以看出來: 讀讀不互斥、 寫寫互斥
讀寫鎖源碼解析
首先看下InterProcessReadWrite
的構造函數:
public class InterProcessReadWriteLock { public InterProcessReadWriteLock(CuratorFramework client, String basePath) { writeMutex = new InternalInterProcessMutex ( client, basePath, WRITE_LOCK_NAME, 1, new SortingLockInternalsDriver() { @Override public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception { return super.getsTheLock(client, children, sequenceNodeName, maxLeases); } } ); readMutex = new InternalInterProcessMutex ( client, basePath, READ_LOCK_NAME, Integer.MAX_VALUE, new SortingLockInternalsDriver() { @Override public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception { return readLockPredicate(children, sequenceNodeName); } } ); } }
因為ZK中的讀寫鎖底層也是基於第一講中InterProcessMutex.internalLock()
去實現的,所以InterProcessReadWriteLock
讀鎖和寫鎖分別初始化了maxLeases
及重寫了getsTheLock()
方法,這個方法是判斷是否可以獲取鎖的核心程式碼,類似於:
int ourIndex = children.indexOf(sequenceNodeName); boolean getsTheLock = ourIndex < maxLeases;
不清楚的可以回頭看看:【分散式鎖】06-Zookeeper實現分散式鎖:可重入鎖源碼分析
另外寫鎖和讀鎖的path會有區別:
private static final String READ_LOCK_NAME = "__READ__"; private static final String WRITE_LOCK_NAME = "__WRIT__";
寫鎖的maxLeases是1,加了寫鎖就不允許再加其他讀鎖(但可重入加寫鎖和讀鎖)
讀鎖的maxLeases是Integer.MAX_VALUE,讀讀鎖不互斥
讀讀互斥及重入
查看讀鎖中判斷獲取鎖成功的核心邏輯:
private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception { // 如果當前執行緒獲取寫鎖,那麼直接返回true,獲取讀鎖成功 if ( writeMutex.isOwnedByCurrentThread() ) { return new PredicateResults(null, true); } int index = 0; int firstWriteIndex = Integer.MAX_VALUE; int ourIndex = Integer.MAX_VALUE; for ( String node : children ) { if ( node.contains(WRITE_LOCK_NAME) ) { firstWriteIndex = Math.min(index, firstWriteIndex); } else if ( node.startsWith(sequenceNodeName) ) { ourIndex = index; break; } ++index; } StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex); boolean getsTheLock = (ourIndex < firstWriteIndex); String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex); return new PredicateResults(pathToWatch, getsTheLock); }
如果客戶端A已經獲取了讀鎖
此時客戶端B再來獲取讀鎖
- children:[xxx_READ_0001, xxxx_READ_0002],此時都是讀鎖,不包含WRITE鎖標識
- sequenceNodeName就是node創建的節點名稱,這裡ourIndex=index0
- ourIndex<firstWriteIndex = Integer.MAX_VALUE 獲取鎖成功
執行debug流程如下圖:
image.png
因為讀讀不互斥,所以這裡讀鎖也是可重入的
寫讀互斥及重入
上面已經分析過讀讀的邏輯了,這裡接著按照上面的程式碼分析下讀寫的邏輯:
客戶端A加寫鎖成功
客戶端B加讀鎖
node.contains(WRITE_LOCK_NAME)
,此時客戶端B中含有WRITE標識- firstWriteIndex = Math.min(index, firstWriteIndex)=0
- boolean getsTheLock = (ourIndex < firstWriteIndex);
此時ourIndex = Integer.MAX_VALUE,判斷條件不成立,所以加寫鎖失敗
不同客戶端寫讀鎖互斥
接著看看同一個客戶端邏輯:
if ( writeMutex.isOwnedByCurrentThread() ) { return new PredicateResults(null, true); }
如果當前執行緒獲取了寫鎖,那麼再加讀寫直接返回成功。
所以同一個客戶端同一執行緒:先加寫鎖、再加讀鎖可重入,這一點和Redisson中是一致的,具體可以看:【分散式鎖】04-使用Redisson實現ReadWriteLock原理
寫寫互斥及重入
寫鎖完全可以看做成InterProcessMutex
,這裡maxLeases
為1,所以同一個執行緒寫是可重入的,不同客戶端獲取鎖時互斥的
讀寫互斥及重入
客戶端A加讀鎖
客戶端B加寫鎖
同樣道理,此時children數據結構如:
[_c_13bf63d6-43f3-4c2f-ba98-07a641d351f2-__READ__0000000004,
_c_73b60882-9361-4fb7-8420-a8d4911d2c99-__WRIT__0000000005]
判斷寫鎖在”/readwrite/readwrite_01″目錄下的位置,不是在首位,加鎖失敗
可重入鎖也是同樣原理,不可重入
Zookeeper中MultiLock實現原理
我們在Redisson中已經見過MultiLock原理,其中Redissoon為了實現RedLock
演算法,也有MultiLock的實現(可以參考【分散式鎖】03-使用Redisson實現RedLock原理)當多個資源需要統一加鎖的時候,我們就可以使用MultiLock
Zookeeper中的MultiLock實現非常簡單,就是依次加鎖,實現如下圖:
image.png
總結
Zookeeper實現分散式鎖的相關原理全都講完了,仔細閱讀Curator源碼覺得還挺有意思,再來會先Curator官網那句話:
Guava is to Java what Curator is to Zookeeper
Curator真的很強,分散式鎖實現的很棒!
申明
本文章首發自本人公眾號:壹枝花算不算浪漫,如若轉載請標明來源!
感興趣的小夥伴可關注個人公眾號:壹枝花算不算浪漫