求你了,別再問我Zookeeper如何實現分散式鎖了!!!

導讀

  • 真是有人()的地方就有江湖(事務),今天不談江湖,來撩撩人。

  • 分散式鎖的概念、為什麼使用分散式鎖,想必大家已經很清楚了。前段時間作者寫過Redis是如何實現分散式鎖,今天這篇文章來談談Zookeeper是如何實現分散式鎖的。

  • 陳某今天分別從如下幾個方面來詳細講講ZK如何實現分散式鎖:

    1. ZK的四種節點

    2. 排它鎖的實現

    3. 讀寫鎖的實現

    4. Curator實現分步式鎖

ZK的四種節點

  • 持久性節點:節點創建後將會一直存在

  • 臨時節點:臨時節點的生命周期和當前會話綁定,一旦當前會話斷開臨時節點也會刪除,當然可以主動刪除。

  • 持久有序節點:節點創建一直存在,並且zk會自動為節點加上一個自增的後綴作為新的節點名稱。

  • 臨時有序節點:保留臨時節點的特性,並且zk會自動為節點加上一個自增的後綴作為新的節點名稱。

 

排它鎖的實現

  • 排他鎖的實現相對簡單一點,利用了zk的創建節點不能重名的特性。如下圖:

  • 根據上圖分析大致分為如下步驟:

    1. 嘗試獲取鎖:創建臨時節點,zk會保證只有一個客戶端創建成功。

    2. 創建臨時節點成功,獲取鎖成功,執行業務邏輯,業務執行完成後刪除鎖。

    3. 創建臨時節點失敗,阻塞等待。

    4. 監聽刪除事件,一旦臨時節點刪除了,表示互斥操作完成了,可以再次嘗試獲取鎖。

    5. 遞歸:獲取鎖的過程是一個遞歸的操作,獲取鎖->監聽->獲取鎖

  • 如何避免死鎖:創建的是臨時節點,當服務宕機會話關閉後臨時節點將會被刪除,鎖自動釋放。

程式碼實現

  • 作者參照JDK鎖的實現方式加上模板方法模式的封裝,封裝介面如下:

/**   * @Description ZK分散式鎖的介面   * @Author 陳某   * @Date 2020/4/7 22:52   */  public interface ZKLock {      /**       * 獲取鎖       */      void lock() throws Exception;  ​      /**       * 解鎖       */      void unlock() throws Exception;  }

 

  • 模板抽象類如下:

/**   * @Description 排他鎖,模板類   * @Author 陳某   * @Date 2020/4/7 22:55   */  public abstract class AbstractZKLockMutex implements ZKLock {  ​      /**       * 節點路徑       */      protected String lockPath;  ​      /**       * zk客戶端       */      protected CuratorFramework zkClient;  ​      private AbstractZKLockMutex(){}  ​      public AbstractZKLockMutex(String lockPath,CuratorFramework client){          this.lockPath=lockPath;          this.zkClient=client;      }  ​      /**       * 模板方法,搭建的獲取鎖的框架,具體邏輯交於子類實現       * @throws Exception       */      @Override      public final void lock() throws Exception {          //獲取鎖成功          if (tryLock()){              System.out.println(Thread.currentThread().getName()+"獲取鎖成功");          }else{  //獲取鎖失敗              //阻塞一直等待              waitLock();              //遞歸,再次獲取鎖              lock();          }      }  ​      /**       * 嘗試獲取鎖,子類實現       */      protected abstract boolean tryLock() ;  ​  ​      /**       * 等待獲取鎖,子類實現       */      protected abstract void waitLock() throws Exception;  ​  ​      /**       * 解鎖:刪除節點或者直接斷開連接       */      @Override      public  abstract void unlock() throws Exception;  }

 

  • 排他鎖的具體實現類如下:

/**   * @Description 排他鎖的實現類,繼承模板類 AbstractZKLockMutex   * @Author 陳某   * @Date 2020/4/7 23:23   */  @Data  public class ZKLockMutex extends AbstractZKLockMutex {  ​      /**       * 用於實現執行緒阻塞       */      private CountDownLatch countDownLatch;  ​      public ZKLockMutex(String lockPath,CuratorFramework zkClient){          super(lockPath,zkClient);      }  ​      /**       * 嘗試獲取鎖:直接創建一個臨時節點,如果這個節點存在創建失敗拋出異常,表示已經互斥了,       * 反之創建成功       * @throws Exception       */      @Override      protected boolean tryLock()  {          try {              zkClient.create()                      //臨時節點                      .withMode(CreateMode.EPHEMERAL)                      //許可權列表 world:anyone:crdwa                      .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)                      .forPath(lockPath,"lock".getBytes());              return true;          }catch (Exception ex){              return false;          }      }  ​  ​      /**       * 等待鎖,一直阻塞監聽       * @return  成功獲取鎖返回true,反之返回false       */      @Override      protected void waitLock() throws Exception {          //監聽節點的新增、更新、刪除          final NodeCache nodeCache = new NodeCache(zkClient, lockPath);          //啟動監聽          nodeCache.start();          ListenerContainer<NodeCacheListener> listenable = nodeCache.getListenable();  ​          //監聽器          NodeCacheListener listener=()-> {              //節點被刪除,此時獲取鎖              if (nodeCache.getCurrentData() == null) {                  //countDownLatch不為null,表示節點存在,此時監聽到節點刪除了,因此-1                  if (countDownLatch != null)                      countDownLatch.countDown();              }          };          //添加監聽器          listenable.addListener(listener);  ​          //判斷節點是否存在          Stat stat = zkClient.checkExists().forPath(lockPath);          //節點存在          if (stat!=null){              countDownLatch=new CountDownLatch(1);              //阻塞主執行緒,監聽              countDownLatch.await();          }          //移除監聽器          listenable.removeListener(listener);      }  ​      /**       * 解鎖,直接刪除節點       * @throws Exception       */      @Override      public void unlock() throws Exception {          zkClient.delete().forPath(lockPath);      }  }

 

可重入性排他鎖如何設計

  • 可重入的邏輯很簡單,在本地保存一個ConcurrentMapkey是當前執行緒,value是定義的數據,結構如下:

 private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

 

  • 重入的偽程式碼如下:

public boolean tryLock(){      //判斷當前執行緒是否在threadData保存過      //存在,直接return true      //不存在執行獲取鎖的邏輯      //獲取成功保存在threadData中  }

 

 

讀寫鎖的實現

  • 讀寫鎖分為讀鎖和寫鎖,區別如下:

    • 讀鎖允許多個執行緒同時讀數據,但是在讀的同時不允許寫執行緒修改。

    • 寫鎖在獲取後,不允許多個執行緒同時寫或者讀。

  • 如何實現讀寫鎖?ZK中有一類節點叫臨時有序節點,上文有介紹。下面我們來利用臨時有序節點來實現讀寫鎖的功能。

 

讀鎖的設計

  • 讀鎖允許多個執行緒同時進行讀,並且在讀的同時不允許執行緒進行寫操作,實現原理如下圖:

  • 根據上圖,獲取一個讀鎖分為以下步驟:

    1. 創建臨時有序節點(當前執行緒擁有的讀鎖或稱作讀節點)。

    2. 獲取路徑下所有的子節點,並進行從小到大排序

    3. 獲取當前節點前的臨近寫節點(寫鎖)。

    4. 如果不存在的臨近寫節點,則成功獲取讀鎖。

    5. 如果存在臨近寫節點,對其監聽刪除事件。

    6. 一旦監聽到刪除事件,重複2,3,4,5的步驟(遞歸)

 

寫鎖的設計

  • 執行緒一旦獲取了寫鎖,不允許其他執行緒讀和寫。實現原理如下:

 

  • 從上圖可以看出唯一和寫鎖不同的就是監聽的節點,這裡是監聽臨近節點(讀節點或者寫節點),讀鎖只需要監聽寫節點,步驟如下:

    1. 創建臨時有序節點(當前執行緒擁有的寫鎖或稱作寫節點)。

    2. 獲取路徑下的所有子節點,並進行從小到大排序。

    3. 獲取當前節點的臨近節點(讀節點和寫節點)。

    4. 如果不存在臨近節點,則成功獲取鎖。

    5. 如果存在臨近節點,對其進行監聽刪除事件。

    6. 一旦監聽到刪除事件,重複2,3,4,5的步驟(遞歸)

 

如何監聽

  • 無論是寫鎖還是讀鎖都需要監聽前面的節點,不同的是讀鎖只監聽臨近的寫節點,寫鎖是監聽臨近的所有節點,抽象出來看其實是一種鏈式的監聽,如下圖:

  • 每一個節點都在監聽前面的臨近節點,一旦前面一個節點刪除了,再從新排序後監聽前面的節點,這樣遞歸下去。

 

程式碼實現

  • 作者簡單的寫了讀寫鎖的實現,先造出來再優化,不建議用在生產環境。程式碼如下:

public class ZKLockRW  {  ​      /**       * 節點路徑       */      protected String lockPath;  ​      /**       * zk客戶端       */      protected CuratorFramework zkClient;  ​      /**       * 用於阻塞執行緒       */      private CountDownLatch countDownLatch=new CountDownLatch(1);  ​  ​      private final static String WRITE_NAME="_W_LOCK";  ​      private final static String READ_NAME="_R_LOCK";  ​  ​      public ZKLockRW(String lockPath, CuratorFramework client) {          this.lockPath=lockPath;          this.zkClient=client;      }  ​      /**       * 獲取鎖,如果獲取失敗一直阻塞       * @throws Exception       */      public void lock() throws Exception {          //創建節點          String node = createNode();          //阻塞等待獲取鎖          tryLock(node);          countDownLatch.await();      }  ​      /**       * 創建臨時有序節點       * @return       * @throws Exception       */      private String createNode() throws Exception {          //創建臨時有序節點         return zkClient.create()                  .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)                  .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)                  .forPath(lockPath);      }  ​      /**       * 獲取寫鎖       * @return       */      public  ZKLockRW writeLock(){          return new ZKLockRW(lockPath+WRITE_NAME,zkClient);      }  ​      /**       * 獲取讀鎖       * @return       */      public  ZKLockRW readLock(){          return new ZKLockRW(lockPath+READ_NAME,zkClient);      }  ​      private void tryLock(String nodePath) throws Exception {          //獲取所有的子節點          List<String> childPaths = zkClient.getChildren()                  .forPath("/")                  .stream().sorted().map(o->"/"+o).collect(Collectors.toList());  ​  ​          //第一個節點就是當前的鎖,直接獲取鎖。遞歸結束的條件          if (nodePath.equals(childPaths.get(0))){              countDownLatch.countDown();              return;          }  ​          //1. 讀鎖:監聽最前面的寫鎖,寫鎖釋放了,自然能夠讀了          if (nodePath.contains(READ_NAME)){              //查找臨近的寫鎖              String preNode = getNearWriteNode(childPaths, childPaths.indexOf(nodePath));              if (preNode==null){                  countDownLatch.countDown();                  return;              }              NodeCache nodeCache=new NodeCache(zkClient,preNode);              nodeCache.start();              ListenerContainer<NodeCacheListener> listenable = nodeCache.getListenable();              listenable.addListener(() -> {                  //節點刪除事件                  if (nodeCache.getCurrentData()==null){                      //繼續監聽前一個節點                      String nearWriteNode = getNearWriteNode(childPaths, childPaths.indexOf(preNode));                      if (nearWriteNode==null){                          countDownLatch.countDown();                          return;                      }                      tryLock(nearWriteNode);                  }              });          }  ​          //如果是寫鎖,前面無論是什麼鎖都不能讀,直接循環監聽上一個節點即可,直到前面無鎖          if (nodePath.contains(WRITE_NAME)){              String preNode = childPaths.get(childPaths.indexOf(nodePath) - 1);              NodeCache nodeCache=new NodeCache(zkClient,preNode);              nodeCache.start();              ListenerContainer<NodeCacheListener> listenable = nodeCache.getListenable();              listenable.addListener(() -> {                  //節點刪除事件                  if (nodeCache.getCurrentData()==null){                      //繼續監聽前一個節點                      tryLock(childPaths.get(childPaths.indexOf(preNode) - 1<0?0:childPaths.indexOf(preNode) - 1));                  }              });          }      }  ​      /**       * 查找臨近的寫節點       * @param childPath 全部的子節點       * @param index 右邊界       * @return       */      private String  getNearWriteNode(List<String> childPath,Integer index){          for (int i = 0; i < index; i++) {              String node = childPath.get(i);              if (node.contains(WRITE_NAME))                  return node;  ​          }          return null;      }  ​  }

 

Curator實現分步式鎖

  • Curator是Netflix公司開源的一個Zookeeper客戶端,與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量。

  • Curator在分散式鎖方面已經為我們封裝好了,大致實現的思路就是按照作者上述的思路實現的。中小型互聯網公司還是建議直接使用框架封裝好的,畢竟穩定,有些大型的互聯公司都是手寫的,牛逼啊。

  • 創建一個排他鎖很簡單,如下:

//arg1:CuratorFramework連接對象,arg2:節點路徑  lock=new InterProcessMutex(client,path);  //獲取鎖  lock.acquire();  //釋放鎖  lock.release();

 

  • 更多的API請參照官方文檔,不是此篇文章重點。

  • 至此ZK實現分散式鎖就介紹完了,如有想要源碼的朋友,老規矩,關注微信公眾號【碼猿技術專欄】,回復關鍵詞分散式鎖獲取。

一點小福利

  • 對於Zookeeper不太熟悉的朋友,陳某特地花費兩天時間總結了ZK的常用知識點,包括ZK常用shell命令、ZK許可權控制、Curator的基本操作API。目錄如下:

  • 需要上面PDF文件的朋友,老規矩,關注微信公眾號【碼猿技術專欄】回復關鍵詞ZK總結