Redis高並發分散式鎖詳解

為什麼需要分散式鎖

  1.為了解決Java共享記憶體模型帶來的執行緒安全問題,我們可以通過加鎖來保證資源訪問的單一,如JVM內置鎖synchronized,類級別的鎖ReentrantLock。

  2.但是隨著業務的發展,單機服務畢竟存在著限制,故會往多台組合形成集群架構,面對集群架構,我們同樣存在則資源共享問題,而每台伺服器有著自己的JVM,這時候我們對於鎖的實現不得不考慮分散式的實現。

 

分散式鎖應該具備哪些條件

  1.在分散式系統環境下,一個方法在同一時間只能被一個機器的一個執行緒執行

  2.高可用的獲取鎖與釋放鎖

  3.高性能的獲取鎖與釋放鎖

  4.具備可重入特性(可理解為重新進入,由多於一個任務並發使用,而不必擔心數據錯誤)

  5.具備鎖失效機制,即自動解鎖,防止死鎖

  6.具備非阻塞鎖特性,即沒有獲取到鎖將直接返回獲取鎖失敗

 

秒殺搶購場景模擬(模擬並發問題:其實就是指每一步如果存在間隔時間,那麼當某一執行緒間隔時間拉長,會對其餘執行緒造成什麼影響

  0.如果要在本機測試的話

    1)配置Nginx實現負載均衡

http {
    upstream testfuzai {
        server 127.0.0.1:8080 weight=1;
        server 127.0.0.1:8090 weight=1;
    }
  
    server {
        listen 80;
        server_name localhost;
  
        location / {
            //proxy_pass:設置後端代理伺服器的地址。這個地址(address)可以是一個域名或ip地址和埠,或者一個 unix-domain socket路徑。
            proxy_pass http://testfuzai;
            proxy_set_header Host $proxy_host;
        }
    }
}

    2)啟動redis設置好參數與數量

    3)啟動項目並分別配置不同埠(要與Nginx裡面的一致)

    4)進行壓測,通過jmeter的Thread Group裡面編輯好HTTP Request,設置參數 執行緒數 Number of Threads 【設置為200】 ,請求的重複次數 Loop count 【設置為5】 ,Ramp-up period(seconds)執行緒啟動開始運行的時間間隔(單位是秒)【設置為1】。則,一秒內會有1000個請求打過去。

 

  1.不加鎖進行庫存扣減的情況:

    程式碼示例

@RequestMapping("/deduct_stock")
public String deductStock() {
    //從redis取出庫存
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); 
    if (stock > 0) {
        int realStock = stock - 1;
        //往redis寫入庫存
        stringRedisTemplate.opsForValue().set("stock", realStock + ""); 
        System.out.println("扣減成功,剩餘庫存:" + realStock);
    } else {
        System.out.println("扣減失敗,庫存不足");
    }
    return "end";
}

 

    發現說明

      1)通過列印輸出,我們會發現兩台機器上會出現重複的值(即出現了超賣現象)。甚至會出現另一台伺服器的數據覆蓋本伺服器的數據。

      2)原因在於讀取數據和寫入數據存在時間差,如兩個伺服器Q1和Q1,Q1有請求,獲取庫存【假設300】,在庫存判斷大小之後進行扣減庫存如果慢了【假設需要3秒】,那麼Q2有5次請求,獲取到庫存,扣減完後設置,依次5次,則庫存為【295】。但是此時Q1完成自身請求又會把庫存設置為【299】。故不合理。所以應該改為使用stringRedisTemplate.boundValueOps(“stock”).increment(-1); 改為採用redis內部扣除,減少了超賣的個數。但是就算改了也只是避免了覆蓋問題,仍然沒有解決超賣問題。如果有6台伺服器,庫存剩下1個的時候六個請求同時進入到扣減庫存這一步,那麼就會出現超賣5個的現象(這也是超賣個數最多的現象)。

 

  2.採用SETNX的方式加分散式鎖的情況:

    程式碼示例

public String deductStock() {
    String lockKey = "lock:product_101";
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockKey);
if (!result) { return "error_code"; } try {
     int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) { Long realStock = (Long) stringRedisTemplate.opsForValue().decrement("stock"); System.out.println("扣減成功,剩餘庫存:" + realStock); } else { System.out.println("扣減失敗,庫存不足"); } } finally { stringRedisTemplate.delete(lockKey); } return "end"; }

    發現說明

      1)這種方式明顯保證了在分散式情況下只有一個執行緒能夠執行業務程式碼。但是我們不可能對於用戶買商品的時候返回錯誤提示,如果不斷自旋的話又容易讓CPU飆升。肯定要考慮休眠與喚醒,但可以在上層方法裡面處理。

      2)同時很明顯示記憶體在個問題,如果我在扣減庫存時候伺服器宕機了,庫存扣減還沒設置【且沒執行finally程式碼,那麼我這個商品的鎖就不會被釋放,除非手動清除】。

那麼肯定需要設置超時時間。如

Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockKey);
stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

      會發現補一個超時時間的話依舊無法避免之前的問題,故加鎖和設置超時時間需要保持原子性

      3)採用原子操作:Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);基於設置了超時時間,那麼我們如何考量超時時間呢,業務執行多久我們根本不可得知。故容易出現時間到期了,業務還沒執行完。這就容易出現A持有鎖執行任務,還沒完成就超時了,B持有鎖執行任務,A執行完,釋放鎖【此時會釋放B的鎖】的情況。所以釋放鎖必須要持有鎖本人才能執行。

if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
    stringRedisTemplate.delete(lockKey);
}

      所以clientId需要是分散式ID,然後釋放鎖改為判斷clientId符合才能去釋放。

 

  3.改進之後的情況:

    程式碼示例

public String deductStock() {
    String lockKey = "lock:product_101";
    String clientId = UUID.randomUUID().toString();
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
    if (!result) {
        return "error_code";
    }

    try {
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); 
 if (stock > 0) { Long realStock = (Long) stringRedisTemplate.opsForValue().decrement("stock");  System.out.println("扣減成功,剩餘庫存:" + realStock); } else { System.out.println("扣減失敗,庫存不足"); } } finally { if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) { stringRedisTemplate.delete(lockKey); } } return "end"; }

 

    發現說明

      1)即時加了判斷,我們會發現依舊會存在問題【因為判斷與釋放鎖操作不是原子性的】,如果在判斷裡面加上休眠進行試驗

if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
    Thread.sleep(20000);
    stringRedisTemplate.delete(lockKey);
}

      我們會發現根本問題依舊沒有解決,只是減少了發生的情況。究其原因,本質上還是鎖超時導致的。解決這個問題就要引入一個完美的解決方案叫做鎖續命

      2)鎖續命(watchDog):假設主執行緒搶到鎖開始執行業務邏輯,開啟一個分執行緒,在分執行緒裡邊做一個定時任務,比如說設置的鎖超時時間是30s,那麼我們的定時任務時間就設置為10s,定時任務設置的時間一定要比鎖超時時間小,每10s定時任務先去判斷主執行緒有沒有結束,沒有結束的話說明主執行緒就還在,還在進行業務邏輯操作,這個時候我們執行一條expire命令,將主執行緒鎖的超時時間重新設置為30s,這樣的話只要主執行緒還沒結束,主執行緒就會被分執行緒定時任務去做續命邏輯,維持在30s,判斷主執行緒結束,就不再執行續命邏輯。

 

Redisson分散式鎖框架剖析

  1.引入依賴

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.6.5</version>
</dependency>

  2.進行配置

@Bean
public Redisson redisson() {
    // 此為單機模式
    Config config = new Config();
    config.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0);
    return (Redisson) Redisson.create(config);
}

  3.業務程式碼展示

public String deductStock2() {
    String lockKey = "lock:product_101";
    //獲取鎖對象
    RLock redissonLock = redisson.getLock(lockKey);
    //加分散式鎖
    redissonLock.lock();
    try {
        Long stock = (Long) stringRedisTemplate.opsForValue().decrement("stock");
        if (stock > 0) {
            Long realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", realStock + "");
            System.out.println("扣減成功,剩餘庫存:" + realStock);
        } else {
            System.out.println("扣減失敗,庫存不足");
        }
    } finally {
        //解鎖
        redissonLock.unlock();
    }
    return "end";
}

     發現說明

      1.如果在集群架構下面,分散式鎖如果在Master節點上寫成功了就會返回給客戶端,但是此時還需要同步給從節點。

      2.如果在此時間內Master節點結點宕機,那麼數據將會消失,而從節點上沒有鎖的資訊(變為Master節點)。【主從架構鎖失效問題

 

  4.為解決主從架構鎖失效問題引入的RedLock(不建議用,因為本質上還是沒有解決主從架構鎖失效問題)

    0.原理展示

           

 

    1.redssion 集群配置(在resource下創建 redssion.yml文件)

clusterServersConfig:
  # 連接空閑超時,單位:毫秒 默認10000
  idleConnectionTimeout: 10000
  pingTimeout: 1000
  # 同任何節點建立連接時的等待超時。時間單位是毫秒 默認10000
  connectTimeout: 10000
  # 等待節點回復命令的時間。該時間從命令發送成功時開始計時。默認3000
  timeout: 3000
  # 命令失敗重試次數
  retryAttempts: 3
  # 命令重試發送時間間隔,單位:毫秒
  retryInterval: 1500
  # 重新連接時間間隔,單位:毫秒
  reconnectionTimeout: 3000
  # 執行失敗最大次數
  failedAttempts: 3
  # 密碼
  password: test1234
  # 單個連接最大訂閱數量
  subscriptionsPerConnection: 5
  clientName: null
  # loadBalancer 負載均衡演算法類的選擇
  loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
  #從節點發布和訂閱連接的最小空閑連接數
  slaveSubscriptionConnectionMinimumIdleSize: 1
  #從節點發布和訂閱連接池大小 默認值50
  slaveSubscriptionConnectionPoolSize: 50
  # 從節點最小空閑連接數 默認值32
  slaveConnectionMinimumIdleSize: 32
  # 從節點連接池大小 默認64
  slaveConnectionPoolSize: 64
  # 主節點最小空閑連接數 默認32
  masterConnectionMinimumIdleSize: 32
  # 主節點連接池大小 默認64
  masterConnectionPoolSize: 64
  # 訂閱操作的負載均衡模式
  subscriptionMode: SLAVE
  # 只在從伺服器讀取
  readMode: SLAVE
  # 集群地址
  nodeAddresses:
    - "redis://IP地址:30001"  //
    - "redis://IP地址:30002"
    - "redis://IP地址:30003"
    - "redis://IP地址:30004"
    - "redis://IP地址:30005"
    - "redis://IP地址:30006"
  # 對Redis集群節點狀態掃描的時間間隔。單位是毫秒。默認1000
  scanInterval: 1000
  #這個執行緒池數量被所有RTopic對象監聽器,RRemoteService調用者和RExecutorService任務共同共享。默認2
threads: 0
#這個執行緒池數量是在一個Redisson實例內,被其創建的所有分散式數據類型和服務,以及底層客戶端所一同共享的執行緒池裡保存的執行緒數量。默認2
nettyThreads: 0
# 編碼方式 默認org.redisson.codec.JsonJacksonCodec
codec: !<org.redisson.codec.JsonJacksonCodec> {}
#傳輸模式
transportMode: NIO
# 分散式鎖自動過期時間,防止死鎖,默認30000
lockWatchdogTimeout: 30000
# 通過該參數來修改是否按訂閱發布消息的接收順序出來消息,如果選否將對消息實行並行處理,該參數只適用於訂閱發布消息的情況, 默認true
keepPubSubOrder: true
# 用來指定高性能引擎的行為。由於該變數值的選用與使用場景息息相關(NORMAL除外)我們建議對每個參數值都進行嘗試。

 

    2.程式碼配置

@Bean
public RedissonClient redisson() throws IOException {
    Config config = Config.fromYAML(new ClassPathResource("redisson.yml").getInputStream());
    RedissonClient redisson = Redisson.create(config);
    return redisson;
} 
//或者
@Bean
public Redisson redisson() {
    // 此為集群模式
    Config config = new Config();
    config.useClusterServers()
            .addNodeAddress("redis://127.0.0.1:6379")
            .addNodeAddress("redis://127.0.0.1:6389")
            .addNodeAddress("redis://127.0.0.1:6399")
            .addNodeAddress("redis://127.0.0.1:6369");
    return (Redisson) Redisson.create(config);
}

 

    3.業務程式碼示例

@RequestMapping("/redlock")
public String redlock() {
    RLock lock1 = redisson.getLock("Key1_product_001");
    RLock lock2 = redisson.getLock("Key2_product_001");
    RLock lock3 = redisson.getLock("Key3_product_001");

    /**
     * 根據多個 RLock 對象構建 RedissonRedLock (最核心的差別就在這裡)
     */
    RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
    try {
        /**
         * waitTimeout 嘗試獲取鎖的最大等待時間,超過這個值,則認為獲取鎖失敗
         * leaseTime   鎖的持有時間,超過這個時間鎖會自動失效(值應設置為大於業務處理的時間,確保在鎖有效期內業務能處理完)
         */
        boolean res = redLock.tryLock(10, 30, TimeUnit.SECONDS);
        if (res) {
            //成功獲得鎖,在這裡處理業務
        }
    } catch (Exception e) {
        throw new RuntimeException("lock fail");
    } finally {
        //無論如何, 最後都要解鎖
        redLock.unlock();
    }
    return "end";
}

 

    4.分析說明(為什麼不推薦用

      1)如果不是集群,為保證高可用,要對三個節點都添加了從節點(因為如果沒有從節點,線上只要有兩個服務宕機了,那麼這個分散式鎖將不再可用)

      2)針對三主三從的情況,A執行緒對redis_1_主 和 redis_2_主 加鎖成功,對 redis_3_主 加鎖失敗,則可以獲得分散式鎖,執行任務。但是還沒同步情況下,redis_1_主宕機,redis_1_從 晉陞成功數據丟失,此時B執行緒來加鎖,redis_1_從加鎖成功和 redis_3_主 加鎖成功,對 redis_2_主 加鎖失敗,也能獲得分散式鎖。【概率不大但還是會存在問題】

      3)針對集群如果不搞主從【一旦出現宕機,數據量大,且訪問高的話,這裡面就存在著快取雪崩的危機】,此外如果集群半數節點宕機,集群會被迫停了,此外如果加鎖節點越多,加鎖效率越低下。

      4)既然原理與zookeeper的差不多而且也損失了高性能的特性,那其實還不如使用zookeeper分散式鎖。 

 

  5.原理分析 

          

 

  6.源碼剖析

    1)Redisson類#getLock方法

public RLock getLock(String name) {
    return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor;
    this.id = commandExecutor.getConnectionManager().getId();
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
}

 

    2)Redisson類#lock方法

public void lock() {
    try {
        lockInterruptibly();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

//RedissonLock類#lockInterruptibly方法
public void lockInterruptibly() throws InterruptedException {
    lockInterruptibly(-1, null);
}

//RedissonLock類#lockInterruptibly方法
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    //先在redis中發布訂閱消息,等待用完鎖的執行緒通知
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);
    try {
        while (true) {
       //再次嘗試獲取鎖
            ttl = tryAcquire(leaseTime, unit, threadId);
            if (ttl == null) {
                break;
            }

            if (ttl >= 0) {
         //利用 Semaphore 訊號量的方式獲得許可,但是這種休眠是定時的
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
}

//RedissonLock類#tryAcquire方法
//利用future的方式阻塞式等待返回結果
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}

//RedissonObject類#get方法
protected <V> V get(RFuture<V> future) {
    return commandExecutor.get(future);
}

//RedissonLock類#subscribe方法
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
    return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}

//PublishSubscribe類#subscribe方法
public RFuture<E> subscribe(final String entryName, final String channelName, final PublishSubscribeService subscribeService) {
    final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
    final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
    final RPromise<E> newPromise = new RedissonPromise<E>() {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return semaphore.remove(listenerHolder.get());
        }
    };

    Runnable listener = new Runnable() {

        @Override
        public void run() {
            // 1:判斷RedisLockEntry 是否存在
            E entry = entries.get(entryName);
            if (entry != null) {
                entry.aquire();
                semaphore.release();
                entry.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            // 2:創建RedisLockEntry
            E value = createEntry(newPromise);
            value.aquire();
            
            E oldValue = entries.putIfAbsent(entryName, value);
            if (oldValue != null) {
                oldValue.aquire();
                semaphore.release();
                oldValue.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            // 3:創建一個監聽器,別的執行緒進行redis-pub命令之後進行調用
            RedisPubSubListener<Object> listener = createListener(channelName, value);
            // 4:底層交給netty調用redis-sub命令
            subscribeService.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
        }
    };
    semaphore.acquire(listener);
    listenerHolder.set(listener);
    
    return newPromise;
}

 

    3)RedissonLock類#tryAcquireAsync方法(核心點主體)

//RedissonLock類#tryAcquireAsync方法
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    //嘗試加鎖邏輯
 RFuture<Long> ttlRemainingFuture=tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    //添加監聽器
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        //Future任務執行完會回調該方法
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // 加鎖成功
            if (ttlRemaining == null) {
                //看門狗續命
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

 

    4)RedissonLock類#tryLockInnerAsync方法(核心點,加鎖邏輯

//RedissonLock類#tryLockInnerAsync方法
//利用redis的單執行緒執行任務,redis會將整個腳本作為一個整體執行,且中間不會被其他命令插入
//採用的是hash的類型來存儲鎖,為了實現重入鎖的概念
//Redis pttl命令以毫秒為單位返回 key 的剩餘過期時間
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",
                //對應為KEYS[1](對應傳入的鎖的命名),ARGV[1](設置的超時時間,默認30s) ,ARGV[2] -》(uuid + ":" + threadId)
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

 

    5)RedissonLock類#scheduleExpirationRenewal方法(核心點,看門狗的邏輯【續命】

//RedissonLock類#scheduleExpirationRenewal方法
//採用Future+事件監聽的方式,方法嵌套調用來實現定時任務
private void scheduleExpirationRenewal(final long threadId) {
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            
            RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "return 0;",
                      Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
            
            //再次添加監聽器,重複檢查
            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can't update lock " + getName() + " expiration", future.cause());
                        return;
                    }
                    
                    if (future.getNow()) {
                        // reschedule itself  //遞歸調用
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    //如果該任務已經存在一個了,就把新建的任務關閉,Map中的key為(uuid + ":" + threadId)
    if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
        task.cancel();
    }
}

 

    6)Redisson類#unlock方法

//RedissonLock類#unlock方法
public void unlock() {
    Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
    if (opStatus == null) {
        throw new IllegalMonitorStateException(...);
    }
    if (opStatus) {
        //移除看門狗的定時任務
        cancelExpirationRenewal();
    }

}

//RedissonLock類#unlockInnerAsync方法
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            //如果不存在鎖
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            //當前執行緒並沒有持有鎖,則返回nil
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            //前執行緒持有鎖,則對value-1,拿到-1之後的vlaue
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            //value>0,以毫秒為單位返回剩下的過期時間。(保證可重入)
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            //value<=0,則對key進行刪除操作,return 1 (方法返回 true)。然後進行redis-pub指令,用於喚醒其他正在休眠的執行緒。
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            //參數順序KEYS[1](鎖的名稱),KEYS[2](發布訂閱的Channel名:redisson_lock__channel+鎖名),ARGV[1](發布的消息),ARGV[2](鎖超時時間),ARGV[3](uuid + ":" + threadId)
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

 

    7)Redisson類#tryLock方法

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long newLeaseTime = -1;
    if (leaseTime != -1) {
        newLeaseTime = unit.toMillis(waitTime)*2;
    }
    
    long time = System.currentTimeMillis();
    long remainTime = -1;
    if (waitTime != -1) {
        remainTime = unit.toMillis(waitTime);
    }
    long lockWaitTime = calcLockWaitTime(remainTime);
    
    int failedLocksLimit = failedLocksLimit();
    List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        try {
            if (waitTime == -1 && leaseTime == -1) {
                lockAcquired = lock.tryLock();
            } else {
                long awaitTime = Math.min(lockWaitTime, remainTime);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (Exception e) {
            lockAcquired = false;
        }
        
        if (lockAcquired) {
            acquiredLocks.add(lock);
        } else {
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                break;
            }

            if (failedLocksLimit == 0) {
                unlockInner(acquiredLocks);
                if (waitTime == -1 && leaseTime == -1) {
                    return false;
                }
                failedLocksLimit = failedLocksLimit();
                acquiredLocks.clear();
                // reset iterator
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                failedLocksLimit--;
            }
        }
        
        if (remainTime != -1) {
            remainTime -= (System.currentTimeMillis() - time);
            time = System.currentTimeMillis();
            if (remainTime <= 0) {
                unlockInner(acquiredLocks);
                return false;
            }
        }
    }

    if (leaseTime != -1) {
        List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
        for (RLock rLock : acquiredLocks) {
            RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }
        
        for (RFuture<Boolean> rFuture : futures) {
            rFuture.syncUninterruptibly();
        }
    }
    
    return true;
}

 

 

Redis與Zookeeper分散式鎖的區別

  1.從單機角度上來說,兩者差別不大,都是項目引入的外部組件,redis相對於zookeeper來說,項目中使用的更多,常用性角度redis更加。

  2.但是一般我們都會做集群(容錯率更高):

    【1】從分散式的CAP角度分析:

        redis滿足AP,在Master節點上寫成功了會優先返回給客戶端,之後在同步給從節點

        zookeeper滿足CP,在Master節點上寫成功了會優先同步給從節點【ZAB協議(半數以上寫成功)】,之後在返回給客戶端

    【2】主從架構鎖失效問題:

        redis會出現,因為從節點變成主節點時,會出現丟失數據的問題。

        zookeeper不會出現,因為從節點變成主節點時,不會會出現丟失數據的問題。

    【3】集群下性能角度:

        redis性能會高於zookeeper,同步是個耗時的操作(而且這個過程中還是相當於阻塞執行緒),並發越高的情況,我們想要的是耗時越少的越好。

   3.選redis還是zk實現分散式鎖

    首先zk的性能肯定不如redis,但是從分散式鎖的角度語義上來說,zk可能更適合一些,所以如果對性能要求比較高的話就選redis,對數據的強一致性有特別嚴格要求的話就選zk,現在的主流的分散式鎖方案還是redis,也有一些辦法去減少redis主從架構鎖失效問題。

 

如何提升分散式鎖性能

  問題分析

  1.分散式鎖為我們解決了並發問題,但是其底層思路是將並行執行的請求給串列化了,因為redis是單執行緒執行任務的,肯定就不會有並發問題了。

  2.但是這種設計本身是與我們高並發的需求是衝突的。但是某些場景下我們又不得不用,所以我們應該基於場景做一些優化。

  3.正如阿里巴巴Java開發手冊裡面寫到:

6. 【強制】高並發時,同步調用應該去考量鎖的性能損耗。能用無鎖數據結構,就不要用鎖;能鎖區塊,就不要鎖整個方法體;能用對象鎖,就不要用類鎖。

說明:儘可能使加鎖的程式碼塊工作量儘可能的小,避免在鎖程式碼塊中調用 RPC 方法。

7. 【強制】對多個資源、資料庫表、對象同時加鎖時,需要保持一致的加鎖順序,否則可能會造成死鎖。

說明:執行緒一需要對錶 A、B、C 依次全部加鎖後才可以進行更新操作,那麼執行緒二的加鎖順序也必須是 A、B、C,否則可能出現死鎖。


8. 【強制】並發修改同一記錄時,避免更新丟失,需要加鎖。要麼在應用層加鎖,要麼在快取加鎖,要麼在資料庫層使用樂觀鎖,使用 version 作為更新依據。

說明:如果每次訪問衝突概率小於 20%,推薦使用樂觀鎖,否則使用悲觀鎖。樂觀鎖的重試次數不得小於 3 次。

 

  4.所以我們優先從鎖的粒度開始,鎖是否合適,加鎖的範圍是否夠小。鎖的粒度範圍越小越好,加鎖的程式碼越少性能就越高,因為加鎖的程式碼會串列執行,沒有必要加鎖的程式碼肯定是讓他們並行執行這樣效率更高。

  

  案例演示

    場景說明:

      在秒殺搶購的情況下,大量的秒殺商品其實都是走同一邏輯的,如果使用公用的鎖必然是不合適的,這會大大阻塞住整個系統,而且不同商品之前根本不存在競爭關係,故一般我們會採用類似 redis_promotion_product_stock_$productId :1000 這種設置庫存值 。那麼對於每個商品既然擁有了自己的庫存那麼對於對應庫存加鎖就能縮小了鎖的顆粒度。

      但是這種真的就可行了嘛?對A商品的下單,都必對”redis_promotion_product_lock_A”這個鎖key來加鎖。這樣會導致對同一個商品的下單請求,就必須串列化,一個接一個的處理。假設加鎖之後,釋放鎖之前,查庫存 -> 創建訂單 -> 扣減庫存,這個過程性能很高吧,算他全過程20毫秒,這應該不錯了。那麼1秒是1000毫秒,只能容納50個對這個商品的請求依次串列完成處理。這種性能遠遠不能滿足我們想要的。而且對於變數進行原子操作這種:

13. 【參考】volatile 解決多執行緒記憶體不可見問題。對於一寫多讀,是可以解決變數同步問題,但是如果多寫,同樣無法解決執行緒安全問題。
如果是 count++操作,使用如下類實現:AtomicInteger count = new AtomicInteger(); count.addAndGet(1); 
如果是 JDK8,推薦使用 LongAdder 對象,比 AtomicLong 性能更好(減少樂觀鎖的重試次數)。

 

      了解過源碼的都應該知道 AtomicLong和ConcurrentHashMap 都是優化過類似操作的。那麼為何不參考呢?【分段加鎖思想

      AtomicLong將變數base結合一些數組變數,共同維持總數。面對高並發下,是針對單個數組節點進行加鎖,修改節點內數據,而總量依舊是他們加起來,而且數組的最大容量與核心數有關。是不是豁然開朗?這與我們的場景是不是很像。多台伺服器對應多核心。假設有4台伺服器,我們是不是可以將變數 redis_promotion_product_stock_$productId :1000 拆解為 redis_promotion_product_stock_$productId_1 :250,..,redis_promotion_product_stock_$productId_4 :250,

這樣的4份(性能好的伺服器,可以適當偏多)。那麼伺服器的CPU是不是就充分利用了,而且他們之前的並發問題是不是變小了。

      又或者分成10份,每個伺服器持有一份,用完再去獲取新的份額(這種需要額外添加列表維護,但是並發衝突再次下降)。

      一旦對某個數據做了分段處理之後,就會存在一個問題:假設伺服器A的份額消耗完了,但是其餘伺服器還存於份額:

        解決方案(處理庫存不足的方案是必須要做的):

          1.發現這個分段庫存里的庫存不足了,釋放鎖,然後立馬換下一個分段庫存,再次嘗試加鎖後嘗試處理(核心邏輯)。

             2.依託於負載均衡,先判斷總庫存是否還是有的,有的負載到其他伺服器,要設置好重試次數。(或者不重試,返回友好提示,讓客戶自己去重試,畢竟秒殺搶購這東西)

      

  總結

    1.分散式鎖並發優化,是一個十分複雜的過程,需要考慮數據的拆分,如何選擇拆分的數據,如何校驗,如何切換等等。這些都是需要我們考量和積累經驗的。

 

Tags: