Redis分散式鎖,看完不懂你打我

簡易的redis分散式鎖

加鎖:

set key my_random_value NX PX 30000

這個命令比setnx好,因為可以同時設置過期時間。不設置過期時間,應用掛了,解不了鎖,就一直鎖住了。

解鎖:

if redis.call("get",KEYS[1])==ARGV[1] then
   return redis.call("del",KEYS[1])
else
  return 0
end

先比較一下值,相等才刪除。防止其他執行緒把鎖給解了。

以上方案在一般的場景就夠用了,但還存在一些小問題:

  1. 如果設置過期時間3秒,但是業務執行需要4秒怎麼辦?

解決方案:參照redisson的看門狗,可以後台起一個執行緒去看看業務執行緒執行完了沒有,如果沒有就延長過期時間。

  1. redis是單點的,如果宕機了,那麼整個系統就會崩潰。如果是主從結構,那麼master宕機了,存儲的key還沒同步到slave,此時slave升級為新的master,客戶端2從新的master上就能拿到同一個資源的鎖。這樣客戶端1和客戶端2都拿到鎖,就不安全了。

解決方案:RedLock演算法。簡單說就是N個(通常是5)獨立的redis節點同時執行SETNX,如果大多數成功了,就拿到了鎖。這樣就允許少數節點不可用。

那我們看看工業級別是怎麼實現redis分散式鎖的呢?

Redission實現的redis分散式鎖

加鎖流程:

解鎖流程:

Redission加鎖使用的是redis的hash結構。

  • key :要鎖的資源名稱
  • filed :uuid+”:”+執行緒id
  • value : 數值型,可以實現可重入鎖

源碼裡面用到了netty裡面Promise的一些api,我列出來幫助理解:

    // 非同步操作完成且正常終止
    boolean isSuccess();
    // 非同步操作是否可以取消
    boolean isCancellable();
    // 非同步操作失敗的原因
    Throwable cause();
    // 添加一個監聽者,非同步操作完成時回調,類比javascript的回調函數
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    // 阻塞直到非同步操作完成
    Future<V> await() throws InterruptedException;
    // 同上,但非同步操作失敗時拋出異常
    Future<V> sync() throws InterruptedException;
    // 非阻塞地返回非同步結果,如果尚未完成返回null
    V getNow();

源碼分析:

加鎖:

public RLock getLock(String name) {
        return new RedissonLock(connectionManager.getCommandExecutor(), name);
    }
    
     public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        //命令執行器
        this.commandExecutor = commandExecutor;
       //uuid
        this.id = commandExecutor.getConnectionManager().getId();
        //超時時間,默認30s
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.entryName = id + ":" + name;
    }
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        //獲取執行緒id
        long threadId = Thread.currentThread().getId();
        //嘗試獲取鎖
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        //ttl為空則代表加鎖成功
        if (ttl == null) {
            return;
        }

  //如果獲取鎖失敗,則訂閱到對應這個鎖的channel,等其他執行緒釋放鎖時,通知執行緒去獲取鎖
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
            while (true) {
            //再次嘗試獲取鎖
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                 //ttl大於0,則等待ttl時間後繼續嘗試獲取鎖
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
        //取消對channel的訂閱
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

再來看看裡面的嘗試獲取鎖的程式碼:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }
    
    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
        //如果帶有過期時間,則按照普通方式獲取鎖
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
         //先按照30秒的過期時間來執行獲取鎖的方法
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        //非同步執行回調監聽
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
             //如果還持有這個鎖,則開啟定時任務不斷刷新該鎖的過期時間
            public void operationComplete(Future<Long> future) throws Exception {
            //沒有成功執行完成
                if (!future.isSuccess()) {
                    return;
                }
       //非阻塞地返回非同步結果,如果尚未完成返回null
                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

看門狗邏輯:

使用的是Netty的Timeout延遲任務做的。

  • 比如鎖過期 30 秒, 每過 1/3 時間也就是 10 秒會檢查鎖是否存在, 存在則更新鎖的超時時間

加鎖腳本

 <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
        //如果鎖不存在,則通過hset設置它的值,並設置過期時間
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                   //如果鎖已存在,並且鎖的是當前執行緒,則通過hincrby給數值遞增1,並重新設置過期時間
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                     //如果鎖已存在,但並非本執行緒,則返回過期時間
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

解鎖:

public RFuture<Void> unlockAsync(final long threadId) {
        final RPromise<Void> result = new RedissonPromise<Void>();
        //底層解鎖方法
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        future.addListener(new FutureListener<Boolean>() {
            @Override
            public void operationComplete(Future<Boolean> future) throws Exception {
                if (!future.isSuccess()) {
                    cancelExpirationRenewal(threadId);
                    result.tryFailure(future.cause());
                    return;
                }

                Boolean opStatus = future.getNow();
                //如果返回空,則證明解鎖的執行緒和當前鎖不是同一個執行緒,拋出異常
                if (opStatus == null) {
                    IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + threadId);
                    result.tryFailure(cause);
                    return;
                }
                if (opStatus) {
                    cancelExpirationRenewal(null);
                }
                result.trySuccess(null);
            }
        });

        return result;
    }

解鎖腳本:

 protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                 //如果釋放鎖的執行緒和已存在鎖的執行緒不是同一個執行緒,返回null
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                 //通過hincrby遞減1的方式,釋放一次鎖
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                 //若剩餘次數大於0 ,則刷新過期時間
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                  //否則證明鎖已經釋放,刪除key並發布鎖釋放的消息
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

    }
Tags: