ZooKeeper 分佈式鎖 Curator 源碼 04:分佈式信號量和互斥鎖

前言

分佈式信號量,之前在 Redisson 中也介紹過,Redisson 的信號量是將計數維護在 Redis 中的,那現在來看一下 Curator 是如何基於 ZooKeeper 實現信號量的。

使用 Demo

public class CuratorDemo {

    public static void main(String[] args) throws Exception {

        String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString(connectString)
                .retryPolicy(retryPolicy)
                .build();
        client.start();

        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/semaphores/semaphore_01", 3);

        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread() + " 線程 start - " + LocalTime.now());
                    Lease lease = semaphore.acquire();
                    System.out.println(Thread.currentThread() + " 線程 execute - " + LocalTime.now());
                    Thread.sleep(3000);
                    System.out.println(Thread.currentThread() + " 線程 over -" + LocalTime.now());
                    semaphore.returnLease(lease);
                } catch (Exception e) {

                }

            }).start();
        }

        Thread.sleep(1000000);

    }
}

控制台輸出數據如下:

源碼

獲取憑證

核心源碼:InterProcessSemaphoreV2#internalAcquire1Lease

這裡僅介紹大概邏輯,有興趣的小夥伴可以自行閱讀源碼。

lock 是 InterProcessMutexInterProcessSemaphoreV2 信號量,也是藉助於最基礎的加鎖。

通過圖也可以看出,使用 InterProcessSemaphoreV2 時,會先創建 /semaphores/semaphore_01 路徑,並在路徑下創建 locks 節點。也就是 /semaphores/semaphore_01/locks 路徑下,有 10 個臨時順序節點。

緊接着會在 /semaphores/semaphore_01 路徑下創建 leases 節點,所以創建鎖的臨時順序節點之後,會緊接着在 /semaphores/semaphore_01/leases 下創建臨時順序節點。

/semaphores/semaphore_01/leases 節點進行監聽,同時獲取 /semaphores/semaphore_01/leases 下面的子節點數量。

  1. 如果子節點數量小於等於信號量計數,則直接結束循環;
  2. 如果大於,則會進入 wait 等待喚醒。

釋放憑證

釋放憑證就是調用 Lease 的 close 方法,刪除節點,這樣 /semaphores/semaphore_01/leases 上的監聽器就會觸發,然後其他線程獲取憑證。

互斥鎖

互斥鎖 InterProcessSemaphoreMutex,不支持重入,其他的和可重入鎖並沒有什麼區別。就是基於 InterProcessSemaphoreV2 實現的。

就是把計數的值 maxLeases 設置為了 1。

總結

信號量 InterProcessSemaphoreV2 其實是通過判斷節點下的子節點數量來實現控制信號量,同時內部加鎖是基於可重入鎖 InterProcessMutex 實現的。

互斥鎖 InterProcessSemaphoreMutex 則是將信號量的技術設置為 1 來實現互斥功能。

相關推薦