Redisson分散式鎖學習總結:可重入鎖 RedissonLock#lock 獲取鎖源碼分析
原文:Redisson分散式鎖學習總結:可重入鎖 RedissonLock#lock 獲取鎖源碼分析
一、RedissonLock#lock 源碼分析
1、根據鎖key計算出 slot,一個slot對應的是redis集群的一個節點
redisson 支援分散式鎖的功能,基本都是基於 lua 腳本來完成的,因為分散式鎖肯定是具有比較複雜的判斷邏輯,而lua腳本可以保證複雜判斷和複雜操作的原子性。
redisson 的 RedissonLock 執行lua腳本,需要先找到當前鎖key需要存放到哪個slot,即在集群中哪個節點進行操作,後續不同客戶端或不同執行緒再使用這個鎖key進行上鎖,也需要到對應的節點的slot中進行加鎖操作。
執行lua腳本的源碼:
org.redisson.command.CommandAsyncService#evalWriteAsync(java.lang.String, org.redisson.client.codec.Codec, org.redisson.client.protocol.RedisCommand<T>, java.lang.String, java.util.List<java.lang.Object>, java.lang.Object...)
@Override
public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
// 根據鎖key找到對應的redis節點
NodeSource source = getNodeSource(key);
return evalAsync(source, false, codec, evalCommandType, script, keys, params);
}
private NodeSource getNodeSource(String key) {
// 計算鎖key對應的slot
int slot = connectionManager.calcSlot(key);
return new NodeSource(slot);
}
計算 slot 分主從模式和集群模式,我們一般生產環境都是使用集群模式。
public static final int MAX_SLOT = 16384;
@Override
public int calcSlot(String key) {
if (key == null) {
return 0;
}
int start = key.indexOf('{');
if (start != -1) {
int end = key.indexOf('}');
key = key.substring(start+1, end);
}
// 使用 CRC16 演算法來計算 slot,其中 MAX_SLOT 就是 16384,redis集群規定最多有 16384 個slot。
int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
log.debug("slot {} for {}", result, key);
return result;
}
2、RedissonLock 之 lua 腳本加鎖
RedissonLock#tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
2.1、KEYS
Collections.singletonList(getName())
KEYS:[“myLock”]
2.2、ARGVS
internalLockLeaseTime,getLockName(threadId)
internalLockLeaseTime:其實就是 watchdog 的超時時間,默認是30000毫秒 Config#lockWatchdogTimeout。
private long lockWatchdogTimeout = 30 * 1000;
getLockName(threadId):客戶端ID(UUID):執行緒ID(threadId)
protected String getLockName(long threadId) {
return id + ":" + threadId;
}
ARGVS:[30000,”UUID:threadId”]
2.3、lua 腳本分析
1、分支一:不存在加鎖記錄,獲取鎖成功
lua腳本:
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
分析:
-
利用 exists 命令判斷 myLock 這個 key 是否存在
exists myLock
-
如果不存在,則執行下面兩個操作
-
執行一個map的操作,給指定key的值增加1
hincrby myLock UUID:threadId
執行後多了一個map數據結構:
myLock:{ "UUID:threadId":1 }
-
給 myLock 設置過期時間為30000毫秒
expire myLock 30000
-
-
最後返回nil,即null
2、分支二:鎖記錄已存在,重複加鎖
lua腳本:
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
分析:
-
判斷之前加鎖的是否為當前客戶端當前執行緒
hexists myLock UUID:threadId
-
如果存在,則將加鎖次數增加1
hincrby myLock UUID:threadId 1
增加1後,map集合內容為:
myLock:{ "UUID:threadId":2 }
利用map這個數據結構,存放加鎖的客戶端執行緒資訊,從而支援可重入鎖。
-
重新刷新 myLock 的過期時間為30000毫秒
expire myLock 30000
3、分支三:獲取鎖失敗,直接返回鎖剩餘過期時間
lua腳本:
"return redis.call('pttl', KEYS[1]);"
分析:
- 利用 pttl 命令獲取鎖剩餘毫秒數
pttl myLock
- 返回步驟1獲取的毫秒數
3、watchdog 不斷為鎖續命
因為我們是利用 lock() 方法獲取鎖的,沒有指定多久後釋放,但是 redisson 不可能真的不設置鎖key的過期時間。
因為要考慮到一個場景:一個客戶端成功獲取鎖,但是沒有設置多久釋放,如果redisson 在redis實例中設置鎖的時候也沒有設置過期時間,如果這個時候客戶端所在的伺服器掛掉了,那麼他就不會執行到unlock() 方法去釋放鎖了,那麼這個時候就會導致死鎖,其他任何的客戶端都獲取不到鎖。
所以 redisson 會有一個 watchdog 的角色,每隔10_000毫秒就會為鎖續命,詳細可看看下面截圖:
再看看定時任務詳細的設計:
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
// 一開始就是null,直接放入 EXPIRATION_RENEWAL_MAP 中
entry.addThreadId(threadId);
// 調用定時任務
renewExpiration();
}
}
private void renewExpiration() {
// 上面已經傳入,不為空
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 開啟定時任務,時間是 internalLockLeaseTime / 3 毫秒後執行
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 判斷是否存在 ExpirationEntry,只要加鎖了,肯定存在
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
// 循環調用
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 判斷 myLock map 中是否存在當前客戶端當前執行緒
myLock:{
"UUID:threadId":1
}
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
// 存在,刷新過期時間,30_000毫秒
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
4、死循環獲取鎖
關於死循環獲取鎖,這裡是抓大放小,沒有深入研究裡面比較細的點,只有自己大概的猜測。
程式碼看下圖:
如果獲取鎖失敗,在進入死循環前,會訂閱指定渠道:redisson_lock__channel:{myLock}
,然後進入死循環。
在死循環裡面,首先會先嘗試再獲取一遍鎖,因為可能之前獲取鎖的客戶端剛好釋放鎖了。如果獲取失敗,那麼就進入等待狀態,等待時間是獲取鎖失敗時返回的鎖key的ttl。
訂閱指定channel猜測:因為在客戶端釋放鎖的時候,會往這個channel發送消息;因此可以利用此消息來提前讓等待的執行緒被喚醒去嘗試獲取鎖,因為此時鎖已經被釋放了。
5、其他的加鎖方式
如果我們需要指定獲取鎖成功後持有鎖的時長,可以執行下面方法,指定 leaseTime
lock.lock(10, TimeUnit.SECONDS);
如果指定了 leaseTime,watchdog就不會再啟用了。
如果不但需要指定持有鎖的時長,還想避免鎖獲取失敗時的死循環,可以同時指定 leaseTime 和 waitTime
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
如果指定了 waitTime,只會在 waitTime 時間內循環嘗試獲取鎖,超過 waitTime 如果還是獲取失敗,直接返回false。