分佈式鎖Redission
- 2021 年 10 月 12 日
- 筆記
- SpringCloud
Redisson 作為分佈式鎖
官方文檔://github.com/redisson/redisson/wiki
-
引入依賴
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.11.1</version> </dependency>
2.配置redission
@Configuration public class MyRedissonConfig { /** * 所有對 Redisson 的使用都是通過 RedissonClient * * @return * @throws IOException */ @Bean(destroyMethod = "shutdown") public RedissonClient redisson() throws IOException { // 1、創建配置 Config config = new Config(); // Redis url should start with redis:// or rediss:// config.useSingleServer().setAddress("redis://192.168.163.131:6379"); // 2、根據 Config 創建出 RedissonClient 實例 return Redisson.create(config); } }
3.測試
@Autowired RedissonClient redissonClient; @Test public void redission() { System.out.println(redissonClient); }
4.使用
@ResponseBody @GetMapping("/hello") public String hello() { // 1. 獲取一把鎖 RLock lock = redisson.getLock("my-lock"); // 2. 加鎖, 阻塞式等待 lock.lock(); try { System.out.println("加鎖成功,執行業務..."); Thread.sleep(15000); } catch (Exception e) { } finally { // 3. 解鎖 假設解鎖代碼沒有運行,Redisson 會出現死鎖嗎?(不會) System.out.println("釋放鎖"+Thread.currentThread().getId()); lock.unlock(); } return "hello"; }
假設解鎖代碼沒有運行,Redisson 會出現死鎖嗎?
不會
- 鎖的自動續期,如果業務時間很長,運行期間自動給鎖續期 30 s,不用擔心業務時間過長,鎖自動過期被刪掉;
- 加鎖的業務只要運行完成,就不會給當前鎖續期,即使不手動續期,默認也會在 30 s 後解鎖
源碼分析-Redission如何解決死鎖
Ctrl+Alt查看方法實現
這是一個加鎖方法,不傳過期時間
public void lock() {
try {
//這裡過期時間自動賦值成-1
this.lock(-1L, (TimeUnit)null, false);
} catch (InterruptedException var2) {
throw new IllegalStateException();
}
}
然後會調用 this.lock(-1L, (TimeUnit)null, false)方法
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//得到線程ID
long threadId = Thread.currentThread().getId();
//通過線程ID獲取到鎖
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
//如果沒有獲取到鎖
if (ttl != null) {
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
this.commandExecutor.syncSubscription(future);
try {
while(true) {
ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
return;
}
if (ttl >= 0L) {
try {
this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException var13) {
if (interruptibly) {
throw var13;
}
this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else if (interruptibly) {
this.getEntry(threadId).getLatch().acquire();
} else {
this.getEntry(threadId).getLatch().acquireUninterruptibly();
}
}
} finally {
this.unsubscribe(future, threadId);
}
}
}
獲取鎖方法
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}
裏面又調用了tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
//如果傳了過期時間
if (leaseTime != -1L) {
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//沒有傳過期時間
else {
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
有指定過期時間走tryLockInnerAsync
方法,嘗試用異步加鎖
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
//先把時間轉換成internalLockLeaseTime
this.internalLockLeaseTime = unit.toMillis(leaseTime);
//然後執行lua腳本 發給redis執行
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
沒有指定過期時間調用getLockWatchdogTimeout()方法,獲取鎖的默認看門狗時間,30秒
public long getLockWatchdogTimeout() {
return this.lockWatchdogTimeout;
}
this.lockWatchdogTimeout = 30000L;
還是調用tryLockInnerAsync
給redis
發送命令,占鎖成功返回一個以不變異步編排的RFuture<Long>
對象,來進行監聽,裏面有兩個參數ttlRemaining, e
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
this.scheduleExpirationRenewal(threadId);
}
}
});
裏面有個scheduleExpirationRenewal
方法
private void scheduleExpirationRenewal(long threadId) {
RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
//重新設置過期時間
this.renewExpiration();
}
}
裏面的關鍵方法renewExpiration
執行定時任務,
private void renewExpiration() {
RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
//裏面會執行一個定時任務
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
} else {
if (res) {
RedissonLock.this.renewExpiration();
}
}
});
}
}
}
//看門狗時間/3 10秒鐘重試一次
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
主要是來運行renewExpirationAsync
這個方法
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
裏面傳入了一個internalLockLeaseTime
時間參數
又是獲取看門狗時間
總結
-
如果傳了鎖的超時時間,就發送給redis執行腳本,進行占鎖,默認超時就是我們指定的時間
-
如果未指定鎖的超時時間,就是使用lockWatchdogTimeout的默認時間30秒,只要佔鎖成功就會啟動一個定時任務【重新給所設置時間,新的過期時間就是
lockWatchdogTimeout
的默認時間】最佳實踐使用自定義過期時間,省掉了自動續期時間,自動加鎖
讀寫鎖測試
@GetMapping("/write")
@ResponseBody
public String writeValue()
{
RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
String s="";
RLock rLock=readWriteLock.writeLock();
try{
//加寫鎖
rLock.lock();
s= UUID.randomUUID().toString();
Thread.sleep(30000);
redisTemplate.opsForValue().set("writeValue",s);
}catch (Exception e){
e.printStackTrace();
}
finally {
rLock.unlock();
}
return s;
}
@GetMapping("/read")
@ResponseBody
public String readValue()
{
RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
String s="";
//加讀鎖
RLock rLock=readWriteLock.readLock();
rLock.lock();
try{
s=redisTemplate.opsForValue().get("writeValue");
}catch (Exception e){
e.printStackTrace();
}
finally {
rLock.unlock();
}
return s;
}
寫鎖沒釋放讀鎖就必須等待,沒有寫鎖讀鎖都可以讀
保證數據的一致性,寫鎖是一個排他鎖、互斥鎖,讀鎖是共享鎖。
讀讀共享、讀寫互斥、寫寫互斥、寫讀互斥,只要有寫的存在都必須等待
信號量測試
像車庫停車,每進來一輛車,車庫減少一個車位,只有當車庫還有車位才可以停車
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
//獲取一個信號 佔一個值
park.acquire();
return "ok";
}
@GetMapping("/go")
@ResponseBody
public String go(){
RSemaphore park = redisson.getSemaphore("park");
//釋放一個車位
park.release();
return "ok";
}
訪問:
信號量可以用作分佈式的限流
閉鎖
只有等待所有活動都完成才發生,例如當所有班級放學走完才關閉學校大門
@GetMapping("/lockdoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.trySetCount(5);
door.await();//等待閉鎖都完成
return "放假啦....";
}
@GetMapping("/gogo/{id}")
@ResponseBody
public String gogogo(@PathVariable("id") Long id) throws InterruptedException {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.countDown();
return id+"班都走了";
}
緩存一致性解決
在我們讀緩存的時候可能會有數據被修改過,為了讓我們能夠讀到最新的數據,有兩種處理方法:
雙寫模式
在把數據寫入數據庫的時候,同時寫入到緩存中
問題:在寫的過程中,可能會在第一個線程緩存還沒寫進,但是第二個查詢到緩存又開始寫數據,讀到的最新數據有延遲,導致產生臟數據
失效模式
在把數據寫入數據更新的時候,把緩存刪除,下次查詢沒有緩存再添加緩存
問題:在線程1更新數據的時候消耗大量時間,還沒刪緩存,線程2進來也沒有緩存,讀取到原來老的數據,然後更新緩存
我們系統的一致性解決方案:
1、緩存的所有數據都有過期時間,數據過期下一次查詢觸發主動更新
2、讀寫數據的時候,加上分佈式的讀寫鎖。
3、遇到實時性、一致性要求高的數據,就應該查數據庫,即使慢點。