Redisson分散式鎖源碼

  • 2019 年 12 月 6 日
  • 筆記

最近碰到的一個問題,Java程式碼中寫了一個定時器,分散式部署的時候,多台同時執行的話就會出現重複的數據,為了避免這種情況,之前是通過在配置文件里寫上可以執行這段程式碼的IP,程式碼中判斷如果跟這個IP相等,則執行,否則不執行,想想也是一種比較簡單的方式吧,但是感覺很low很low,所以改用分散式鎖。 目前分散式鎖常用的三種方式:1.資料庫的鎖;2.基於Redis的分散式鎖;3.基於ZooKeeper的分散式鎖。其中資料庫中的鎖有共享鎖和排他鎖,這兩種都無法直接解決資料庫的單點和可重入的問題,所以,本章還是來講講基於Redis的分散式鎖,也可以用其他快取(Memcache、Tair等)來實現。

一、實現分散式鎖的要求

  1. 互斥性。在任何時候,當且僅有一個客戶端能夠持有鎖。
  2. 不能有死鎖。持有鎖的客戶端崩潰後,後續客戶端能夠加鎖。
  3. 容錯性。大部分Redis或者ZooKeeper節點能夠正常運行。
  4. 加鎖解鎖相同。加鎖的客戶端和解鎖的客戶端必須為同一個客戶端,不能讓其他的解鎖了。

二、Redis實現分散式鎖的常用命令

1.SETNX key val

當且僅當key不存在時,set一個key為val的字元串,返回1;若key存在,則什麼都不做,返回0。

2.expire key timeout

為key設置一個超時時間,單位為second,超過這個時間鎖會自動釋放,避免死鎖。

3.delete key

刪除key,此處用來解鎖使用。

4.HEXISTS key field

當key 中存儲著field的時候返回1,如果key或者field至少有一個不存在返回0。

5.HINCRBY key field increment

將存儲在 key 中的哈希(Hash)對象中的指定欄位 field 的值加上增量 increment。如果鍵 key 不存在,一個保存了哈希對象的新建將被創建。如果欄位 field 不存在,在進行當前操作前,其將被創建,且對應的值被置為 0。返回值是增量之後的值。

三、常見寫法

由上面三個命令,我們可以很快的寫一個分散式鎖出來:

java if (conn.setnx("lock","1").equals(1L)) {   //do something   return true;   }  return false;  

但是這樣也會存在問題,如果獲取該鎖的客戶端掛掉了怎麼辦?一般而言,我們可以通過設置expire的過期時間來防止客戶端掛掉所帶來的影響,可以降低應用掛掉所帶來的影響,不過當時間失效的時候,要保證其他客戶端只有一台能夠獲取。

四、Redisson

Redisson在基於NIO的Netty框架上,充分的利用了Redis鍵值資料庫提供的一系列優勢,在Java實用工具包中常用介面的基礎上,為使用者提供了一系列具有分散式特性的常用工具類。使得原本作為協調單機多執行緒並發程式的工具包獲得了協調分散式多機多執行緒並發系統的能力,大大降低了設計和研發大規模分散式系統的難度。同時結合各富特色的分散式服務,更進一步簡化了分散式環境中程式相互之間的協作。——摘自百度百科

4.1 測試例子 先在pom引入Redssion <dependency>

 <groupId>org.redisson</groupId>   <artifactId>redisson</artifactId>   <version>3.6.1</version>  </dependency>

起100個執行緒,同時對count進行操作,每次操作減1,加鎖的時候能夠保持順序輸出,不加的話為隨機。 public class RedissonTest implements Runnable {

 private static RedissonClient redisson;   private static int count = 10000;   private static void init() {   Config config = new Config();          config.useSingleServer()   .setAddress("redis://119.23.46.71:6340")   .setPassword("root")   .setDatabase(10);          redisson = Redisson.create(config);   }   @Override   public void run() {   RLock lock = redisson.getLock("anyLock");          lock.lock();          count--;   System.out.println(count);          lock.unlock();   }   public static void main(String[] args) {          init();   for (int i = 0; i < 100; i++) {   new Thread(new RedissonTest()).start();   }   }

輸出結果(部分結果): ...

9930  9929  9928  9927  9926  9925  9924  9923  9922  9921  ...

去掉lock.lock()和lock.unlock()之後(部分結果):

...  9930  9931  9933  9935  9938  9937  9940  9941  9942  9944  9947  9946  9914  ... 

五、RedissonLock源碼分析

最新版的Redisson要求redis能夠支援eval的命令,否則無法實現,即Redis要求2.6版本以上。在lua腳本中可以調用大部分的Redis命令,使用腳本的好處如下:

(1)減少網路開銷:在Redis操作需求需要向Redis發送5次請求,而使用腳本功能完成同樣的操作只需要發送一個請求即可,減少了網路往返時延。

(2)原子操作:Redis會將整個腳本作為一個整體執行,中間不會被其他命令插入。換句話說在編寫腳本的過程中無需擔心會出現競態條件,也就無需使用事務。事務可以完成的所有功能都可以用腳本來實現。

(3)復用**:客戶端發送的腳本會永久存儲在Redis中,這就意味著其他客戶端(可以是其他語言開發的項目)可以復用這一腳本而不需要使用程式碼完成同樣的邏輯。 ###

5.1 使用到的全局變數

全局變數:

expirationRenewalMap:存儲entryName和其過期時間,底層用的netty的PlatformDependent.newConcurrentHashMap()

internalLockLeaseTime:鎖默認釋放的時間:30*1000,即30秒

id:UUID,用作客戶端的唯一標識

PUBSUB:訂閱者模式,當釋放鎖的時候,其他客戶端能夠知道鎖已經被釋放的消息,並讓隊列中的第一個消費者獲取鎖。使用PUB/SUB消息機制的優點:減少申請鎖時的等待時間、安全、 鎖帶有超時時間、鎖的標識唯一,防止死鎖 鎖設計為可重入,避免死鎖。

commandExecutor:命令執行器,非同步執行器

5.2 加鎖

以lock.lock()為例,調用lock之後,底層使用的是lockInterruptibly,之後調用lockInterruptibly(-1,null

(1)我們來看一下lockInterruptibly的源碼,如果別的客戶端沒有加鎖,則當前客戶端進行加鎖並且訂閱,其他客戶端嘗試加鎖,並且獲取ttl,然後等待已經加了鎖的客戶端解鎖。

 //leaseTime默認為-1  public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {   long threadId = Thread.currentThread().getId();//獲取當前執行緒ID   Long ttl = tryAcquire(leaseTime, unit, threadId);//嘗試加鎖   // 如果為空,當前執行緒獲取鎖成功,否則已經被其他客戶端加鎖   if (ttl == null) {   return;   }   //等待釋放,並訂閱鎖   RFuture<RedissonLockEntry> future = subscribe(threadId);      commandExecutor.syncSubscription(future);   try {   while (true) {   // 重新嘗試獲取鎖              ttl = tryAcquire(leaseTime, unit, threadId);   // 成功獲取鎖   if (ttl == null) {   break;   }   // 等待鎖釋放   if (ttl >= 0) {                  getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);   } else {                  getEntry(threadId).getLatch().acquire();   }   }   } finally {   // 取消訂閱          unsubscribe(future, threadId);   }  }

(2)下面是tryAcquire的實現,調用的是tryAcquireAsync

  private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {          return get(tryAcquireAsync(leaseTime, unit, threadId));      }

(3)下面是tryAcquireAsync的實現,非同步嘗試進行加鎖,嘗試加鎖的時候leaseTime為-1。通常如果客戶端沒有加鎖成功,則會進行阻塞,leaseTime為鎖釋放的時間。

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {      if (leaseTime != -1) {   //在lock.lock()的時候,已經聲明了leaseTime為-1,嘗試加鎖          return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);      }      RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);      //監聽事件,訂閱消息      ttlRemainingFuture.addListener(new FutureListener<Long>() {          @Override          public void operationComplete(Future<Long> future) throws Exception {              if (!future.isSuccess()) {                  return;              }              Long ttlRemaining = future.getNow();              // lock acquired              if (ttlRemaining == null) {                  //獲取新的超時時間                  scheduleExpirationRenewal(threadId);              }          }      });      return ttlRemainingFuture;  //返回ttl時間  }

(4)下面是tryLockInnerAsyncy非同步加鎖,使用lua能夠保證操作是原子性的

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {      internalLockLeaseTime = unit.toMillis(leaseTime);      return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,                "if (redis.call(''exists'', KEYS[1]) == 0) then " +                    "redis.call(''hset'', KEYS[1], ARGV[2], 1); " +                    "redis.call(''pexpire'', KEYS[1], ARGV[1]); " +                    "return nil; " +                "end; " +                "if (redis.call(''hexists'', KEYS[1], ARGV[2]) == 1) then " +                    "redis.call(''hincrby'', KEYS[1], ARGV[2], 1); " +                    "redis.call(''pexpire'', KEYS[1], ARGV[1]); " +                    "return nil; " +                "end; " +                "return redis.call(''pttl'', KEYS[1]);",                  Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));  }

參數 KEYS[1](getName()) :需要加鎖的key,這裡需要是字元串類型。 ARGV[1](internalLockLeaseTime) :鎖的超時時間,防止死鎖 ARGV[2](getLockName(threadId)) :鎖的唯一標識,也就是剛才介紹的 id(UUID.randomUUID()) + 「:」 + threadId lua腳本解釋

--檢查key是否被佔用了,如果沒有則設置超時時間和唯一標識,初始化value=1  if (redis.call(''exists'', KEYS[1]) == 0) then    redis.call(''hset'', KEYS[1], ARGV[2], 1);    redis.call(''pexpire'', KEYS[1], ARGV[1]);    return nil;  end;  --如果鎖重入,需要判斷鎖的key field 都一致情況下 value 加一  if (redis.call(''hexists'', KEYS[1], ARGV[2]) == 1) then    redis.call(''hincrby'', KEYS[1], ARGV[2], 1);    --鎖重入重新設置超時時間    redis.call(''pexpire'', KEYS[1], ARGV[1]);    return nil;  end;  --返回剩餘的過期時間  return redis.call(''pttl'', KEYS[1]);

(5)流程圖

5.3 解鎖

解鎖的程式碼很簡單,大意是將該節點刪除,並發布消息。 (1)unlock源碼

  public void unlock() {          Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));          if (opStatus == null) {              throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "                      + id + " thread-id: " + Thread.currentThread().getId());          }          if (opStatus) {              cancelExpirationRenewal();          }

(2)非同步解鎖,並返回是否成功

otected RFuture<Boolean> unlockInnerAsync(long threadId) {      return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,              "if (redis.call(''exists'', KEYS[1]) == 0) then " +                  "redis.call(''publish'', KEYS[2], ARGV[1]); " +                  "return 1; " +              "end;" +              "if (redis.call(''hexists'', KEYS[1], ARGV[3]) == 0) then " +                  "return nil;" +              "end; " +              "local counter = redis.call(''hincrby'', KEYS[1], ARGV[3], -1); " +              "if (counter > 0) then " +                  "redis.call(''pexpire'', KEYS[1], ARGV[2]); " +                  "return 0; " +              "else " +                  "redis.call(''del'', KEYS[1]); " +                  "redis.call(''publish'', KEYS[2], ARGV[1]); " +                  "return 1; "+              "end; " +              "return nil;",              Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));      }

輸入的參數有: 參數: KEYS[1](getName()):需要加鎖的key,這裡需要是字元串類型。 KEYS[2](getChannelName()):redis消息的ChannelName,一個分散式鎖對應唯一的一個 channelName:「redisson_lockchannel{」 + getName() + 「}」 ARGV[1](LockPubSub.unlockMessage):redis消息體,這裡只需要一個位元組的標記就可以,主要標記redis的key已經解鎖,再結合redis的Subscribe,能喚醒其他訂閱解鎖消息的客戶端執行緒申請鎖。 ARGV[2](internalLockLeaseTime):鎖的超時時間,防止死鎖 ARGV[3](getLockName(threadId)) :鎖的唯一標識,也就是剛才介紹的 id(UUID.randomUUID()) + 「:」 + threadId

此處lua腳本的作用:

--如果keys[1]不存在,則發布消息,說明已經被解鎖了  if (redis.call(''exists'', KEYS[1]) == 0) then      redis.call(''publish'', KEYS[2], ARGV[1]);      return 1;  end;  --key和field不匹配,說明當前客戶端執行緒沒有持有鎖,不能主動解鎖。  if (redis.call(''hexists'', KEYS[1], ARGV[3]) == 0) then      return nil;  end;  --將value減1,這裡主要用在重入鎖  local counter = redis.call(''hincrby'', KEYS[1], ARGV[3], -1);  if (counter > 0) then      redis.call(''pexpire'', KEYS[1], ARGV[2]);      return 0;  else  --刪除key並消息      redis.call(''del'', KEYS[1]);      redis.call(''publish'', KEYS[2], ARGV[1]);      return 1;  end;  return nil;  

(3)刪除過期資訊

void cancelExpirationRenewal() {      Timeout task = expirationRenewalMap.remove(getEntryName());      if (task != null) {          task.cancel();      }  }

總結

Redis2.6版本之後引入了eval,能夠支援lua腳本,更好的保證了redis的原子性,而且redisson採用了大量非同步的寫法來避免性能所帶來的影響。本文只是講解了下redisson的重入鎖,其還有公平鎖、聯鎖、紅鎖、讀寫鎖等,有興趣的可以看下。感覺這篇文章寫得也不是很好,畢竟netty還沒開始學,有些api也不太清楚,希望各位大佬能夠建議建議~~

參考: 1.redisson 2.Redis分散式鎖的正確實現方式 3.分散式鎖的多種實現方式 4.用Redis構建分散式鎖 5.基於Redis的分散式鎖實現 6.基於Redis實現分散式鎖,Redisson使用及源碼分析