Springboot基於Redisson實現Redis分散式可重入鎖【案例到源碼分析】
- 2022 年 3 月 2 日
- 筆記
- Redis, Redisson, springboot
一、前言
我們在實現使用Redis實現分散式鎖,最開始一般使用SET resource-name anystring NX EX max-lock-time
進行加鎖,使用Lua腳本保證原子性進行實現釋放鎖。這樣手動實現比較麻煩,對此Redis官網也明確說Java版使用Redisson
來實現。小編也是看了官網慢慢的摸索清楚,特寫此記錄一下。從官網到整合Springboot到源碼解讀,以單節點為例,小編的理解都在注釋里,希望可以幫助到大家!!
二、為什麼使用Redisson
1. 我們打開官網
2. 我們可以看到官方讓我們去使用其他
3. 打開官方推薦
4. 找到文檔
Redisson地址
5. Redisson結構
三、Springboot整合Redisson
1. 導入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<!--redis分散式鎖-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.0</version>
</dependency>
2. 以官網為例查看如何配置
3. 編寫配置類
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author wangzhenjun
* @date 2022/2/9 9:57
*/
@Configuration
public class MyRedissonConfig {
/**
* 所有對redisson的使用都是通過RedissonClient來操作的
* @return
*/
@Bean(destroyMethod="shutdown")
public RedissonClient redisson(){
// 1. 創建配置
Config config = new Config();
// 一定要加redis://
config.useSingleServer().setAddress("redis://192.168.17.130:6379");
// 2. 根據config創建出redissonClient實例
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
}
4. 官網測試加鎖例子
5. 根據官網簡單Controller介面編寫
@ResponseBody
@GetMapping("/hello")
public String hello(){
// 1.獲取一把鎖,只要鎖名字一樣,就是同一把鎖
RLock lock = redisson.getLock("my-lock");
// 2. 加鎖
lock.lock();// 阻塞試等待 默認加的都是30s
// 帶參數情況
// lock.lock(10, TimeUnit.SECONDS);// 10s自動解鎖,自動解鎖時間一定要大於業務的執行時間。
try {
System.out.println("加鎖成功" + Thread.currentThread().getId());
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 3. 解鎖
System.out.println("解鎖成功:" + Thread.currentThread().getId());
lock.unlock();
}
return "hello";
}
6. 測試
四、lock.lock()源碼分析
1. 打開RedissonLock
實現類
2. 找到實現方法
@Override
public void lock() {
try {
// 我們發現不穿過期時間源碼默認過期時間為-1
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
3. 按住Ctrl進去lock方法
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 獲取執行緒的id,佔有鎖的時候field的值為UUID:執行緒號id
long threadId = Thread.currentThread().getId();
// 嘗試獲得鎖
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired 獲得鎖,返回
if (ttl == null) {
return;
}
// 這裡說明獲取鎖失敗,就通過執行緒id訂閱這個鎖
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
// 這裡進行自旋,不斷嘗試獲取鎖
while (true) {
// 繼續嘗試獲取鎖
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired 獲取成功
if (ttl == null) {
// 直接返回,挑出自旋
break;
}
// waiting for message 繼續等待獲得鎖
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 取消訂閱
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
4. 進去嘗試獲取鎖方法
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
// 直接進入非同步方法
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
// 這裡進行判斷如果沒有設置參數leaseTime = -1
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 此方法進行獲得鎖,過期時間為看門狗的默認時間
// private long lockWatchdogTimeout = 30 * 1000;看門狗默認過期時間為30s
// 加鎖和過期時間要保證原子性,這個方法後面肯定調用執行了Lua腳本,我們下面在看
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 開啟一個定時任務進行不斷刷新過期時間
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired 獲得鎖
if (ttlRemaining == null) {
// 刷新過期時間方法,我們下一步詳細說一下
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
5. 查看tryLockInnerAsync()
方法
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 首先判斷鎖是否存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
// 存在則獲取鎖
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
// 然後設置過期時間
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// hexists查看哈希表的指定欄位是否存在,存在鎖並且是當前執行緒持有鎖
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
// hincrby自增一
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// 鎖的值大於1,說明是可重入鎖,重置過期時間
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 鎖已存在,且不是本執行緒,則返回過期時間ttl
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
6. 進入4留下的定時任務scheduleExpirationRenewal()
方法
一步步往下找源碼:scheduleExpirationRenewal —>renewExpiration
根據下面源碼,定時任務刷新時間為:internalLockLeaseTime / 3,是看門狗的1/3,即為10s刷新一次
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
五、lock.lock(10, TimeUnit.SECONDS)源碼分析
1. 打開實現類
@Override
public void lock(long leaseTime, TimeUnit unit) {
try {
// 這裡的過期時間為我們輸入的10
lock(leaseTime, unit, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
2. 方法lock()
實現展示,同三.3源碼
3. 直接來到嘗試獲得鎖tryAcquireAsync()
方法
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
// 這裡進行判斷如果沒有設置參數leaseTime = -1,此時我們為10
if (leaseTime != -1) {
// 來到此方法
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 此處省略後面內容,前面以詳細說明。。。。
}
4. 打開tryLockInnerAsync()
方法
我們不難發現和沒有傳過期時間的方法一樣,只不過leaseTime的值變了。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 首先判斷鎖是否存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
// 存在則獲取鎖
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
// 然後設置過期時間
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// hexists查看哈希表的指定欄位是否存在,存在鎖並且是當前執行緒持有鎖
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
// hincrby自增一
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// 鎖的值大於1,說明是可重入鎖,重置過期時間
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 鎖已存在,且不是本執行緒,則返回過期時間ttl
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
六、lock.unlock()源碼分析
1. 打開方法實現
@Override
public void unlock() {
try {
// 點擊進入釋放鎖方法
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
// Future<Void> future = unlockAsync();
// future.awaitUninterruptibly();
// if (future.isSuccess()) {
// return;
// }
// if (future.cause() instanceof IllegalMonitorStateException) {
// throw (IllegalMonitorStateException)future.cause();
// }
// throw commandExecutor.convertException(future);
}
2. 打開unlockAsync()
方法
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
// 解鎖方法,後面展開說
RFuture<Boolean> future = unlockInnerAsync(threadId);
// 完成
future.onComplete((opStatus, e) -> {
if (e != null) {
// 取消到期續訂
cancelExpirationRenewal(threadId);
// 將這個未來標記為失敗並通知所有人
result.tryFailure(e);
return;
}
// 狀態為空,說明解鎖的執行緒和當前鎖不是同一個執行緒
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
cancelExpirationRenewal(threadId);
result.trySuccess(null);
});
return result;
}
3. 打開unlockInnerAsync()
方法
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 判斷釋放鎖的執行緒和已存在鎖的執行緒是不是同一個執行緒,不是返回空
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 釋放鎖後,加鎖次數減一
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 判斷剩餘數量是否大於0
"if (counter > 0) then " +
// 大於0 ,則刷新過期時間
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// 釋放鎖,刪除key並發布鎖釋放的消息
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
七、總結
這樣大家就跟著小編走完了一遍底層源碼,是不是感覺自己又行了,哈哈哈。小編走下來一遍覺得收貨還是蠻大的,以前不敢點進去源碼,進去就懵逼了,所以人要大膽的向前邁出第一步。一起加油吧,看到這裡不一鍵三連,有點對不起小編了哦!!
順便推廣一下自己的網站!!!