[Distributed Locks] 07 - Zookeeper Distributed Locks: How Semaphore and the Read-Write Lock Work

  • March 31, 2020
  • Notes

Preface

The previous article covered how Zookeeper's reentrant lock is implemented, which deepened my understanding of distributed locks.

I raised a question on my WeChat public account: compared with Redis, the Zookeeper approach seems better. Even though the Redis author devised the RedLock algorithm to address the weaknesses of distributed locks on a Redis cluster, a Redis-based distributed lock is still not perfect.

For example, take a 5-node Redis cluster where n/2 + 1 successful responses count as acquiring the lock. Client A requests the lock, nodes (1, 2, 3) return success, so client A holds the lock.

Now the master of Redis 1 crashes and fails over to its slave, but the slave has not yet replicated client A's lock.

Client B then requests the lock, nodes (1, 4, 5) return success, and client B acquires the lock as well.

Even so, why do most real production projects still implement distributed locks with Redis?

I am still puzzled by this; the companies and projects I have worked with almost all use Redis for distributed locks.

Let's not dwell on it and move on to the remaining distributed-lock recipes that Zookeeper (Curator) provides.

How Semaphore Is Implemented

I covered Redisson's Semaphore earlier ([Distributed Locks] 05 - Semaphore and CountDownLatch in Redisson); now let's see how a Semaphore is implemented on top of ZK.

Semaphore Usage Example

Usage is straightforward; the Curator website has sample code for it. You simply use the InterProcessSemaphoreV2 class:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.framework.recipes.locks.Lease;
import org.apache.curator.retry.RetryNTimes;

/**
 * Zookeeper distributed lock test code
 *
 * @author wangmeng
 * @date 2020/03/30 18:59
 */
public class Application {

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "YourZkIP:2181";
    private static final String ZK_LOCK_PATH = "/locks/lock_01";
    private static final String ZK_SEMAPHORE_LOCK_PATH = "/semaphore/semaphore_01";

    public static void main(String[] args) throws InterruptedException {
        // 1. Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");

        Thread t1 = new Thread(() -> testSemaphore(client), "t1");
        Thread t2 = new Thread(() -> testSemaphore(client), "t2");
        Thread t3 = new Thread(() -> testSemaphore(client), "t3");

        t1.start();
        t2.start();
        t3.start();
    }

    /**
     * Test Semaphore: at most 2 clients may hold the lock at the same time
     */
    private static void testSemaphore(CuratorFramework client) {
        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, ZK_SEMAPHORE_LOCK_PATH, 2);
        try {
            Lease lease = semaphore.acquire();
            System.out.println(Thread.currentThread().getName() + " hold lock");
            Thread.sleep(5000L);
            semaphore.returnLease(lease);
            System.out.println(Thread.currentThread().getName() + " release lock");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

The output is:

(screenshot of the console output omitted)

This is because the semaphore is configured to allow at most 2 clients to hold the lock at the same time.

From the output, t3 and t2 acquired the lock simultaneously, and t1 only acquired it after t3 released its lease.

Semaphore Acquisition: Source Code Analysis

The source code is the best teacher, so let's go straight to it:

public class InterProcessSemaphoreV2 {

    private static final String LOCK_PARENT = "locks";
    private static final String LEASE_PARENT = "leases";
    private static final String LEASE_BASE_NAME = "lease-";

    public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception
    {
        long startMs = System.currentTimeMillis();
        boolean hasWait = (unit != null);
        long waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0;

        Preconditions.checkArgument(qty > 0, "qty cannot be 0");

        ImmutableList.Builder<Lease> builder = ImmutableList.builder();
        boolean success = false;
        try
        {
            while ( qty-- > 0 )
            {
                int retryCount = 0;
                long startMillis = System.currentTimeMillis();
                boolean isDone = false;
                while ( !isDone )
                {
                    switch ( internalAcquire1Lease(builder, startMs, hasWait, waitMs) )
                    {
                        case CONTINUE:
                        {
                            isDone = true;
                            break;
                        }

                        // other branches omitted
                    }
                }
            }
            success = true;
        }
        finally
        {
            if ( !success )
            {
                returnAll(builder.build());
            }
        }

        return builder.build();
    }

    private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception
    {
        if ( client.getState() != CuratorFrameworkState.STARTED )
        {
            return InternalAcquireResult.RETURN_NULL;
        }

        if ( hasWait )
        {
            long thisWaitMs = getThisWaitMs(startMs, waitMs);
            if ( !lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS) )
            {
                return InternalAcquireResult.RETURN_NULL;
            }
        }
        else
        {
            lock.acquire();
        }
        try
        {
            PathAndBytesable<String> createBuilder = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
            String path = (nodeData != null)
                    ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData)
                    : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
            String nodeName = ZKPaths.getNodeFromPath(path);
            builder.add(makeLease(path));

            synchronized(this)
            {
                for(;;)
                {
                    List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
                    if ( !children.contains(nodeName) )
                    {
                        log.error("Sequential path not found: " + path);
                        return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
                    }

                    if ( children.size() <= maxLeases )
                    {
                        break;
                    }
                    if ( hasWait )
                    {
                        long thisWaitMs = getThisWaitMs(startMs, waitMs);
                        if ( thisWaitMs <= 0 )
                        {
                            return InternalAcquireResult.RETURN_NULL;
                        }
                        wait(thisWaitMs);
                    }
                    else
                    {
                        wait();
                    }
                }
            }
        }
        finally
        {
            lock.release();
        }
        return InternalAcquireResult.CONTINUE;
    }
}

The code is a bit long, so let's walk through it step by step. As an example, assume clients A, B, and C try to acquire the lock at the same time, with the Semaphore allowing at most 2 concurrent holders.

  1. All three clients enter the switch logic and call internalAcquire1Lease().
  2. Inside internalAcquire1Lease(), lock.acquire() runs first; this lock is the reentrant mutex covered in the previous article, so I won't repeat it here.
  3. Where is this lock initialized? In the InterProcessSemaphoreV2 constructor (see the sketch after this list):
    lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));
    this.maxLeases = (count != null) ? count.getCount() : maxLeases;
  4. Note that the lock's path is /semaphore/semaphore_01/locks, and maxLeases is the 2 we passed in.
  5. When clients A, B, and C call lock.acquire(), only one of them gets the mutex; the other two wait.
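For reference, the relevant part of the InterProcessSemaphoreV2 constructor looks roughly like this (abridged; the overload that takes a SharedCountReader is omitted):

public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases)
{
    this.client = client;
    // The inner mutex guards the lease-creation step: only one client at a time
    // enters the "create a lease node and count the children" critical section
    lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));
    this.maxLeases = maxLeases;                            // 2 in our example
    // Lease nodes live under <path>/leases
    leasesPath = ZKPaths.makePath(path, LEASE_PARENT);
}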

At this point Zookeeper holds three entries similar to:
/semaphore/semaphore_01/locks/_c_a9302e20-de9c-4356-923a-274664d7676c-lock-0000000001

Client A then continues; the flow is shown below:

(diagram of client A's acquisition flow omitted)

  1. Client A first creates a node under /locks (e.g. /locks/lock-xxxx01) and acquires the mutex.
  2. It then creates the ephemeral sequential node /leases/lease-xxxx01.
  3. It checks whether the number of children under /leases (currently 1) is less than or equal to maxLeases (2).
  4. If so, it breaks out of the loop, returns InternalAcquireResult.CONTINUE, and releases the mutex on /locks via lock.release(), which lets clients B and C compete for the mutex under /locks.
  5. Suppose client B wins the mutex and likewise creates /leases/lease-xxxx02.
  6. It checks whether the number of children under /leases (now 2) is less than or equal to maxLeases (2).
  7. Client B also exits the loop and returns InternalAcquireResult.CONTINUE; client C then takes its turn.
  8. When client C runs the check, the number of children under /leases (now 3) exceeds maxLeases (2).

So client C falls into wait() and only retries once client A or client B releases its node under /leases.

Once InternalAcquireResult.CONTINUE is returned, the acquisition is considered successful.
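The wait() calls above are woken up through the Watcher registered on leasesPath via usingWatcher(watcher) in internalAcquire1Lease(). In InterProcessSemaphoreV2 that watcher is essentially just a notifyAll(); roughly (abridged from the Curator source):

// Re-registered on leasesPath every time the children are listed;
// any change to the lease nodes (e.g. a lease being released) fires it.
private final Watcher watcher = new Watcher()
{
    @Override
    public void process(WatchedEvent event)
    {
        notifyFromWatcher();
    }
};

private synchronized void notifyFromWatcher()
{
    // Wake up every thread blocked in wait()/wait(ms) inside internalAcquire1Lease()
    notifyAll();
}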

Semaphore Release: Source Code Analysis

The release path is simple; let's look at the code:

/**
 * Convenience method. Closes the lease
 *
 * @param lease lease to close
 */
public void returnLease(Lease lease)
{
    Closeables.closeQuietly(lease);
}

Following the call chain, you can see how closeQuietly is implemented:

(screenshots of the Closeables.closeQuietly call chain omitted)

Ultimately the Lease's close() method is called: it deletes the /leases/lease-xxxx node that was created, and the other waiting clients are then notified via notifyAll().
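The Lease returned by makeLease() is an anonymous class whose close() simply deletes the lease node; deleting it fires the watch on the leases path, which in turn triggers notifyAll() in the waiting clients. A simplified sketch of that close() (abridged, not the verbatim Curator source):

private Lease makeLease(final String path)
{
    return new Lease()
    {
        @Override
        public void close() throws IOException
        {
            try
            {
                // Removing /semaphore/semaphore_01/leases/lease-xxxx frees one permit
                client.delete().guaranteed().forPath(path);
            }
            catch ( KeeperException.NoNodeException e )
            {
                // Node already gone (e.g. session expired) - nothing to do
            }
            catch ( Exception e )
            {
                throw new IOException(e);
            }
        }
        // getData() / getNodeName() omitted
    };
}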

ZK Semaphore Summary

One diagram to sum it up:

(diagram: how Semaphore is implemented in Zookeeper)

How Zookeeper's Non-Reentrant Lock Works

A friend once told me about an interview question: name the non-reentrant locks you know.

Searching my memory for a non-reentrant lock in the JDK... there doesn't seem to be one.

Zookeeper (via Curator) does provide a non-reentrant lock; it is implemented with the Semaphore above, allowing at most 1 client to hold the lock.

In principle the JDK's Semaphore could do the same thing within a single process, so it feels a bit like a trick question. Still, let's take a quick look at how ZK's "non-reentrant lock" is implemented:

Usage Example

/**
 * Test the non-reentrant lock
 */
private static void testSemaphoreMutex(CuratorFramework client) {
    InterProcessSemaphoreMutex semaphoreMutex = new InterProcessSemaphoreMutex(client, ZK_SEMAPHORE_LOCK_PATH);
    try {
        semaphoreMutex.acquire();
        Thread.sleep(5000L);
        semaphoreMutex.release();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Source Code Analysis

(screenshot of the InterProcessSemaphoreMutex source omitted)

It simply sets maxLeases to 1; the rest works exactly like the Semaphore analyzed above.
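In other words, InterProcessSemaphoreMutex wraps an InterProcessSemaphoreV2 with a single lease and keeps that Lease around until release. The class below is my own simplified sketch of the idea (the class name is made up; it mirrors what the Curator class does, but is not its verbatim source):

import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.framework.recipes.locks.Lease;

public class SemaphoreMutexSketch implements InterProcessLock {

    private final InterProcessSemaphoreV2 semaphore;
    private volatile Lease lease;

    public SemaphoreMutexSketch(CuratorFramework client, String path) {
        // A semaphore with exactly one lease = a (non-reentrant) mutex
        this.semaphore = new InterProcessSemaphoreV2(client, path, 1);
    }

    @Override
    public void acquire() throws Exception {
        lease = semaphore.acquire();
    }

    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        Lease acquired = semaphore.acquire(time, unit);   // null when the wait times out
        if (acquired == null) {
            return false;
        }
        lease = acquired;
        return true;
    }

    @Override
    public void release() throws Exception {
        Lease current = lease;
        if (current == null) {
            throw new IllegalStateException("Lock is not acquired");
        }
        lease = null;
        current.close();   // deletes the lease node so the next waiter can proceed
    }

    @Override
    public boolean isAcquiredInThisProcess() {
        return lease != null;
    }
}

Because the semaphore does not track the owning thread, a second acquire() from the same thread just blocks waiting for the single lease, which is exactly why this lock is non-reentrant.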

How Zookeeper's Read-Write Lock Works

We already saw Redisson's read-write lock and walked through the read-read, write-write, read-write, and write-read scenarios for mutual exclusion and reentrancy; we'll analyze the same scenarios here.

Read-Write Lock Usage Example

Here is the example; you can tweak it to test the different scenarios:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.RetryNTimes;

/**
 * @author wangmeng
 * @date 2020/03/30 18:59
 */
public class Application {

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "yourZkIP:2181";
    private static final String ZK_LOCK_PATH = "/locks/lock_01";
    private static final String ZK_SEMAPHORE_LOCK_PATH = "/semaphore/semaphore_01";
    private static final String ZK_READ_WRITE_LOCK_PATH = "/readwrite/readwrite_01";

    public static void main(String[] args) throws InterruptedException {
        // 1. Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");

        Thread t1 = new Thread(() -> testReadWriteLock(client), "t1");
        Thread t2 = new Thread(() -> testReadWriteLock(client), "t2");

        t1.start();
        t2.start();
    }

    /**
     * Test the read-write lock
     */
    private static void testReadWriteLock(CuratorFramework client) {
        InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, ZK_READ_WRITE_LOCK_PATH);
        try {
            // Acquire the read lock
            InterProcessMutex readLock = readWriteLock.readLock();
            readLock.acquire();
            System.out.println(Thread.currentThread().getName() + " hold read lock");
            Thread.sleep(5000);
            readLock.release();
            System.out.println(Thread.currentThread().getName() + " release read lock");

            // Acquire the write lock
            InterProcessMutex writeLock = readWriteLock.writeLock();
            writeLock.acquire();
            System.out.println(Thread.currentThread().getName() + " hold write lock");
            Thread.sleep(5000);
            writeLock.release();
            System.out.println(Thread.currentThread().getName() + " release write lock");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

The output:

(screenshot of the console output omitted)

The output shows that read-read does not exclude, while write-write does.

Read-Write Lock Source Code Analysis

First, look at the InterProcessReadWriteLock constructor:

public class InterProcessReadWriteLock {

    public InterProcessReadWriteLock(CuratorFramework client, String basePath)
    {
        writeMutex = new InternalInterProcessMutex
        (
            client,
            basePath,
            WRITE_LOCK_NAME,
            1,
            new SortingLockInternalsDriver()
            {
                @Override
                public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
                {
                    return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
                }
            }
        );

        readMutex = new InternalInterProcessMutex
        (
            client,
            basePath,
            READ_LOCK_NAME,
            Integer.MAX_VALUE,
            new SortingLockInternalsDriver()
            {
                @Override
                public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
                {
                    return readLockPredicate(children, sequenceNodeName);
                }
            }
        );
    }
}

ZK's read-write lock is also built on InterProcessMutex.internalLock() from the earlier article, so InterProcessReadWriteLock initializes a different maxLeases for the read and write mutexes and overrides getsTheLock(), the core check that decides whether the lock is acquired. It boils down to:

int ourIndex = children.indexOf(sequenceNodeName);
boolean getsTheLock = ourIndex < maxLeases;
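For reference, the default implementation in StandardLockInternalsDriver looks roughly like this (abridged from the Curator source):

public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
    // Where does our sequential node sit among all lock nodes?
    int ourIndex = children.indexOf(sequenceNodeName);
    validateOurIndex(sequenceNodeName, ourIndex);

    // We get the lock only if fewer than maxLeases nodes are ahead of us
    boolean getsTheLock = ourIndex < maxLeases;
    // Otherwise, watch the node whose deletion would let us in
    String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);

    return new PredicateResults(pathToWatch, getsTheLock);
}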

If this is unfamiliar, refer back to: [Distributed Locks] 06 - Zookeeper Distributed Locks: Reentrant Lock Source Code Analysis

The write lock and read lock also use different node name markers:

private static final String READ_LOCK_NAME  = "__READ__";
private static final String WRITE_LOCK_NAME = "__WRIT__";

The write lock's maxLeases is 1: once a write lock is held, no other client can acquire a read lock (though the owning thread can reentrantly take the write lock again, and also take a read lock).

The read lock's maxLeases is Integer.MAX_VALUE, so read locks never exclude each other.

Read-Read: Mutual Exclusion and Reentrancy

Here is the core predicate that decides whether a read lock is acquired:

private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception
{
    // If the current thread already holds the write lock, grant the read lock immediately
    if ( writeMutex.isOwnedByCurrentThread() )
    {
        return new PredicateResults(null, true);
    }

    int         index = 0;
    int         firstWriteIndex = Integer.MAX_VALUE;
    int         ourIndex = Integer.MAX_VALUE;
    for ( String node : children )
    {
        if ( node.contains(WRITE_LOCK_NAME) )
        {
            firstWriteIndex = Math.min(index, firstWriteIndex);
        }
        else if ( node.startsWith(sequenceNodeName) )
        {
            ourIndex = index;
            break;
        }

        ++index;
    }
    StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);

    boolean     getsTheLock = (ourIndex < firstWriteIndex);
    String      pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
    return new PredicateResults(pathToWatch, getsTheLock);
}

Suppose client A already holds a read lock,
and client B now requests a read lock:

  1. children: [xxx_READ_0001, xxxx_READ_0002]; both are read nodes, so none contains the WRITE marker.
  2. sequenceNodeName is the name of the node this client created, so ourIndex is set to its position in children (1 for client B).
  3. ourIndex < firstWriteIndex (which is still Integer.MAX_VALUE), so the read lock is acquired.

The debug flow looks like this:

(debugger screenshot of readLockPredicate omitted)

Since read-read is not mutually exclusive, the read lock is reentrant as well.

Write-Read: Mutual Exclusion and Reentrancy

Having covered read-read above, let's use the same code to analyze write-then-read:

Client A acquires the write lock,
then client B requests a read lock:

  1. Iterating over children, node.contains(WRITE_LOCK_NAME) matches client A's write node.
  2. firstWriteIndex = Math.min(index, firstWriteIndex) = 0.
  3. boolean getsTheLock = (ourIndex < firstWriteIndex);
    Here ourIndex is 1 (client B's read node sits behind the write node), so the condition is false and client B fails to acquire the read lock; it watches client A's write node instead.

So write-read is mutually exclusive across different clients.
Now let's look at the same client:

if ( writeMutex.isOwnedByCurrentThread() )
{
    return new PredicateResults(null, true);
}

If the current thread already holds the write lock, a subsequent read-lock acquisition returns success immediately.

So within the same client and thread, taking the write lock first and then the read lock is reentrant, which matches Redisson's behavior; see [Distributed Locks] 04 - Implementing ReadWriteLock with Redisson.

Write-Write: Mutual Exclusion and Reentrancy

The write lock is effectively an InterProcessMutex with maxLeases = 1, so the same thread can re-acquire it (it is reentrant), while different clients exclude each other.
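A quick illustration of that reentrancy, reusing the client and ZK_READ_WRITE_LOCK_PATH from the example above (the method name here is made up for illustration):

private static void testWriteLockReentrancy(CuratorFramework client) throws Exception {
    InterProcessReadWriteLock rw = new InterProcessReadWriteLock(client, ZK_READ_WRITE_LOCK_PATH);
    InterProcessMutex writeLock = rw.writeLock();

    writeLock.acquire();        // first acquisition by this thread
    writeLock.acquire();        // reentrant acquisition by the same thread succeeds immediately
    try {
        // exclusive work on the protected resource
    } finally {
        writeLock.release();    // release once per acquire
        writeLock.release();
    }
}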

Read-Write: Mutual Exclusion and Reentrancy

Client A acquires a read lock,
then client B requests the write lock.
By the same reasoning, children now looks like:
[_c_13bf63d6-43f3-4c2f-ba98-07a641d351f2-__READ__0000000004,
_c_73b60882-9361-4fb7-8420-a8d4911d2c99-__WRIT__0000000005]

The write lock checks whether its node is first under the "/readwrite/readwrite_01" path; it is not (the read node is ahead of it), so the write-lock acquisition fails.

The same reasoning applies within a single thread: taking a read lock first and then asking for the write lock is not reentrant.
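A small sketch of that last point, again reusing the client and path from the example above (the method name is made up; the timeout value is only illustrative):

private static void testReadThenWrite(CuratorFramework client) throws Exception {
    InterProcessReadWriteLock rw = new InterProcessReadWriteLock(client, ZK_READ_WRITE_LOCK_PATH);

    rw.readLock().acquire();                                   // this thread now holds a read lock
    try {
        // Our write node queues behind our own read node, so the
        // "upgrade" cannot succeed and the timed acquire gives up.
        boolean upgraded = rw.writeLock().acquire(3, TimeUnit.SECONDS);
        System.out.println("upgraded to write lock? " + upgraded);   // expected: false
    } finally {
        rw.readLock().release();
    }
}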

How MultiLock Works in Zookeeper

We saw MultiLock in Redisson as well; Redisson uses it as part of its RedLock implementation (see [Distributed Locks] 03 - Implementing RedLock with Redisson). When several resources need to be locked as a unit, MultiLock is the tool to reach for.

Zookeeper's MultiLock implementation is very simple: it just acquires each of the underlying locks in turn, as shown below:

(screenshot of the InterProcessMultiLock source omitted)
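Curator exposes this as InterProcessMultiLock. A minimal usage sketch; the two child lock paths and the class name here are made up for illustration:

import java.util.Arrays;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMultiLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

public class MultiLockDemo {

    public static void lockBothResources(CuratorFramework client) throws Exception {
        // Hypothetical paths for two resources that must be locked together
        InterProcessLock lockA = new InterProcessMutex(client, "/locks/resource_a");
        InterProcessLock lockB = new InterProcessMutex(client, "/locks/resource_b");

        InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(lockA, lockB));

        // acquire() walks the list and acquires each lock in turn;
        // if any acquisition fails, the ones already taken are released again
        multiLock.acquire();
        try {
            // both resources are locked here
        } finally {
            // release() releases all of the underlying locks
            multiLock.release();
        }
    }
}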

Summary

That covers all of the distributed-lock recipes in Zookeeper. Reading the Curator source carefully turned out to be quite interesting, and it is worth revisiting the line from the Curator website:

Guava is to Java what Curator is to Zookeeper

Curator really is powerful, and its distributed-lock implementations are excellent!

Notice

This article was first published on my WeChat public account: 壹枝花算不算浪漫. Please credit the source if you repost it!

If you are interested, feel free to follow the account: 壹枝花算不算浪漫
