【Redis】利用 Redis 實現分佈式鎖

技術背景

首先我們需要先來了解下什麼是分佈式鎖,以及為什麼需要分佈式鎖。

對於這個問題,我們可以簡單將鎖分為兩種——內存級鎖以及分佈式鎖,內存級鎖即我們在 Java 中的 synchronized 關鍵字(或許加上進程級鎖修飾更恰當些),而分佈式鎖則是應用在分佈式系統中的一種鎖機制。分佈式鎖的應用場景舉例以下幾種:

  • 互聯網秒殺

  • 搶優惠卷

  • 接口冪等校驗

我們接下來以一段簡單的秒殺系統中的判斷庫存及減庫存來描述下為什麼需要到分佈式鎖:

public String deductStock() throws InterruptedException {
    
    // 1.從 Redis 中獲取庫存值
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    // 2.判斷庫存
    if (stock > 0) {
        int readStock = stock - 1;
        // 3.從新設置庫存
        stringRedisTemplate.opsForValue().set("stock", realStock + ""); 
        System.out.println("扣減成功,剩餘庫存:" + readStock + "");
    } else {
        System.out.println("扣減失敗,庫存不足");
    }
    
    return "end";
}

上面這段代碼中,實現了電商系統中的一個簡單小需求,即判斷商品的剩餘數量是否充足,充足則可以成功賣出商品,並將庫存減去 1。我們很容易了解這段代碼的目的。接下來我們就來一步一步地分析這段代碼的缺陷。

基本實現

原子性問題

上面代碼中的注釋1~3部分,並沒有實現原子性的邏輯。所以假設現在如果只剩下一件商品,那麼可能會出現以下情況:

  • 線程 A 運行到代碼2,判斷庫存大於0,進入條件體中將 stock - 1 賦值給 readStock,在執行代碼 3 前停止了下來;
  • 線程 B 同樣運行到代碼2,判斷出庫存大於0(線程A並沒有寫回Redis),之後並沒有停止,而是繼續執行到方法結束;
  • 線程 A 此時恢復執行,執行完代碼 3,將庫存寫回 Redis。

現在我們就發現了問題,明明只有一件商品,卻被兩個線程賣出去了兩次,這就是沒有保證這部分代碼的原子性所帶來的安全問題。

那對於這個問題如何解決呢?

常規的方式自然就是加鎖以保證並發安全。那麼以我們 Java 自帶的鎖去保證並發安全,如下:

public Synchronized String deductStock() throws InterruptedException {    
    // 業務邏輯...
}

我們知道 synchronized 和 Lock 支持 JVM 內同一進程內的線程互斥,所以如果我們的項目是單機部署的話,到這裡也就能保證這段代碼的原子性了。不過以互聯網項目來說,為了避免單點故障以及並發量的問題,一般都是以分佈式的形式部署的,很少會以單機部署,這種情況就會帶來新的問題。

分佈式問題

剛剛我們將到了如果項目分佈式部署的話,那麼就會產生新的並發問題。接下來我們以 Nginx 配置負載均衡為例來演示並發問題,同樣的請求可能會被分發到多台服務器上,那麼我們剛剛所講的 synchronized 或者 Lock 在此時就失效了。同樣的代碼,在 A 服務器上確實可以避免其他線程去競爭資源,但是此時 A 服務器上的那段 synchronized 修飾的方法並不能限制 B 服務器上的程序去訪問那段代碼,所以依舊會產生我們一開始所講到的線程並發問題。

那麼如何解決掉這個問題呢?這個是否就需要 Redis 上場了,Redis 中有一個命令SETNX key valueSETNX 是 「SET if not exists」 (如果不存在,則 SET)的縮寫。那麼這條指令只在 key 不存在的情況下,將鍵 key 的值設置為 value。若鍵 key 已經存在,則 SETNX 命令不做任何動作。

有了上面命令做支撐,同時我們了解到 Redis 是單線程模型(不要去計較它的網絡讀寫和備份狀態下的多線程)。那麼我們就可以這麼實現,當一個服務器成功的向 Redis 中設置了該命令,那麼就認定為該服務器獲得了當前的分佈式鎖,而其他服務器此時就只能一直等待該服務器釋放了鎖為止。我們來看下代碼實現:

// 為了演示方便,這裡簡單定義了一個常量作為商品的id
public static final String PRODUCT_ID = "100001";

public String deductStock() throws InterruptedException {

    // 通過 stringRedisTemplate 來調用 Redis 的 SETNX 命令,key 為商品的id,value的值在這不重要
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(RODUCT_ID, "jojo");
    if (!result) {
        return "error";
    }

    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    if (stock > 0) {
        int readStock = stock - 1;
        stringRedisTemplate.opsForValue().set("stock", realStock + ""); 
        System.out.println("扣減成功,剩餘庫存:" + readStock + "");
    } else {
        System.out.println("扣減失敗,庫存不足");
    }

    // 業務執行完成,刪除PRODUCT_ID key
    stringRedisTemplate.delete(PRODUCT_ID);
    
    return "end";
}

到這裡我們就成功地利用 Redis 實現了一把簡單的分佈式鎖,那麼這樣實現是否就沒有問題了呢?

鎖釋放問題

生產環境比我們想像中要複雜得多,上面代碼並不能正真地運用在我們的生產環境中,我們可以試想一下,如果服務器 A 中的程序成功地給線程加鎖,並且執行完了減庫存的邏輯,但是最終卻沒有安全地運行stringRedisTemplate.delete(PRODUCT_ID)這行代碼,也就是沒有成功釋放鎖,那其他服務器就永遠無法拿到 Redis 中的分佈式鎖了,也就會陷入死鎖的狀態。

解決這個方法,可能許多人都會想到想到——try-finally語句塊,像下面代碼這樣:

public String deductStock() throws InterruptedException {

    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(RODUCT_ID, "jojo");
    if (!result) {
        return "error";
    }

    try {
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if (stock > 0) {
            int readStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", realStock + ""); 
            System.out.println("扣減成功,剩餘庫存:" + readStock + "");
        } else {
            System.out.println("扣減失敗,庫存不足");
        }
    } finally {
        //業務執行完成,刪除PRODUCT_ID key
        stringRedisTemplate.delete(PRODUCT_ID);
    }

    return "end";
}

但是上面代碼是否正真解決問題了呢?單看代碼本身是沒什麼問題的,但是前面提到,生產環境是非常複雜的。我們假設這種情況:當線程在成功加鎖之後,執行業務代碼時,還沒來得及刪除 Redis 中的鎖標誌,此時,這台服務器宕機了,程序並沒有想我們想像中地去執行 finally 塊中的代碼。這種情況也會使得其他服務器或者進程在後續過程中無法去獲取到鎖,從而導致死鎖,最終導致業務崩潰的情況。所以說,對於鎖釋放問題來說,try-finally 語句塊在這裡還不夠,那麼我們就需要新的方法來解決這個問題了。

Redis 超時機制

Redis 中允許我們設置緩存的自動過期時間,我們可以將其引入我們上面的鎖機制中,這樣就算 finally 語句塊中的釋放語句沒有被正確執行,Redis 中的緩存也能在設定時間內自動過期,不會形成死鎖:

// 設置過期時間
stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

當然如果只是簡單的在代碼中加入上述語句的話,還是有可能產生死鎖的,因為加鎖以及設置過期時間是分開來執行的,並不能保證原子性。所以為了解決這個問題,Redis 中也提供了將設置值與設置過期時間合一的操作,對於 Java 代碼如下:

// 將設置值與設置過期時間合一
stringRedisTemplate.opsForValue().opsForValue().setIfAbsent(lockKey, "jojo", 10, TimeUnit.SECONDS);

到這一步,我們可以確保 Redis 中我們上的鎖,最終無論如何都能成功地被釋放掉,避免了造成死鎖的情況。但是以當前的代碼實現來看,在一些高並發場景下還是可能產生鎖失效的情況。我們可以試想一下,上面代碼我們設置的過期時間為 10s,那麼如果這個進程在 10s 內並沒有完成這段業務邏輯,會產生什麼樣的情況?不過在此之前我們先將代碼的公共部分抽出作一個組件類,這樣有助於我們關注鎖的邏輯。

代碼集成

公共方法的提取

我們這裡先定義一個 RedisLock 接口,代碼如下所示:

public interface RedisLock {
    /**
     * 嘗試加鎖
     */
    boolean tryLock(String key, long timeout, TimeUnit unit);
    
    /**
     * 解鎖操作
     */
    void releaseLock(String key);
    
}

接下來,我們基於上面已經實現的分佈式鎖的思路,來實現這個接口,代碼如果所示:

public class RedisLockImpl implements RedisLock {
    
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit) {
        return stringRedisTemplate.opsForValue().setIfAbsent(key, "jojo", timeout, unit);
    }
    
    @Override
    public void releaseLock(String key) {
        stringRedisTemplate.delete(key);
    }
    
}

加鎖&解鎖的歸一化

我們先來繼續分析上面代碼。從開發的角度來說,當一個線程從上到下執行一個需要加分佈式鎖的業務時,它首先需要進行加鎖操作,當業務執行完畢後,再進行釋放鎖的操作。也就是先調用 tryLock() 函數再調用 releaseLock() 函數。

但是真正可靠代碼並不依靠人性,其他開發人員有可能在編寫代碼的時候並沒有調用 tryLock() 方法,而是直接調用了 releaseLock() 方法,並且可能在調用 releaseLock() 時傳入的 Key 值與你調用 tryLock() 時傳入的 Key 值是相同的,那麼此時就可能出現問題:另一段代碼在運行時,硬生生將你代碼中加的鎖給釋放掉了,那麼此時的鎖就失效了。所以上述代碼依舊是有不可靠的地方,鎖的可能誤刪操作會使得程序存在很嚴重的問題。

那麼針對這一問題,我們就需要實現加鎖&解鎖的歸一化。

首先我們解釋一下什麼叫做加鎖和解鎖的歸一化,簡單來說,就是一個線程執行了加鎖操作後,後續的解鎖操作只能由該線程來執行,即加鎖操作和解鎖只能由同一線程來進行。

這裡我們使用 ThreadLocal 和 UUID 來實現,代碼如下:

public class RedisLockImpl implements RedisLock {
    
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLock<string> threadLock = new ThreadLock<>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit) {
        String uuid = UUID.randomUUID().toString();
        threadlocal.set(uuid);
        return stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
    }
    
    @Override
    public void releaseLock(String key) {
        if (threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))) {
            stringRedisTemplate.delete(key);
        }
    }
    
}

可重入發佈式鎖實現

上面的代碼實現,可以保證當一個線程成功在 Redis 中設置了鎖標誌位後,其他線程再設置鎖標誌位時,返回 false。但是在一些場景下我們需要實現線程的重入,即相同的線程能夠多次獲取同一把鎖,不需要等待鎖釋放後再去加鎖。所以我們需要利用一些方式來實現分佈式鎖的可重入型,在 JDK 1.6 之後提供的內存級鎖很多都支持可重入型,比如 synchronized 和 J.U.C 下的 Lock,其本質都是一樣的,比對已經獲得鎖的線程是否與當前線程相同,是則重入,當釋放鎖時則需要根據重入的次數,來判斷此時鎖是否真正釋放掉了。那麼我們就按照這個思路來實現一個可重入的分佈式鎖:

public class RedisLockImpl implements RedisLock {
    
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit) {
        Boolean isLocked = false;
        if (threadLocal.get() == null) {
            String uuid = UUID.randomUUID().toString();
            threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
        } else {
            isLocked = true;
        }
        
        // 重入次數加1
        if (isLocked) {
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        
        return isLocked;
    }
    
    @Override
    public void releaseLock(String key) {
        // 判斷當前線程所對應的uuid是否與Redis對應的uuid相同,再執行刪除鎖操作
        if (threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))) {
            Integer count = threadLocalInteger.get();
            // 計數器減為0時才能釋放鎖
            if (count == null || --count <= 0) {
                stringRedisTemplate.delete(key);
            }
        }
    }
    
}

分佈式自旋鎖實現

上面代碼實現中,加入我們不能一次性獲取到鎖,那麼就會直接返回失敗,這對業務來說是十分不友好的,假設用戶此時下單,剛好有另外一個用戶也在下單,而且獲取到了鎖資源,那麼該用戶嘗試獲取鎖之後失敗,就只能直接返回「下單失敗」的提示信息的。所以我們需要實現以自旋的形式來獲取到鎖,即不停的重試,基於這個想法,實現代碼如下:

public class RedisLockImpl implements RedisLock {
    
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit) {
        Boolean isLocked = false;
        
        if (threadLocal.get() == null) {
            String uuid = UUID.randomUUID().toString();
            threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            // 嘗試獲取鎖失敗,則自旋獲取鎖直至成功
            if (!isLocked) {
                for (;;) {
                    isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                    if (isLocked) {
                        break;
                    }
                }
            }
        } else {
            isLocked = true;
        }
        // 重入次數加1
        if (isLocked) {
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalIntger.get();
            threadLocalInteger.set(count++);
        }
        
        return isLocked;
    }
    
    @Override
    public void releaseLock(String key) {
        // 判斷當前線程所對應的uuid是否與Redis對應的uuid相同,再執行刪除鎖操作
        if (threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))) {
            Integer count = threadLocalInteger.get();
            // 計數器減為0時才能釋放鎖
            if (count == null || --count <= 0) {
                stringRedisTemplate.delete(key);
            }
        }
    }
}

基礎優化

超時問題

在高並發場景下,一把鎖可能會被 N 多的進程競爭,獲取鎖後的業務代碼也可能十分複雜,其運行時間可能偶爾會超過我們設置的過期時間,那麼這個時候鎖就會自動釋放,而其他的進程就有可能來爭搶這把鎖,而此時原來獲得鎖的進程也在同時運行,這就有可能導致超賣現象或者其他並發安全問題。

那麼如何解決這個問題呢?思路很簡單,就是每隔一段時間去檢查當前線程是否還在運行,如果還在運行,那麼就繼續更新鎖的佔有時長,而在釋放鎖的時候。具體的實現稍微複雜些,這裡給出簡易的代碼實現:

public class RedisLockImpl implements RedisLock {
    
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit) {
        Boolean isLocked = false;
        if (threadLocal.get() == null) {
            String uuid = UUID.randomUUID().toString();
            threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            // 嘗試獲取鎖失敗,則自旋獲取鎖直至成功
            if (!isLocked) {
                for (;;) {
                    isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                    if (isLocked) {
                        break;
                    }
                }
            }
            // 啟動新的線程來定期檢查當前線程是否執行完成,並更新過期時間
            new Thread(new UpdateLockTimeoutTask(uuid, stringRedisTemplate, key)).start();
        } else {
            isLocked = true;
        }
        // 重入次數加1
        if (isLocked) {
            Integer count = threadLocalInteger.get() == null ? 0 :threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        
        return isLocked;
    }
    
    @Override
    public void releaseLock(String key) {
        // 判斷當前線程所對應的uuid是否與Redis對應的uuid相同,再執行刪除鎖操作
        if (threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))) {
            Integer count = threadLocalInteger.get();
            // 計數器減為0時才能釋放鎖
            if (count == null || --count <= 0) {
                stringRedisTemplate.delete(key);
                // 獲取更新鎖超時時間的線程並中斷
                long threadId = stringRedisTemplate.opsForValue().get(uuid);
                Thread updateLockTimeoutThread = ThreadUtils.getThreadByThreadId(threadId);
                if (updateLockTimeoutThread != null) {
                    // 中斷更新鎖超時時間的線程
                    updateLockTimeoutThread.interrupt();
                    stringRedisTemplate.delete(uuid);
                }
            }
        }
    }
    
}

接下來我們就創建 UpdateLockTimeoutTask 類來執行更新鎖超時的時間。

public class UpdateLockTimeoutTask implements Runnable {
    
    private long uuid;
    private String key;
    private StringRedisTemplate stringRedisTemplate;
    
    public UpdateLockTimeoutTask(long uuid, StringRedisTemplate stringRedisTemplate, String key) {
        this.uuid = uuid;
        this.key = key;
        this.stringRedisTemplate = stringRedisTemplate;
    }
    
    @Override
    public void run() {
        // 將以uuid為Key,當前線程Id為Value的鍵值對保存到Redis中
        stringRedisTemplate.opsForValue().set(uuid, Thread.currentThread().getId());
        // 定期更新鎖的過期時間
        while (true) {
            stringRedisTemplate.expire(key, 10, TimeUnit.SECONDS);
            try{
                // 每隔3秒執行一次
                Thread.sleep(10000);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}

最後,我們定義一個 ThreadUtils 工具類,這個工具類中我們定義一個根據線程 id 獲取線程的方法 getThreadByThreadId(long threadId),代碼如下所示:

public class ThreadUtils {
    
    // 根據線程 id 獲取線程句柄
    public static Thread getThreadByThreadId(long threadId) {
    	ThreadGroup group = Thread.currentThread().getThreadGroup();
        while(group != null){
            Thread[] threads = new Thread[(int)(group.activeCount() * 1.2)];
            int count = group.enumerate(threads, true);
            for (int i = 0; i < count; i++){
                if (threadId == threads[i].getId()) {
                    return threads[i];
                }
            }
        }
    }
    
}

上述解決分佈式鎖失效的方案在分佈式鎖領域有一個專業的術語叫做 「異步續命」 。需要注意的是:當業務代碼執行完畢後,我們需要停止更新鎖超時時間的線程。所以,這裡,我對程序的改動是比較大的,首先,將更新鎖超時的時間任務重新定義為一個 UpdateLockTimeoutTask 類,並將 uuidStringRedisTemplate 注入到任務類中,在執行定時更新鎖超時時間時,首先將當前線程保存到Redis中,其中Key為傳遞進來的 uuid

高並發

以下部分引用自://www.cnblogs.com/binghe001/p/12778853.html

如果我們系統中利用 Redis 來實現分佈式鎖,而 Redis 的讀寫並發量約合 5 萬左右。假設現在一個秒殺業務需要支持的並發量超過百萬級別,那麼如果這 100萬的並發全部打入 Redis 中去請求鎖資源,Redis 將會直接掛掉。所以我們現在應該來考慮如何解決這個問題,即如何在高並發的環境下保證 Redis 實現的分佈式鎖的可用性,接下來我們就來考慮一下這個問題。

在高並發的商城系統中,如果採用 Redis 緩存數據,則 Redis 緩存的並發能力是關鍵,因為很多的前綴操作都需要訪問 Redis。而異步削峰只是基本操作,關鍵還是要保證 Redis 的並發處理能力。

解決這個問題的關鍵思想就是:分而治之,將商品庫存分開放。

我們在 Redis 中存儲商品的庫存數量時,可以將商品的庫存進行「分割」存儲來提升 Redis 的讀寫並發量。

例如,原來的商品的 id 為 10001,庫存為1000件,在Redis中的存儲為(10001, 1000),我們將原有的庫存分割為5份,則每份的庫存為200件,此時,我們在Redis 中存儲的信息為(10001_0, 200),(10001_1, 200),(10001_2, 200),(10001_3, 200),(10001_4, 200)。

此時,我們將庫存進行分割後,每個分割的庫存使用商品 id 加上一個數字標識來存儲,這樣,在對存儲商品庫存的每個 key 進行 Hash 運算時,得出的 Hash 結果是不同的,這就說明,存儲商品庫存的 Key 有很大概率不在 Redis 的同一個槽位中,這就能夠提升 Redis 處理請求的性能和並發量。

分割庫存後,我們還需要在 Redis 中存儲一份商品 ID 和 分割庫存後的 Key 的映射關係,此時映射關係的 Key 為商品的 ID,也就是 10001,Value 為分割庫存後存儲庫信息的 Key,也就是 10001_0,10001_1,10001_2,10001_3,10001_4。在 Redis 中我們可以使用 List 來存儲這些值。

在真正處理庫存信息時,我們可以先從 Redis 中查詢出商品對應的分割庫存後的所有 Key,同時使用 AtomicLong 來記錄當前的請求數量,使用請求數量對從Redis 中查詢出的商品對應的分割庫存後的所有Key的長度進行求模運算,得出的結果為0,1,2,3,4。再在前面拼接上商品id就可以得出真正的庫存緩存的Key。此時,就可以根據這個Key直接到Redis中獲取相應的庫存信息。

同時,我們可以將分隔的不同的庫存數據分別存儲到不同的 Redis 服務器中,進一步提升 Redis 的並發量。

基礎升級

移花接木

在高並發業務場景中,我們可以直接使用 Lua 腳本庫(OpenResty)從負載均衡層直接訪問緩存。

這裡,我們思考一個場景:如果在高並發業務場景中,商品被瞬間搶購一空。此時,用戶再發起請求時,如果系統由負載均衡層請求應用層的各個服務,再由應用層的各個服務訪問緩存和數據庫,其實,本質上已經沒有任何意義了,因為商品已經賣完了,再通過系統的應用層進行層層校驗已經沒有太多意義了!而應用層的並發訪問量是以百為單位的,這又在一定程度上會降低系統的並發度。

為了解決這個問題,此時,我們可以在系統的負載均衡層取出用戶發送請求時攜帶的用戶Id,商品id和活動Id等信息,直接通過 Lua 腳本等技術來訪問緩存中的庫存信息。如果商品的庫存小於或者等於 0,則直接返回商品已售完的提示信息,而不用再經過應用層的層層校驗了。

數據同步

假設我們使用 Redis 來實現分佈式鎖,我們知道 Redis 是基於 CAP 中 AP 來實現的,那麼就可能存在數據未同步的問題。具體的場景就是,我在 Redis 的 Master 上設置了鎖標誌,然而在 Redis 的主從節點上還未完全同步之時,Redis 主節點宕機了,那麼此時從節點上就沒有鎖標誌,從而導致並發安全問題。對於這個問題,常見的解法有兩種,基於 Zookeeper 來實現分佈式鎖(廢話),而另外一種就是 RedLock 了。

Redlock 同很多的分佈式算法一樣,也使用「大多數機制」。加鎖時,它會向過半節點發送 set(key, value, nx=True, ex=xxx) 指令,只要過半節點 set 成功,就認為加鎖成功。釋放鎖時,需要向所有節點發送 del 指令。不過 Redlock 算法還需要考慮出錯重試、時鐘漂移等很多細節,同時因為 RedLock 需要向多個節點進行讀寫,意味着其相比單實例 Redis 的性能會下降一些。

如果你很在乎高可用性,希望即使掛了一台 Redis 也完全不受影響,就應該考慮 Redlock。不過代價也是有的,需要更多的 Redis 實例,性能也下降了,代碼上還需要引入額外的 library,運維上也需要特殊對待,這些都是需要考慮的成本。

參考資料

  • 《Redis深度歷險-核心原理與應用實踐》- 錢文品