[Distributed Locks] 06 - Implementing Distributed Locks with ZooKeeper: Reentrant Lock Source Code Analysis

  • March 30, 2020
  • Notes

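Preface

Earlier posts explained how Redisson, the Redis client, implements distributed locks, mostly down to the source-code level.

ZooKeeper is another common way to implement distributed locks in distributed systems. This post digs into how ZooKeeper implements them.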

A First Look at ZooKeeper

File System

ZooKeeper maintains a data structure that resembles a file system:

image.png

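Each directory entry, such as NameService, is called a znode. As in a file system, we can freely add and delete znodes, and add and delete child znodes under a znode. The one difference is that a znode can also store data.

There are four types of znode: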

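  • PERSISTENT: a persistent node. It still exists after the client disconnects from ZooKeeper.

  • PERSISTENT_SEQUENTIAL: a persistent node with a sequence number. It still exists after the client disconnects from ZooKeeper, and ZooKeeper appends a sequence number to the node name.

  • EPHEMERAL: an ephemeral node. It is deleted after the client disconnects from ZooKeeper (more precisely, when the client's session ends).

  • EPHEMERAL_SEQUENTIAL: an ephemeral node with a sequence number. It is deleted after the client disconnects from ZooKeeper, and ZooKeeper appends a sequence number to the node name.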
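To make the four types concrete, here is a minimal in-memory sketch. It does not use the ZooKeeper API: ToyZnodeStore, its methods, and the session ids are hypothetical names invented for illustration. It assumes sequence numbers are rendered as 10 zero-padded digits, and it keeps a single counter because every node here sits under the same parent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy in-memory model of the four znode types (illustration only, not the ZooKeeper API). */
class ToyZnodeStore {
    enum Mode {
        PERSISTENT(false, false),
        PERSISTENT_SEQUENTIAL(false, true),
        EPHEMERAL(true, false),
        EPHEMERAL_SEQUENTIAL(true, true);

        final boolean ephemeral;
        final boolean sequential;

        Mode(boolean ephemeral, boolean sequential) {
            this.ephemeral = ephemeral;
            this.sequential = sequential;
        }
    }

    private static final long NO_OWNER = -1L;

    // path -> id of the session that owns the node (NO_OWNER for persistent nodes)
    private final Map<String, Long> nodes = new TreeMap<>();
    // One counter is enough here because all toy nodes share one parent
    private int nextSequence = 0;

    /** Creates a node and returns its actual path (with a sequence suffix if sequential). */
    String create(String path, Mode mode, long sessionId) {
        String actualPath = mode.sequential
                ? path + String.format("%010d", nextSequence++)
                : path;
        if (nodes.containsKey(actualPath)) {
            throw new IllegalStateException("node exists: " + actualPath);
        }
        nodes.put(actualPath, mode.ephemeral ? sessionId : NO_OWNER);
        return actualPath;
    }

    /** Ending a session removes every ephemeral node that session created. */
    void closeSession(long sessionId) {
        nodes.values().removeIf(owner -> owner == sessionId);
    }

    List<String> paths() {
        return new ArrayList<>(nodes.keySet());
    }

    public static void main(String[] args) {
        ToyZnodeStore zk = new ToyZnodeStore();
        zk.create("/config", Mode.PERSISTENT, 1L);
        zk.create("/job-", Mode.PERSISTENT_SEQUENTIAL, 1L);
        zk.create("/alive", Mode.EPHEMERAL, 1L);
        zk.create("/lock-", Mode.EPHEMERAL_SEQUENTIAL, 1L);
        System.out.println("before session ends: " + zk.paths());
        zk.closeSession(1L);
        System.out.println("after session ends:  " + zk.paths());
    }
}
```

Only the two persistent nodes survive the call to closeSession(), which is the property the lock implementation later relies on.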

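Notification Mechanism

A client registers a watch on the nodes it cares about. When such a node changes (its data changes, it is deleted, or child nodes are added or removed), ZooKeeper notifies the client.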
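The sketch below models this notification flow in plain Java. ToyWatchRegistry and its methods are hypothetical names, not ZooKeeper classes. It assumes the classic behavior of watches set through exists/getData/getChildren, which fire once and must be registered again to receive further events.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy model of one-time watches (illustration only, not the ZooKeeper API). */
class ToyWatchRegistry {
    private final Map<String, List<Consumer<String>>> watchers = new HashMap<>();

    /** Registers interest in the next change to the given path. */
    void watch(String path, Consumer<String> callback) {
        watchers.computeIfAbsent(path, p -> new ArrayList<>()).add(callback);
    }

    /** Simulates a change: each registered watcher fires once and is then forgotten. */
    void fire(String path, String eventType) {
        List<Consumer<String>> toNotify = watchers.remove(path);
        if (toNotify == null) {
            return; // nobody is watching this path
        }
        for (Consumer<String> callback : toNotify) {
            callback.accept(eventType + " " + path);
        }
    }

    public static void main(String[] args) {
        ToyWatchRegistry registry = new ToyWatchRegistry();
        registry.watch("/locks/lock_01", event -> System.out.println("notified: " + event));
        registry.fire("/locks/lock_01", "NodeChildrenChanged"); // watcher fires
        registry.fire("/locks/lock_01", "NodeChildrenChanged"); // watch already consumed
    }
}
```

A client that wants to keep following a node has to register the watch again inside its callback, which is what the lock loop later in this post effectively does on every iteration.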

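Distributed Locks

With ZooKeeper's consistent file system, locking becomes straightforward. Lock services fall into two categories: one keeps exclusive ownership, the other controls ordering.

  1. For the first category, we treat a znode on ZooKeeper as a lock and acquire it by creating that znode. All clients try to create the /distribute_lock node, and the one client that succeeds owns the lock. As the restroom sign says, arrive in a hurry and flush before you leave: when the client is done, it deletes the distribute_lock node it created, which releases the lock.

  2. For the second category, /distribute_lock already exists. All clients create ephemeral sequential nodes under it. As with master election, the client whose node has the smallest sequence number gets the lock, and it deletes its own znode when it is done.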
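The two styles can be sketched in plain Java as follows. This is an illustration under stated assumptions, not ZooKeeper code: ToyLockStrategies and its methods are hypothetical, a ConcurrentMap stands in for the znode tree, and child names are assumed to share a prefix and end in a fixed-width sequence number so that string order equals numeric order.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Toy versions of the two lock styles (illustration only). */
class ToyLockStrategies {
    // lock path -> id of the client that created the node
    private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();

    /** Style 1: whoever creates the node first owns the lock. */
    boolean tryExclusive(String lockPath, String clientId) {
        return nodes.putIfAbsent(lockPath, clientId) == null;
    }

    /** Style 1 release: only the creator may delete the node. */
    boolean releaseExclusive(String lockPath, String clientId) {
        return nodes.remove(lockPath, clientId);
    }

    /** Style 2: the child with the smallest sequence number owns the lock. */
    static String ownerOf(List<String> children) {
        return Collections.min(children);
    }

    public static void main(String[] args) {
        ToyLockStrategies locks = new ToyLockStrategies();
        System.out.println("A gets lock: " + locks.tryExclusive("/distribute_lock", "A"));
        System.out.println("B gets lock: " + locks.tryExclusive("/distribute_lock", "B"));
        locks.releaseExclusive("/distribute_lock", "A");
        System.out.println("B retries:   " + locks.tryExclusive("/distribute_lock", "B"));

        List<String> children = Arrays.asList("lock-0000000003", "lock-0000000001", "lock-0000000002");
        System.out.println("ordered-lock owner: " + ownerOf(children));
    }
}
```

In the first style every loser has to retry when the node disappears, while the second style gives each client a fixed place in line. Curator's InterProcessMutex, examined below, follows the second style.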

image.png

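Note: the content above is based on https://www.cnblogs.com/dream-to-pku/p/9513188.html

A First Look at the Curator Framework

Curator is a ZooKeeper client framework open-sourced by Netflix. It is now an Apache top-level project and one of the most popular ZooKeeper clients.

Here is how the Apache Curator website describes it: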

image.png

Next, look at what the quick start says about distributed locks.
URL: http://curator.apache.org/getting-started.html

InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) )
{
    try
    {
        // do some work inside of the critical section here
    }
    finally
    {
        lock.release();
    }
}

Usage is simple: create an InterProcessMutex and call its acquire() method to obtain a distributed lock.

Curator Distributed Lock Usage Example

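Start two threads, t1 and t2, that compete for the lock; whichever gets it holds it for 5 seconds. Over several runs you can see that sometimes t1 gets the lock first while t2 waits, and sometimes it is the other way around. Curator uses the node at the lock path we supply as the global lock. Each contender creates a child node under it with a name like [_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-00000000001]. A node like this is created each time the lock is requested and removed when the lock is released.

Here is the locking example: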

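import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;

public class Application {
    private static final String ZK_ADDRESS = "192.20.38.58:2181";
    private static final String ZK_LOCK_PATH = "/locks/lock_01";

    public static void main(String[] args) throws InterruptedException {
        // Retry policy: up to 10 retries, sleeping 5000 ms between retries
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");

        Thread t1 = new Thread(() -> doWithLock(client), "t1");
        Thread t2 = new Thread(() -> doWithLock(client), "t2");

        t1.start();
        t2.start();
    }

    private static void doWithLock(CuratorFramework client) {
        InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
        boolean acquired = false;
        try {
            // Wait up to 10 seconds for the lock (the original passed 10 * 1000 with TimeUnit.SECONDS)
            acquired = lock.acquire(10, TimeUnit.SECONDS);
            if (acquired) {
                System.out.println(Thread.currentThread().getName() + " hold lock");
                Thread.sleep(5000L); // simulate 5 seconds of work inside the critical section
                System.out.println(Thread.currentThread().getName() + " release lock");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release only if this thread holds the lock; release() throws otherwise
            if (acquired) {
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}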

Output:

image.png

How Curator Acquires the Lock

Go straight to Curator's locking code:

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {

    private final ConcurrentMap<Thread, LockData>   threadData = Maps.newConcurrentMap();

    private static class LockData
    {
        final Thread        owningThread;
        final String        lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);

        private LockData(Thread owningThread, String lockPath)
        {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }

    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception
    {
        return internalLock(time, unit);
    }

    private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

        Thread          currentThread = Thread.currentThread();

        LockData        lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData        newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }
}

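Look at internalLock(). It first gets the current thread and checks whether that thread already has an entry in a ConcurrentMap (threadData). This is the reentrancy implementation: if the current thread already holds the lock, its acquisition count is incremented by 1.

If the thread does not hold the lock, attemptLock() is called to try to acquire it. A non-null lockPath means the lock was acquired, and the current thread is put into the map.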
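The bookkeeping can be isolated in a small JDK-only sketch. It mirrors the shape of the threadData map above but is not Curator code: ToyReentrantBookkeeping, holdCount(), and the generated node path are hypothetical, and the znode creation is replaced by a counter. It tracks hold counts only and does not provide mutual exclusion between threads.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Per-thread reentrancy bookkeeping only; no real locking happens here. */
class ToyReentrantBookkeeping {
    private static final class LockData {
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);

        LockData(String lockPath) {
            this.lockPath = lockPath;
        }
    }

    private final ConcurrentMap<Thread, LockData> threadData = new ConcurrentHashMap<>();
    // Stand-in for "a znode was created on the server"
    private final AtomicInteger nodesCreated = new AtomicInteger();

    boolean acquire() {
        Thread current = Thread.currentThread();
        LockData data = threadData.get(current);
        if (data != null) {
            data.lockCount.incrementAndGet(); // re-entering: no new node needed
            return true;
        }
        // Hypothetical path; the real code gets it from attemptLock()
        String lockPath = "/locks/lock_01/node-" + nodesCreated.getAndIncrement();
        threadData.put(current, new LockData(lockPath));
        return true;
    }

    void release() {
        Thread current = Thread.currentThread();
        LockData data = threadData.get(current);
        if (data == null) {
            throw new IllegalMonitorStateException("You do not own the lock");
        }
        if (data.lockCount.decrementAndGet() > 0) {
            return; // still held by an outer acquire()
        }
        threadData.remove(current); // the real lock would delete its znode at this point
    }

    int holdCount() {
        LockData data = threadData.get(Thread.currentThread());
        return data == null ? 0 : data.lockCount.get();
    }

    int nodesCreated() {
        return nodesCreated.get();
    }

    public static void main(String[] args) {
        ToyReentrantBookkeeping lock = new ToyReentrantBookkeeping();
        lock.acquire();
        lock.acquire(); // nested acquire by the same thread
        System.out.println("holds=" + lock.holdCount() + ", nodes=" + lock.nodesCreated());
        lock.release();
        lock.release();
        System.out.println("holds=" + lock.holdCount());
    }
}
```

The point to notice is that a nested acquire() never touches the server. Only the first acquire and the last release of a thread cost a round trip.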

Next, the core locking logic in attemptLock():

Parameters:
time: how long to wait for the lock
unit: the time unit
lockNodeBytes: null by default

public class LockInternals {
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        while ( !isDone )
        {
            isDone = true;

            try
            {
                if ( localLockNodeBytes != null )
                {
                    ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes);
                }
                else
                {
                    ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
                }
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                // gets thrown by StandardLockInternalsDriver when it can't find the lock node
                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }

        if ( hasTheLock )
        {
            return ourPath;
        }

        return null;
    }
}

ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);

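An ephemeral sequential node is used. Because the node is ephemeral, it disappears automatically if the machine that created it crashes. So if a client holding the lock goes down, ZooKeeper guarantees that the lock is released automatically.

The resulting data structure looks like this:

Client A runs the lock-acquisition code, which generates ourPath=xxxx01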

image.png

 

Client B runs the lock-acquisition code, which generates ourPath=xxxx02

image.png

 

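List all the ephemeral nodes under /locks/lock_01 in ZooKeeper:

image.png

PS: I did not capture a screenshot of nodes 01/02, so I ran the program again. The screenshot shows sequential nodes 03/04 as they appear in ZooKeeper.

Now focus on the logic of internalLockLoop():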

public class LockInternals {
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( stat != null )
                        {
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                    }
                    // else it may have been deleted (i.e. lock released). Try to acquire again
                }
            }
        }
        // part of the code omitted
        return haveTheLock;
    }
}

Focus on the logic inside the while loop.
First, the path where the lock is acquired:

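  1. Get the sorted child znodes under /locks/lock_01. As the screenshots above show, there are two nodes, xxx01 and xxx02.
  2. Call getsTheLock() to try for the lock. maxLeases is 1, so by default only one thread can hold the lock at a time.
  3. This leads to StandardLockInternalsDriver.getsTheLock(), whose core code is:
    int ourIndex = children.indexOf(sequenceNodeName);
    boolean getsTheLock = ourIndex < maxLeases;
  4. The sequenceNodeName argument above is the node name of xxx01, that is, ourPath with the parent path and slash stripped off. The method looks up its position in the sorted children list. If it is the first element, ourIndex is 0, and the lock is considered acquired.
  5. When the node is first in the sorted list, predicateResults.getsTheLock() is true, the flag haveTheLock is set to true, and the method returns with the lock acquired.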
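The steps above can be sketched as a standalone predicate. ToyLockPredicate, Result, and sequenceOf() are hypothetical names, and the sketch makes two assumptions: node names end in "-" followed by a fixed-width sequence number, and a missing node is reported with IllegalStateException instead of the ZooKeeper exception used by the real driver.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** JDK-only sketch of the "am I first in line?" check (illustration only). */
class ToyLockPredicate {
    static final class Result {
        final boolean getsTheLock;
        final String pathToWatch; // null when the lock is acquired

        Result(boolean getsTheLock, String pathToWatch) {
            this.getsTheLock = getsTheLock;
            this.pathToWatch = pathToWatch;
        }
    }

    /** Extracts the sequence suffix, assuming names look like "<prefix>-lock-0000000001". */
    static String sequenceOf(String nodeName) {
        return nodeName.substring(nodeName.lastIndexOf('-') + 1);
    }

    /** Sorts by sequence suffix so that random name prefixes do not affect the order. */
    static List<String> sortBySequence(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparing(ToyLockPredicate::sequenceOf));
        return sorted;
    }

    static Result getsTheLock(List<String> sortedChildren, String sequenceNodeName, int maxLeases) {
        int ourIndex = sortedChildren.indexOf(sequenceNodeName);
        if (ourIndex < 0) {
            throw new IllegalStateException("node not found: " + sequenceNodeName);
        }
        boolean getsTheLock = ourIndex < maxLeases;
        // When we lose, watch the node that sits maxLeases places ahead of us
        String pathToWatch = getsTheLock ? null : sortedChildren.get(ourIndex - maxLeases);
        return new Result(getsTheLock, pathToWatch);
    }

    public static void main(String[] args) {
        List<String> children = sortBySequence(Arrays.asList(
                "_c_aaa-lock-0000000002",
                "_c_bbb-lock-0000000001"));
        Result first = getsTheLock(children, "_c_bbb-lock-0000000001", 1);
        Result second = getsTheLock(children, "_c_aaa-lock-0000000002", 1);
        System.out.println("first:  getsTheLock=" + first.getsTheLock);
        System.out.println("second: getsTheLock=" + second.getsTheLock + ", watch=" + second.pathToWatch);
    }
}
```

With maxLeases equal to 1, the loser always watches its immediate predecessor, which is why a release wakes exactly one waiter.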

Next, the path where the lock is not acquired.
The core code for that path:

String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

synchronized(this)
{
    Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
    if ( stat != null )
    {
        if ( millisToWait != null )
        {
            millisToWait -= (System.currentTimeMillis() - startMillis);
            startMillis = System.currentTimeMillis();
            if ( millisToWait <= 0 )
            {
                doDelete = true;    // timed out - delete our node
                break;
            }

            wait(millisToWait);
        }
        else
        {
            wait();
        }
    }
}

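  1. Add a watcher on the previous node, the one directly ahead of ours in the sorted list.
  2. If the acquire call has a wait timeout and that time has run out, mark our own node for deletion (doDelete) and break out of the loop.
  3. Otherwise the current thread goes into wait() until the previous holder releases the lock.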
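The wait-with-deadline pattern in these steps can be tried out without ZooKeeper. The sketch below is an assumption-laden simplification: ToyPredecessorWait and its methods are hypothetical, a boolean flag replaces the re-check of the children list, and onPredecessorDeleted() plays the role of the watcher callback.

```java
import java.util.concurrent.TimeUnit;

/** JDK-only sketch of waiting for a predecessor with a shrinking time budget. */
class ToyPredecessorWait {
    private boolean predecessorGone = false;

    /** Stands in for the watcher callback fired when the previous node is deleted. */
    synchronized void onPredecessorDeleted() {
        predecessorGone = true;
        notifyAll();
    }

    /** Returns true if the predecessor went away in time, false on timeout or interrupt. */
    synchronized boolean awaitPredecessor(long time, TimeUnit unit) {
        long startMillis = System.currentTimeMillis();
        long millisToWait = unit.toMillis(time);
        while (!predecessorGone) {
            // Charge the time already spent against the remaining budget
            long now = System.currentTimeMillis();
            millisToWait -= (now - startMillis);
            startMillis = now;
            if (millisToWait <= 0) {
                return false; // timed out; the real code would delete its own node here
            }
            try {
                wait(millisToWait);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        ToyPredecessorWait waiter = new ToyPredecessorWait();
        Thread releaser = new Thread(() -> {
            try {
                Thread.sleep(200); // the previous holder works for a while
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            waiter.onPredecessorDeleted();
        });
        releaser.start();
        System.out.println("predecessor released in time: " + waiter.awaitPredecessor(2, TimeUnit.SECONDS));
    }
}
```

Recomputing the remaining time on every pass matters because wait() can return early, and each wake-up must not reset the caller's overall deadline.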

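How Curator Releases the Lock

Releasing the lock is simple: delete the current ephemeral node. Because the next node watches the previous one, once the previous node is deleted, the thread behind the next node is woken up and tries to acquire the lock again.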

private void deleteOurPath(String ourPath) throws Exception
{
    try
    {
        client.delete().guaranteed().forPath(ourPath);
    }
    catch ( KeeperException.NoNodeException e )
    {
        // ignore - already deleted (possibly expired session, etc.)
    }
}
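To see why a delete wakes only one waiter, the queue of lock nodes can be modeled as an ordered list. This is a hypothetical sketch, not Curator code: ToyLockQueue and its methods are invented for illustration, and node names stand in for real znode paths.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the waiter chain: each node watches only the node directly ahead of it. */
class ToyLockQueue {
    // Nodes in sequence order; index 0 is the current lock holder
    private final List<String> nodes = new ArrayList<>();

    void enqueue(String node) {
        nodes.add(node);
    }

    String holder() {
        return nodes.isEmpty() ? null : nodes.get(0);
    }

    /**
     * Deletes a node and returns the single node that was watching it,
     * or null if nobody was waiting behind it or the node was already gone.
     */
    String delete(String node) {
        int index = nodes.indexOf(node);
        if (index < 0) {
            return null; // already deleted: ignored, like the NoNodeException above
        }
        nodes.remove(index);
        // The former successor now sits at the same index
        return index < nodes.size() ? nodes.get(index) : null;
    }

    public static void main(String[] args) {
        ToyLockQueue queue = new ToyLockQueue();
        queue.enqueue("lock-0000000001");
        queue.enqueue("lock-0000000002");
        queue.enqueue("lock-0000000003");

        String woken = queue.delete("lock-0000000001"); // holder releases
        System.out.println("woken: " + woken + ", new holder: " + queue.holder());
    }
}
```

The same rule covers a waiter in the middle of the line that times out and deletes its own node: its successor is woken, finds it is still not first, and starts watching its new predecessor.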

Summary

One diagram to sum it up:

04_Zookeeper分散式鎖實現原理.jpg

The original diagram is available here:
https://www.processon.com/view/link/5e80508de4b06b85300175d2