冷飯新炒:理解Redisson中分散式鎖的實現
前提
在很早很早之前,寫過一篇文章介紹過Redis
中的red lock
的實現,但是在生產環境中,筆者所負責的項目使用的分散式鎖組件一直是Redisson
。Redisson
是具備多種記憶體數據網格特性的基於Java
編寫的Redis
客戶端框架(Redis Java Client with features of In-Memory Data Grid
),基於Redis
的基本數據類型擴展出很多種實現的高級數據結構,具體見其官方的簡介圖:
本文要分析的R(ed)Lock
實現,只是其中一個很小的模組,其他高級特性可以按需選用。下面會從基本原理、源碼分析和基於Jedis
仿實現等內容進行展開。本文分析的Redisson
源碼是2020-01
左右Redisson
項目的main
分支源碼,對應版本是3.14.1
。
基本原理
red lock
的基本原理其實就”光明正大地”展示在Redis
官網的首頁文檔中(具體鏈接是//redis.io/topics/distlock
):
摘錄一下簡介進行翻譯:在許多環境中不同進程必須以互斥方式使用共享資源進行操作時,分散式鎖是一個非常有用的原語。此試圖提供一種更規範的演算法來實現Redis的分散式鎖。我們提出了一種稱為Redlock
的演算法,它實現了DLM
(猜測是Distributed Lock Manager
的縮寫,分散式鎖管理器),我們認為它比普通的單實例方法更安全。
演算法的三個核心特徵(三大最低保證):
Safety property
(安全性):互斥。確保在任何給定時刻下,只有一個客戶端可以持有鎖Liveness property A
(活性A
):無死鎖。即使存在曾經鎖定資源的客戶端崩潰或者出現網路分區異常,確保鎖總是能夠成功獲取Liveness property B
(活性B
):容錯性。只要大多數Redis
節點處於正常運行狀態,客戶端就可以獲取和釋放鎖
文檔中還指出了目前演算法對於故障轉移的實現還存在明顯的競態條件問題(描述的應該是Redis
主從架構下的問題):
- 客戶端
A
獲取Redis
主節點中的鎖(假設鎖定的資源為X
) - 在
Redis
主節點把KEY
同步到Redis
從節點之前,Redis
主節點崩潰 Redis
從節點因為故障晉陞為主節點- 此時,客戶端
B
獲取資源X
的鎖成功,問題是資源X
的鎖在前面已經被客戶端A
獲取過,這樣就出現了並發問題
演算法的實現很簡單,單個Redis
實例下加鎖命令如下:
SET $resource_name $random_value NX PX $ttl
這裡的Nx
和PX
是SET
命令的增強參數,自從Redis
的2.6.12
版本起,SET
命令已經提供了可選的複合操作符:
EX
:設置超時時間,單位是秒PX
:設置超時時間,單位是毫秒NX
:IF NOT EXIST
的縮寫,只有KEY
不存在的前提下才會設置K-V
,設置成功返回1
,否則返回0
XX
:IF EXIST
的縮寫,只有在KEY
存在的前提下才會設置K-V
,設置成功返回1
,否則返回0
單個Redis
實例下解鎖命令如下:
# KEYS[1] = $resource_name
# ARGV[1] = $random_value
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
使用Redisson中的RLock
使用RLock
要先實例化Redisson
,Redisson
已經適配了Redis
的哨兵、集群、普通主從和單機模式,因為筆者本地只安裝了單機Redis
,所以這裡使用單機模式配置進行演示。實例化RedissonClient
:
static RedissonClient REDISSON;
@BeforeClass
public static void beforeClass() throws Exception {
Config config = new Config();
// 單機
config.useSingleServer()
.setTimeout(10000)
.setAddress("redis://127.0.0.1:6379");
REDISSON = Redisson.create(config);
// // 主從
// config.useMasterSlaveServers()
// .setMasterAddress("主節點連接地址")
// .setSlaveAddresses(Sets.newHashSet("從節點連接地址"));
// REDISSON = Redisson.create(config);
// // 哨兵
// config.useSentinelServers()
// .setMasterName("Master名稱")
// .addSentinelAddress(new String[]{"哨兵連接地址"});
// REDISSON = Redisson.create(config);
// // 集群
// config.useClusterServers()
// .addNodeAddress(new String[]{"集群節點連接地址"});
// REDISSON = Redisson.create(config);
}
加鎖和解鎖:
@Test
public void testLockAndUnLock() throws Exception {
String resourceName = "resource:x";
RLock lock = REDISSON.getLock(resourceName);
Thread threadA = new Thread(() -> {
try {
lock.lock();
process(resourceName);
} finally {
lock.unlock();
System.out.println(String.format("執行緒%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName));
}
}, "threadA");
Thread threadB = new Thread(() -> {
try {
lock.lock();
process(resourceName);
} finally {
lock.unlock();
System.out.println(String.format("執行緒%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName));
}
}, "threadB");
threadA.start();
threadB.start();
Thread.sleep(Long.MAX_VALUE);
}
private void process(String resourceName) {
String threadName = Thread.currentThread().getName();
System.out.println(String.format("執行緒%s獲取到資源%s的鎖", threadName, resourceName));
try {
Thread.sleep(1000);
} catch (InterruptedException ignore) {
}
}
// 某次執行的輸出結果
執行緒threadB獲取到資源resource:x的鎖
執行緒threadB釋放資源resource:x的鎖
執行緒threadA獲取到資源resource:x的鎖
執行緒threadA釋放資源resource:x的鎖
更多的時候,我們會選用帶等待時間周期和鎖最大持有時間的API
:
@Test
public void testTryLockAndUnLock() throws Exception {
String resourceName = "resource:x";
int waitTime = 500;
int leaseTime = 1000;
Thread threadA = new Thread(() -> {
process(resourceName, waitTime, leaseTime);
}, "threadA");
Thread threadB = new Thread(() -> {
process(resourceName, waitTime, leaseTime);
}, "threadB");
threadA.start();
threadB.start();
Thread.sleep(Long.MAX_VALUE);
}
private void process(String resourceName, int waitTime, int leaseTime) {
RLock lock = REDISSON.getLock(resourceName);
try {
String threadName = Thread.currentThread().getName();
boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
if (tryLock) {
try {
System.out.println(String.format("執行緒%s獲取到資源%s的鎖", threadName, resourceName));
Thread.sleep(800);
} finally {
lock.unlock();
System.out.println(String.format("執行緒%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName));
}
} else {
System.out.println(String.format("執行緒%s獲取資源%s的鎖失敗,等待時間:%d ms", threadName, resourceName, waitTime));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 某次執行的輸出結果
執行緒threadA獲取到資源resource:x的鎖
執行緒threadB獲取資源resource:x的鎖失敗,等待時間:500 ms
執行緒threadA釋放資源resource:x的鎖
為了使用的時候更加簡單,可以參考spring-tx
中的編程式事務那樣進行輕度封裝:
@RequiredArgsConstructor
private static class RedissonLockProvider {
private final RedissonClient redissonClient;
public <T> T executeInLock(String resourceName, LockAction lockAction) {
RLock lock = redissonClient.getLock(resourceName);
try {
lock.lock();
lockAction.onAcquire(resourceName);
return lockAction.doInLock(resourceName);
} finally {
lock.unlock();
lockAction.onExit(resourceName);
}
}
public <T> T executeInLock(String resourceName, int waitTime, int leaseTime, LockAction lockAction) throws InterruptedException {
RLock lock = redissonClient.getLock(resourceName);
boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
if (tryLock) {
try {
lockAction.onAcquire(resourceName);
return lockAction.doInLock(resourceName);
} finally {
lock.unlock();
lockAction.onExit(resourceName);
}
}
return null;
}
public void executeInLockWithoutResult(String resourceName, int waitTime, int leaseTime, LockActionWithoutResult lockAction) throws InterruptedException {
RLock lock = redissonClient.getLock(resourceName);
boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
if (tryLock) {
try {
lockAction.onAcquire(resourceName);
lockAction.doInLock(resourceName);
} finally {
lock.unlock();
lockAction.onExit(resourceName);
}
}
}
public void executeInLockWithoutResult(String resourceName, LockActionWithoutResult lockAction) {
RLock lock = redissonClient.getLock(resourceName);
try {
lock.lock();
lockAction.onAcquire(resourceName);
lockAction.doInLock(resourceName);
} finally {
lock.unlock();
lockAction.onExit(resourceName);
}
}
}
@FunctionalInterface
interface LockAction {
default void onAcquire(String resourceName) {
}
<T> T doInLock(String resourceName);
default void onExit(String resourceName) {
}
}
@FunctionalInterface
interface LockActionWithoutResult {
default void onAcquire(String resourceName) {
}
void doInLock(String resourceName);
default void onExit(String resourceName) {
}
}
使用RedissonLockProvider
(僅供參考):
@Test
public void testRedissonLockProvider() throws Exception {
RedissonLockProvider provider = new RedissonLockProvider(REDISSON);
String resourceName = "resource:x";
Thread threadA = new Thread(() -> {
provider.executeInLockWithoutResult(resourceName, new LockActionWithoutResult() {
@Override
public void onAcquire(String resourceName) {
System.out.println(String.format("執行緒%s獲取到資源%s的鎖", Thread.currentThread().getName(), resourceName));
}
@Override
public void doInLock(String resourceName) {
try {
Thread.sleep(800);
} catch (InterruptedException ignore) {
}
}
@Override
public void onExit(String resourceName) {
System.out.println(String.format("執行緒%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName));
}
});
}, "threadA");
Thread threadB = new Thread(() -> {
provider.executeInLockWithoutResult(resourceName, new LockActionWithoutResult() {
@Override
public void onAcquire(String resourceName) {
System.out.println(String.format("執行緒%s獲取到資源%s的鎖", Thread.currentThread().getName(), resourceName));
}
@Override
public void doInLock(String resourceName) {
try {
Thread.sleep(800);
} catch (InterruptedException ignore) {
}
}
@Override
public void onExit(String resourceName) {
System.out.println(String.format("執行緒%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName));
}
});
}, "threadB");
threadA.start();
threadB.start();
Thread.sleep(Long.MAX_VALUE);
}
// 某次執行結果
執行緒threadA獲取到資源resource:x的鎖
執行緒threadA釋放資源resource:x的鎖
執行緒threadB獲取到資源resource:x的鎖
執行緒threadB釋放資源resource:x的鎖
Redisson中RLock的實現原理
Redisson
中RLock
的實現是基本參照了Redis
的red lock
演算法進行實現,不過在原始的red lock
演算法下進行了改良,主要包括下面的特性:
- 互斥
- 無死鎖
- 可重入,類似於
ReentrantLock
,同一個執行緒可以重複獲取同一個資源的鎖(一般使用計數器實現),鎖的重入特性一般情況下有利於提高資源的利用率 - 續期,這個是一個比較前衛解決思路,也就是如果一個客戶端對資源
X
永久鎖定,那麼並不是直接對KEY
生存周期設置為-1
,而是通過一個守護執行緒每隔固定周期延長KEY
的過期時間,這樣就能實現在守護執行緒不被殺掉的前提下,避免客戶端崩潰導致鎖無法釋放長期佔用資源的問題 - 鎖狀態變更訂閱,依賴於
org.redisson.pubsub.LockPubSub
,用於訂閱和通知鎖釋放事件 - 不是完全參考
red lock
演算法的實現,數據類型選用了HASH
,配合Lua
腳本完成多個命令的原子性
續期或者說延長KEY
的過期時間在Redisson
使用watch dog
實現,理解為用於續期的守護執行緒,底層依賴於Netty
的時間輪HashedWheelTimer
和任務io.netty.util.Timeout
實現,俗稱看門狗,下面會詳細分析。
先看RLock
的類圖:
這裡有一個疑惑點,RedissonRedLock(RedissonMultiLock的子類)的注釋中提到RedLock locking algorithm implementation for multiple locks. It manages all locks as one. 但從直觀上看,RedissonLock才是整個鎖體系的核心,裡面的實現思路也是遵從red lock演算法的。
RedissonLock
就是RLock
的直接實現,也是分散式鎖實現的核心類,從源碼中看到Redisson#getLock()
就是直接實例化RedissonLock
public class Redisson implements RedissonClient {
// ...... 省略其他程式碼
@Override
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
// ...... 省略其他程式碼
}
因此只需要圍繞RedissonLock
的源碼進行分析即可。RedissonLock
的類繼承圖如下:
這裡需要有幾點認知:
RedissonLock
實現了java.util.concurrent.locks.Lock
介面中除了newCondition()
方法外的所有方法,也就是可以基本無縫適配Lock
介面,對於習慣Lock
介面的API
的使用者來說是一個福音RedissonLock
基本所有同步API
都依賴於非同步API
的實現,也就是RLock
的實現依賴於RLockAsync
的實現,底層依賴的是Netty
的io.netty.util.concurrent.Promise
,具體見RedissonPromise
,如果用過JUC
中的Future
的開發者應該比較熟悉Future#get()
,這裡的做法類似- 右邊的幾個父類的簡單功能描述如下:
RObjectAsync
:所有Redisson
對象的基礎介面,提供一些記憶體測量、對象拷貝、移動等的非同步方法RObject
:RObjectAsync
的同步版本RExpirableAsync
:提供對象TTL
相關的非同步方法RExpirable
:RExpirableAsync
的同步版本RedissonObject
:直接實現類RObject
介面中的方法RedissonExpirable
:主要是實現了RExpirable
介面中的方法
接著先看RedissonLock
的構造函數和核心屬性:
// 存放entryName -> ExpirationEntry,用於獲取當前entryName的執行緒重入計數器和續期任務
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
// 內部的鎖持有的最大時間,來源於參數Config#lockWatchdogTimeout,用於控制續期的周期
protected long internalLockLeaseTime;
// ID,唯一標識,是一個UUID
final String id;
//
final String entryName;
// 鎖釋放事件訂閱發布相關
protected final LockPubSub pubSub;
// 命令非同步執行器實例
final CommandAsyncExecutor commandExecutor;
/**
* CommandAsyncExecutor是命令的非同步執行器,裡面的方法是相對底層的面向通訊框架的方法,包括非同步寫、非同步讀和同步結果獲取等
* name參數就是getLock()時候傳入的參數,其實就是最終同步到Redis中的KEY
*/
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
// 這裡的ID為外部初始化的UUID實例,調用toString()
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
// 這裡的entryName = uuid值 + : + 外部傳進來的name(KEY),如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x
this.entryName = id + ":" + name;
// 初始化LockPubSub實例,用於訂閱和發布鎖釋放的事件
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
// RedissonLock內部類ExpirationEntry,存放著執行緒重入的計數器和續期的Timeout任務
public static class ExpirationEntry {
// 執行緒ID -> 執行緒重入的次數
private final Map<Long, Integer> threadIds = new LinkedHashMap<>();
private volatile Timeout timeout;
public ExpirationEntry() {
super();
}
// 這個方法主要記錄執行緒重入的計數
public void addThreadId(long threadId) {
Integer counter = threadIds.get(threadId);
if (counter == null) {
counter = 1;
} else {
counter++;
}
threadIds.put(threadId, counter);
}
public boolean hasNoThreads() {
return threadIds.isEmpty();
}
public Long getFirstThreadId() {
if (threadIds.isEmpty()) {
return null;
}
return threadIds.keySet().iterator().next();
}
public void removeThreadId(long threadId) {
Integer counter = threadIds.get(threadId);
if (counter == null) {
return;
}
counter--;
if (counter == 0) {
threadIds.remove(threadId);
} else {
threadIds.put(threadId, counter);
}
}
public void setTimeout(Timeout timeout) {
this.timeout = timeout;
}
public Timeout getTimeout() {
return timeout;
}
}
這裡需要關注一下Config
中的lockWatchdogTimeout
參數:
翻譯一下大意:lockWatchdogTimeout
參數只有在沒有使用leaseTimeout
參數定義的成功獲取到鎖的場景(簡單來說就是不設置時限的加鎖)下生效,如果看門狗在下一個lockWatchdogTimeout
周期內不進行續期,那麼鎖就會過期釋放(從源碼上看,每三分之一lockWatchdogTimeout
就會執行一次續期任務,每次通過pexpire
把KEY
的存活周期延長lockWatchdogTimeout
),lockWatchdogTimeout
的默認值為30000
,也就是30
秒。
這裡先列舉一下RedissonLock
中獲取名稱的方法,以便後面分析這些名稱作為K-V
結構的KEY
時候使用:
id
:由配置實例化時候實例化的UUID
實例生成,從源碼上分析每個連接方式的Redisson
實例有唯一的UUID
,ConnectionManager
初始化的時候會調用UUID id = UUID.randomUUID()
,筆者認為可以理解為Redisson
實例在某個應用程式進程中的唯一標識,畢竟一般情況下,一個應用程式應該只會應用一種Redisson
的連接方式getEntryName()
:返回的是UUID + : + $KEY
,例如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x
getName()
:返回的是$KEY
,例如resource:x
getChannelName()
:返回的是redisson_lock__channel:{$KEY}
,例如redisson_lock__channel:{resource:x}
getLockName(long threadId)
:返回的是UUID + : + $threadId
,例如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
接著看加鎖的方法,核心實現主要是:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException
:lock
方法體系public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException
:tryLock
方法體系
先看只包含鎖最大持有時間的lock()
方法體系:
/**
* 獲取鎖,不指定等待時間,只指定鎖的最大持有時間
* 通過interruptibly參數配置支援中斷
*/
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 嘗試獲取鎖,返回的ttl為空代表獲取鎖成功,返回的ttl代表已經存在的KEY的剩餘存活時間
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
// 訂閱redisson_lock__channel:{$KEY},其實本質的目的是為了客戶端通過Redis的訂閱發布,感知到解鎖的事件
// 這個方法會在LockPubSub中註冊一個entryName -> RedissonLockEntry的哈希映射,RedissonLockEntry實例中存放著RPromise<RedissonLockEntry>結果,一個訊號量形式的鎖和訂閱方法重入計數器
// 下面的死循環中的getEntry()或者RPromise<RedissonLockEntry>#getNow()就是從這個映射中獲取的
RFuture<RedissonLockEntry> future = subscribe(threadId);
// 同步訂閱執行,獲取註冊訂閱Channel的響應,區分是否支援中斷
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
// 走到下面的for循環說明返回的ttl不為空,也就是Redis已經存在對應的KEY,有其他客戶端已經獲取到鎖,此客戶端執行緒的調用需要阻塞等待獲取鎖
try {
while (true) {
// 死循環中嘗試獲取鎖,這個是後面會分析的方法
ttl = tryAcquire(leaseTime, unit, threadId);
// 返回的ttl為空,說明獲取到鎖,跳出死循環,這個死循環或者拋出中斷異常,或者獲取到鎖成功break跳出,沒有其他方式
if (ttl == null) {
break;
}
// 這個ttl來源於等待存在的鎖的KEY的存活時間,直接使用許可為0的訊號量進行阻塞等待,下面的幾個分支判斷都是大同小異,只是有的支援超時時間,有的支援中斷
// 有的是永久阻塞直到鎖釋放事件訂閱LockPubSub的onMessage()方法回調激活getLatch().release()進行解鎖才會往下走
// 這裡可以學到一個特殊的技巧,Semaphore(0),訊號量的許可設置為0,首個調用acquire()的執行緒會被阻塞,直到其他執行緒調用此訊號量的release()方法才會解除阻塞,類似於一個CountDownLatch(1)的效果
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 獲取到鎖或者拋出中斷異常,退訂redisson_lock__channel:{$KEY},不再關註解鎖事件
unsubscribe(future, threadId);
}
}
// 這是一個非同步轉同步的方法,類似於FutureTask#get(),關鍵看調用的tryAcquireAsync()方法
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
/**
* 通過傳入鎖持有的最大時間和執行緒ID非同步獲取鎖
*/
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
// 鎖持有最大時間不為-1,也就是明確鎖的持有時間,不是永久持有的場景
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 走到這裡說明是leaseTime == -1,KEY不設置過期時間的分支,需要啟動看門狗機制。嘗試內部非同步獲取鎖,注意這裡的lockWatchdogTimeout是從配置中獲取傳進去,不是內部的internalLockLeaseTime屬性,這裡的默認值還是30000毫秒
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
// 執行異常場景直接返回
if (e != null) {
return;
}
// 成功獲取到鎖的場景,需要基於執行緒ID啟用看門狗,通過時間輪指定定時任務進行續期
if (ttlRemaining == null) {
// 定時調度進行續期操作
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
/**
* 轉換鎖持有最大時間,通過參數進行加鎖的LUA腳本調用
* getName()就是傳入的KEY,如resource:x getLockName()就是鎖的名稱,形式是:UUID + : + threadId,如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
* internalLockLeaseTime在leaseTime != -1的前提下使用的是原值,在leaseTime == -1的前提下,使用的是lockWatchdogTimeout
*/
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
// 時間轉換為毫秒,注意一點這裡的internalLockLeaseTime是類內的屬性,被重新賦值了
internalLockLeaseTime = unit.toMillis(leaseTime);
// 底層向Redis服務執行LUA腳本
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
先留意一下屬性internalLockLeaseTime
,它在tryLockInnerAsync()
方法內被重新賦值,在leaseTime == -1L
的前提下,它被賦值為lockWatchdogTimeout
,這個細節很重要,決定了後面續期方法(看門口)的調度頻率。另外,leaseTime != -1L
不會進行續期,也就是不會啟動看門狗機制。
接著需要仔細分析一下tryLockInnerAsync()
中執行的LUA
腳本,筆者把它提取出來通過注釋進行描述:
-- KEYS[1] == getName() --> $KEY --> resource:x
-- ARGV[1] == internalLockLeaseTime --> 30000
-- ARGV[2] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
-- 第一段程式碼是判斷鎖定的資源KEY不存在的時候進行相應值的設置,代表資源沒有被鎖定,首次獲取鎖成功
if (redis.call('exists', KEYS[1]) == 0) then
-- 這裡是設置調用次數,可以理解為延長KEY過期時間的調用次數
redis.call('hset', KEYS[1], ARGV[2], 1);
-- 設置KEY的過期時間
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 第二段程式碼是判斷HASH的field是否存在,如果存在說明是同一個執行緒重入的情況,這個時候需要延長KEY的TTL,並且HASH的field對應的value加1,記錄延長ttl的次數
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 這裡是增加調用次數,可以理解為增加延長KEY過期時間的調用次數
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 延長KEY的過期時間
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 第三段程式碼是兜底的,走到這裡說明當前執行緒獲取鎖失敗,鎖已經被其他(進程中的)執行緒佔有,返回當前KEY被佔用資源的ttl,用來確定需要休眠的最大時間
return redis.call('pttl', KEYS[1]);
這裡畫一個圖演示一下這個Lua
腳本中三段程式碼出現的邏輯:
剩下一個scheduleExpirationRenewal(threadId)
方法還沒有分析,裡面的邏輯就是看門狗的定期續期邏輯:
// 基於執行緒ID定時調度和續期
private void scheduleExpirationRenewal(long threadId) {
// 如果需要的話新建一個ExpirationEntry記錄執行緒重入計數,同時把續期的任務Timeout對象保存在屬性中
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
// 當前進行的當前執行緒重入加鎖
oldEntry.addThreadId(threadId);
} else {
// 當前進行的當前執行緒首次加鎖
entry.addThreadId(threadId);
// 首次新建ExpirationEntry需要觸發續期方法,記錄續期的任務句柄
renewExpiration();
}
}
// 處理續期
private void renewExpiration() {
// 根據entryName獲取ExpirationEntry實例,如果為空,說明在cancelExpirationRenewal()方法已經被移除,一般是解鎖的時候觸發
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 新建一個定時任務,這個就是看門狗的實現,io.netty.util.Timeout是Netty結合時間輪使用的定時任務實例
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 這裡是重複外面的那個邏輯,
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
// 獲取ExpirationEntry中首個執行緒ID,如果為空說明調用過cancelExpirationRenewal()方法清空持有的執行緒重入計數,一般是鎖已經釋放的場景
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 向Redis非同步發送續期的命令
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
// 拋出異常,續期失敗,只列印日誌和直接終止任務
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
// 返回true證明續期成功,則遞歸調用續期方法(重新調度自己),續期失敗說明對應的鎖已經不存在,直接返回,不再遞歸
if (res) {
// reschedule itself
renewExpiration();
}
});
}
},
// 這裡的執行頻率為leaseTime轉換為ms單位下的三分之一,由於leaseTime初始值為-1的情況下才會進入續期邏輯,那麼這裡的執行頻率為lockWatchdogTimeout的三分之一
internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
// ExpirationEntry實例持有調度任務實例
ee.setTimeout(task);
}
// 調用Redis,執行Lua腳本,進行非同步續期
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
// 這裡根據前面的分析,internalLockLeaseTime在leaseTime的值為-1的前提下,對應值為lockWatchdogTimeout
internalLockLeaseTime, getLockName(threadId));
}
基於源碼推斷出續期的機制由入參leaseTime
決定:
- 當
leaseTime == -1
的前提下(一般是lock()
和lockInterruptibly()
這類方法調用),續期任務的調度周期為lockWatchdogTimeout / 3
,鎖的最大持有時間(KEY
的過期時間)被刷新為lockWatchdogTimeout
- 當
leaseTime != -1
的前提下(一般是lock(long leaseTime, TimeUnit unit)
和lockInterruptibly(long leaseTime, TimeUnit unit)
這類方法調用指定leaseTime
不為-1
),這種情況下會直接設置鎖的過期時間為輸入值轉換為ms
單位的時間量,不會啟動續期機制
提取續期的Lua
腳本如下:
-- KEYS[1] == getName() --> $KEY --> resource:x
-- ARGV[1] == internalLockLeaseTime --> 30000
-- ARGV[2] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end;
return 0;
到此為止,不帶waitTime
參數的加鎖和續期邏輯基本分析完畢,而帶waitTime
參數的tryLock(long waitTime, long leaseTime, TimeUnit unit)
實現其實和只存在leaseTime
參數的lock(long leaseTime, TimeUnit unit, boolean interruptibly)
實現底層調用的方法是一致的,最大的區別是會在嘗試獲取鎖操作之後基於前後的System.currentTimeMillis()
計算出時間差和waitTime
做對比,決定需要阻塞等待還是直接超時獲取鎖失敗返回,處理阻塞等待的邏輯是客戶端本身的邏輯,這裡就不做詳細展開,因為源碼實現也不是十分優雅(太多long currentTime = System.currentTimeMillis()
的程式碼段了)。接著花點功夫分析一下解鎖的實現,包括一般情況下的解鎖unlock()
和強制解鎖forceUnlockAsync()
:
// 一般情況下的解鎖
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
// IllegalMonitorStateException一般是A執行緒加鎖,B執行緒解鎖,內部判斷執行緒狀態不一致拋出的
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
@Override
public RFuture<Void> unlockAsync() {
// 獲取當前調用解鎖操作的執行緒ID
long threadId = Thread.currentThread().getId();
return unlockAsync(threadId);
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
// 構建一個結果RedissonPromise
RPromise<Void> result = new RedissonPromise<Void>();
// 返回的RFuture如果持有的結果為true,說明解鎖成功,返回NULL說明執行緒ID異常,加鎖和解鎖的客戶端執行緒不是同一個執行緒
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
// 這是內部的異常,說明解鎖異常,需要取消看門狗的續期任務
if (e != null) {
cancelExpirationRenewal(threadId);
result.tryFailure(e);
return;
}
// 這種情況說明執行緒ID異常,加鎖和解鎖的客戶端執行緒不是同一個執行緒,拋出IllegalMonitorStateException異常
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
// 走到這裡說明正常解鎖,取消看門狗的續期任務
cancelExpirationRenewal(threadId);
result.trySuccess(null);
});
return result;
}
// 真正的內部解鎖的方法,執行解鎖的Lua腳本
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
// 取消續期任務
void cancelExpirationRenewal(Long threadId) {
// 這裡說明ExpirationEntry已經被移除,一般是基於同一個執行緒ID多次調用解鎖方法導致的(並發解鎖)
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
}
// 傳入的執行緒ID不為NULL,從ExpirationEntry中移除執行緒ID,如果持有的執行緒ID對應的執行緒重入計數不為0,會先遞減到0,等於0的前提下才會進行刪除
if (threadId != null) {
task.removeThreadId(threadId);
}
// 這裡threadId == null的情況是為了滿足強制解鎖的場景,強制解鎖需要直接刪除鎖所在的KEY,不需要理會傳入的執行緒ID(傳入的執行緒ID直接為NULL)
// 後者task.hasNoThreads()是為了說明當前的鎖沒有被任何執行緒持有,對於單執行緒也確定在移除執行緒ID之後重入計數器已經為0,從ExpirationEntry中移除,這個時候獲取ExpirationEntry的任務實例進行取消即可
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
// EntryName -> ExpirationEntry映射中移除當前鎖的相關實例ExpirationEntry
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
// 強制解鎖
@Override
public boolean forceUnlock() {
return get(forceUnlockAsync());
}
@Override
public RFuture<Boolean> forceUnlockAsync() {
// 執行緒ID傳入為NULL,取消當前的EntryName對應的續期任務
cancelExpirationRenewal(null);
// 執行Lua腳本強制刪除鎖所在的KEY並且發布解鎖消息
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('del', KEYS[1]) == 1) then "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return 1 "
+ "else "
+ "return 0 "
+ "end",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE);
}
這裡列出一般情況下解鎖和強制解鎖的Lua
腳本,分析如下:
-- unlockInnerAsync方法的lua腳本
-- KEYS[1] == getName() --> $KEY --> resource:x
-- KEYS[2] == getChannelName() --> 訂閱鎖的Channel --> redisson_lock__channel:{resource:x}
-- ARGV[1] == LockPubSub.UNLOCK_MESSAGE --> 常量數值0
-- ARGV[2] == internalLockLeaseTime --> 30000或者具體的鎖最大持有時間
-- ARGV[3] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
-- 第一個IF分支判斷如果鎖所在的哈希的field不存在,說明當前執行緒ID未曾獲取過對應的鎖,返回NULL表示解鎖失敗
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
-- 走到這裡通過hincrby進行執行緒重入計數-1,返回計數值
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
-- 計數值大於0,說明執行緒重入加鎖,這個時候基於internalLockLeaseTime對鎖所在KEY進行續期
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
-- 計數值小於或等於0,說明可以解鎖,刪除鎖所在的KEY,並且向redisson_lock__channel:{$KEY}發布消息,內容是0(常量數值)
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
-- 最後的return nil;在IDEA中提示是不會到達的語句,估計這裡是開發者筆誤寫上去的,前面的if-else都有返回語句,這裡應該是不可達的
return nil;
-------------------------------------------------- 不怎麼華麗的分割線 -------------------------------------------------
-- forceUnlockAsync方法的lua腳本
-- KEYS[1] == getName() --> $KEY --> resource:x
-- KEYS[2] == getChannelName() --> 訂閱鎖的Channel --> redisson_lock__channel:{resource:x}
-- ARGV[1] == LockPubSub.UNLOCK_MESSAGE --> 常量數值0
-- 強制刪除鎖所在的KEY,如果刪除成功向redisson_lock__channel:{$KEY}發布消息,內容是0(常量數值)
if (redis.call('del', KEYS[1]) == 1) then
redis.call('publish', KEYS[2], ARGV[1]);
return 1
else
return 0
end
其他輔助方法都相對簡單,這裡弄個簡單的”流水賬”記錄一番:
isLocked()
:基於getName()
調用Redis
的EXISTS $KEY
命令判斷是否加鎖isHeldByThread(long threadId)
和isHeldByCurrentThread()
:基於getName()
和getLockName(threadId)
調用Redis
的HEXISTS $KEY $LOCK_NAME
命令判斷HASH
中對應的field-value
是否存在,存在則說明鎖被對應執行緒ID
的執行緒持有getHoldCount()
:基於getName()
和getLockName(threadId)
調用Redis
的HGET $KEY $LOCK_NAME
命令,用於獲取執行緒對於某一個鎖的持有量(注釋叫holds
,其實就是同一個執行緒對某一個鎖的KEY
的續期次數)
訂閱和發布部分設計到大量Netty
組件使用相關的源碼,這裡不詳細展開,這部分的邏輯簡單附加到後面這個流程圖中。最後,通過一個比較詳細的圖分析一下Redisson
的加鎖和解鎖流程。
- 不帶
waitTime
參數的加鎖流程:
- 帶有
waitTime
參數的加鎖流程(圖右邊的流程基本不變,主要是左邊的流程每一步都要計算時間間隔):
- 解鎖流程:
假設不同進程的兩個不同的執行緒X
和Y
去競爭資源RESOURCE
的鎖,那麼可能的流程如下:
最後再概括一下Redisson
中實現red lock
演算法使用的HASH
數據類型:
KEY
代表的就是資源或者鎖,創建、存在性判斷,延長生存周期和刪除操作總是針對KEY
進行的FIELD
代表的是鎖名稱lockName()
,但是其實它由Redisson
連接管理器實例的初始化UUID
拼接客戶端執行緒ID
組成,嚴格來說應該是獲取鎖的客戶端執行緒唯一標識VALUE
代表的是客戶端執行緒對於鎖的持有量,從源碼上看應該是KEY
被續期的次數
基於Jedis實現類似Redisson的分散式鎖功能
前面的章節已經比較詳細分析了Redisson
中分散式鎖的實現原理,這裡使用Jedis
和多執行緒技巧做一個類似的實現。為了簡單起見,這裡只實現一個無入參的lock()
方法(類似於Redisson
中leaseTime == -1
的場景)和unlock()
方法。定義介面RedLock
:
public interface RedLock {
void lock(String resource) throws InterruptedException;
void unlock(String resource);
}
為了簡單起見,筆者把所有實現邏輯都寫在實現類RedisRedLock
中:
@RequiredArgsConstructor
public class RedisRedLock implements RedLock {
private final JedisPool jedisPool;
private final String uuid;
private static final String WATCH_DOG_TIMEOUT_STRING = "30000";
private static final long WATCH_DOG_TASK_DURATION = 10000L;
private static final String CHANNEL_PREFIX = "__red__lock:";
private static final String UNLOCK_STATUS_STRING = "0";
private static final String LOCK_LUA = "if (redis.call('exists', KEYS[1]) == 0) then\n" +
" redis.call('hset', KEYS[1], ARGV[2], 1);\n" +
" redis.call('pexpire', KEYS[1], ARGV[1]);\n" +
" return nil;\n" +
"end;\n" +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then\n" +
" redis.call('hincrby', KEYS[1], ARGV[2], 1);\n" +
" redis.call('pexpire', KEYS[1], ARGV[1]);\n" +
" return nil;\n" +
"end;\n" +
"return redis.call('pttl', KEYS[1]);";
private static final String UNLOCK_LUA = "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then\n" +
" return nil;\n" +
"end;\n" +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);\n" +
"if (counter > 0) then\n" +
" redis.call('pexpire', KEYS[1], ARGV[2]);\n" +
" return 0;\n" +
"else\n" +
" redis.call('del', KEYS[1]);\n" +
" redis.call('publish', KEYS[2], ARGV[1]);\n" +
" return 1;\n" +
"end;";
private static final String RENEW_LUA = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;";
private static final ExecutorService SUB_PUB_POOL = Executors.newCachedThreadPool();
private static final ScheduledExecutorService WATCH_DOG_POOL = new ScheduledThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2
);
private static class ThreadEntry {
private final ConcurrentMap<Long, Integer> threadCounter = Maps.newConcurrentMap();
private volatile WatchDogTask watchDogTask;
public synchronized void addThreadId(long threadId) {
Integer counter = threadCounter.get(threadId);
if (counter == null) {
counter = 1;
} else {
counter++;
}
threadCounter.put(threadId, counter);
}
public synchronized boolean hasNoThreads() {
return threadCounter.isEmpty();
}
public synchronized Long getFirstThreadId() {
if (threadCounter.isEmpty()) {
return null;
}
return threadCounter.keySet().iterator().next();
}
public synchronized void removeThreadId(long threadId) {
Integer counter = threadCounter.get(threadId);
if (counter == null) {
return;
}
counter--;
if (counter == 0) {
threadCounter.remove(threadId);
} else {
threadCounter.put(threadId, counter);
}
}
public void setWatchDogTask(WatchDogTask watchDogTask) {
this.watchDogTask = watchDogTask;
}
public WatchDogTask getWatchDogTask() {
return watchDogTask;
}
}
@Getter
private static class SubPubEntry {
private final String key;
private final Semaphore latch;
private final SubscribeListener subscribeListener;
public SubPubEntry(String key) {
this.key = key;
this.latch = new Semaphore(0);
this.subscribeListener = new SubscribeListener(key, latch);
}
}
private static final ConcurrentMap<String, ThreadEntry> THREAD_ENTRY_MAP = Maps.newConcurrentMap();
@Override
public void lock(String resource) throws InterruptedException {
long threadId = Thread.currentThread().getId();
String lockName = uuid + ":" + threadId;
String entryName = uuid + ":" + resource;
// 獲取鎖
Long ttl = acquire(resource, lockName, threadId, entryName);
// 加鎖成功直接返回
if (Objects.isNull(ttl)) {
return;
}
// 訂閱
SubPubEntry subPubEntry = subscribeAsync(resource);
try {
for (; ; ) {
ttl = acquire(resource, lockName, threadId, entryName);
// 加鎖成功直接返回
if (Objects.isNull(ttl)) {
return;
}
if (ttl > 0L) {
subPubEntry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
}
} finally {
unsubscribeSync(subPubEntry);
}
}
private Long acquire(String key, String lockName, long threadId, String entryName) {
Object result = execute0(jedis -> jedis.eval(LOCK_LUA, Lists.newArrayList(key),
Lists.newArrayList(WATCH_DOG_TIMEOUT_STRING, lockName)));
if (Objects.nonNull(result)) {
return Long.parseLong(String.valueOf(result));
}
// 啟動看門狗
ThreadEntry entry = new ThreadEntry();
ThreadEntry oldEntry = THREAD_ENTRY_MAP.putIfAbsent(entryName, entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
Runnable renewAction = () -> executeWithoutResult(jedis -> jedis.eval(RENEW_LUA, Lists.newArrayList(key),
Lists.newArrayList(WATCH_DOG_TIMEOUT_STRING, lockName)));
WatchDogTask watchDogTask = new WatchDogTask(new AtomicReference<>(renewAction));
entry.setWatchDogTask(watchDogTask);
WATCH_DOG_POOL.scheduleWithFixedDelay(watchDogTask, 0, WATCH_DOG_TASK_DURATION, TimeUnit.MILLISECONDS);
}
return null;
}
private SubPubEntry subscribeAsync(String key) {
SubPubEntry subPubEntry = new SubPubEntry(key);
SUB_PUB_POOL.submit(() -> {
SubscribeListener subscribeListener = subPubEntry.getSubscribeListener();
executeWithoutResult(jedis -> jedis.subscribe(subscribeListener, subscribeListener.getChannelName()));
return null;
});
return subPubEntry;
}
private void unsubscribeSync(SubPubEntry subPubEntry) {
SubscribeListener subscribeListener = subPubEntry.getSubscribeListener();
subscribeListener.unsubscribe(subscribeListener.getChannelName());
}
@Override
public void unlock(String resource) {
long threadId = Thread.currentThread().getId();
String entryName = uuid + ":" + resource;
String lockName = uuid + ":" + threadId;
String channelName = CHANNEL_PREFIX + resource;
Object result = execute0(jedis -> jedis.eval(UNLOCK_LUA, Lists.newArrayList(resource, channelName),
Lists.newArrayList(UNLOCK_STATUS_STRING, WATCH_DOG_TIMEOUT_STRING, lockName)));
ThreadEntry threadEntry = THREAD_ENTRY_MAP.get(entryName);
if (Objects.nonNull(threadEntry)) {
threadEntry.removeThreadId(threadId);
if (threadEntry.hasNoThreads() && Objects.nonNull(threadEntry.getWatchDogTask())) {
threadEntry.getWatchDogTask().cancel();
}
}
if (Objects.isNull(result)) {
throw new IllegalMonitorStateException();
}
}
private static class SubscribeListener extends JedisPubSub {
@Getter
private final String key;
@Getter
private final String channelName;
@Getter
private final Semaphore latch;
public SubscribeListener(String key, Semaphore latch) {
this.key = key;
this.channelName = CHANNEL_PREFIX + key;
this.latch = latch;
}
@Override
public void onMessage(String channel, String message) {
if (Objects.equals(channelName, channel) && Objects.equals(UNLOCK_STATUS_STRING, message)) {
latch.release();
}
}
}
@RequiredArgsConstructor
private static class WatchDogTask implements Runnable {
private final AtomicBoolean running = new AtomicBoolean(true);
private final AtomicReference<Runnable> actionReference;
@Override
public void run() {
if (running.get() && Objects.nonNull(actionReference.get())) {
actionReference.get().run();
} else {
throw new WatchDogTaskStopException("watch dog cancel");
}
}
public void cancel() {
actionReference.set(null);
running.set(false);
}
}
private <T> T execute0(Function<Jedis, T> function) {
try (Jedis jedis = jedisPool.getResource()) {
return function.apply(jedis);
}
}
interface Action {
void apply(Jedis jedis);
}
private void executeWithoutResult(Action action) {
try (Jedis jedis = jedisPool.getResource()) {
action.apply(jedis);
}
}
private static class WatchDogTaskStopException extends RuntimeException {
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
}
public static void main(String[] args) throws Exception {
String resourceName = "resource:x";
RedLock redLock = new RedisRedLock(new JedisPool(new GenericObjectPoolConfig()), UUID.randomUUID().toString());
Thread threadA = new Thread(() -> {
try {
redLock.lock(resourceName);
process(resourceName);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
redLock.unlock(resourceName);
System.out.println(String.format("執行緒%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName));
}
}, "threadA");
Thread threadB = new Thread(() -> {
try {
redLock.lock(resourceName);
process(resourceName);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
redLock.unlock(resourceName);
System.out.println(String.format("執行緒%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName));
}
}, "threadB");
threadA.start();
threadB.start();
Thread.sleep(Long.MAX_VALUE);
}
private static void process(String resourceName) {
String threadName = Thread.currentThread().getName();
System.out.println(String.format("執行緒%s獲取到資源%s的鎖", threadName, resourceName));
try {
Thread.sleep(1000);
} catch (InterruptedException ignore) {
}
}
}
上面的實現短時間內編寫完,沒有做詳細的DEBUG
,可能會有紕漏。某次執行結果如下:
執行緒threadB獲取到資源resource:x的鎖
執行緒threadB釋放資源resource:x的鎖
執行緒threadA獲取到資源resource:x的鎖
執行緒threadA釋放資源resource:x的鎖
小結
Redisson
中的red lock
實現,應用到下面的核心技術:
- 合理應用
Redis
的基本數據類型HASH
Redis
的訂閱發布Lua
腳本的原子性Netty
中的Promise
實現Netty
中的時間輪HashedWheelTimer
和對應的定時任務(HashedWheel)Timeout
Semaphore
進行帶期限、永久或者可中斷的阻塞以及喚醒,替代CountDownLatch
中的無等待期限阻塞
上面的核心技術相對合理地應用,才能實現一個高效而且容錯能力相對比較高的分散式鎖方案,但是從目前來看,Redisson
仍未解決red lock
演算法中的故障轉移缺陷,筆者認為這個有可能是Redis
實現分散式鎖方案的一個底層缺陷,此方案在Redis
單實例中是相對完善,一旦應用在Redis
集群(普通主從、哨兵或者Cluster
),有幾率會出現前文提到的節點角色切換導致多個不同客戶端獲取到同一個資源對應的鎖的問題。暫時無解。
參考資料:
Redisson
開源版本源碼Redis
官方文檔
畫圖用的是ProcessOn
://www.processon.com/view/link/5ffc540de0b34d2060d2d715
(c-2-w e-a-20210110 2021年的第一篇文章,希望這一年不要這麼鴿,這個系列的下一篇是《冷飯新炒:理解JDK中UUID的底層實現》)