快進來!花幾分鐘看一下 ReentrantReadWriteLock 的原理!

前言

在看完 ReentrantLock 之後,在高並發場景下 ReentrantLock 已經足夠使用,但是因為 ReentrantLock 是獨佔鎖,同時只有一個執行緒可以獲取該鎖,而很多應用場景都是讀多寫少,這時候使用 ReentrantLock 就不太合適了。讀多寫少的場景該如何使用?在 JUC 包下同樣提供了讀寫鎖 ReentrantReadWriteLock 來應對讀多寫少的場景。

公眾號:『 劉志航 』,記錄工作學習中的技術、開發及源碼筆記;時不時分享一些生活中的見聞感悟。歡迎大佬來指導!

介紹

支援類似 ReentrantLock 語義的 ReadWriteLock 的實現。

具有以下屬性:

  • 獲取順序

此類不會將讀取優先或寫入優先強加給鎖訪問的排序。但是,它確實支援可選的公平 策略。

支援公平模式非公平模式,默認為非公平模式

  • 重入

允許 reader 和 writer 按照 ReentrantLock 的樣式重新獲取讀鎖或寫鎖。在寫執行緒釋放持有的所有寫鎖後,reader 才允許重入使用它們。此外,writer 可以獲取讀鎖,但反過來則不成立。

  • 鎖降級

重入還允許從寫鎖降級為讀鎖,通過先獲取寫鎖,然後獲取讀鎖,最後釋放寫鎖的方式降級。但是,從讀鎖升級到寫鎖是不可能的

  • 鎖獲取的中斷

讀鎖和寫鎖都支援鎖獲取期間的中斷。

  • Condition 支援

寫鎖提供了一個 Condition 實現,對於寫鎖來說,該實現的方式與 ReentrantLock.newCondition() 提供的 Condition 實現對 ReentrantLock 所做的行為相同。當然,此 Condition 只能用於寫鎖。讀鎖不支援 Condition

  • 監測

此類支援一些確定是保持鎖還是爭用鎖的方法。這些方法設計用於監視系統狀態,而不是同步控制。

鎖最多支援 65535 個遞歸寫鎖和 65535 個讀鎖

以上為 Java Api 官方文檔[1] 的解釋,總結一下內容如下:

  1. 支援非公平和公平模式,默認為非公平模式。
  2. 支援重入,讀鎖可以重入獲取讀鎖,寫鎖可以重入獲取寫鎖,寫鎖可以獲取讀鎖,讀鎖不可以獲取寫鎖。
  3. 鎖可以降級,從寫鎖降級為讀鎖,但是不可能從讀鎖升級到寫鎖。

基本使用

class CachedData {
    Object data;
    volatile boolean cacheValid;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    void processCachedData() {
        // 讀鎖加鎖
        rwl.readLock().lock();
        if (!cacheValid) {
            // 獲取寫鎖之前必須釋放讀鎖
            rwl.readLock().unlock();
            // 寫鎖加鎖
            rwl.writeLock().lock();
            try {
                // 重新檢查狀態,因為另一個執行緒可能
                // 在執行操作之前獲取了寫鎖定並更改了狀態
                if (!cacheValid) {
                    data = ...
                    cacheValid = true;
                }
                // 通過在釋放寫鎖之前獲取讀鎖來降級
                rwl.readLock().lock();
            } finally {
                rwl.writeLock().unlock(); // Unlock write, still hold read
            }
        }

        try {
            use(data);
        } finally {
            rwl.readLock().unlock();
        }
    }
}

上面只是官方文檔提供的一個 demo。

問題疑問

  1. 在 ReentrantReadWriteLock 中 state 代表什麼?
  2. 執行緒獲取鎖的流程是怎麼樣的?
  3. 讀鎖和寫鎖的可重入性是如何實現的?
  4. 當前執行緒獲取鎖失敗,被阻塞的後續操作是什麼?
  5. 鎖降級是怎麼降級的?

源碼分析

程式碼結構

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;
    /** 提供讀鎖的內部類 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** 提供寫鎖的內部類 */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** 執行所有同步機制 */
    final Sync sync;

}

state

之前在閱讀 ReentrantLock 源碼的時候 state 代表了鎖的狀態,0 表示沒有執行緒持有鎖,大於 1 表示已經有執行緒持有鎖及其重入的次數。而在 ReentrantReadWriteLock 是讀寫鎖,那就需要保存讀鎖寫鎖兩種狀態的,那是怎麼樣表示的呢?

在 ReentrantReadWriteLock 中同樣存在一個 Sync 繼承了 AbstractQueuedSynchronizer,也是 FairSync、NonfairSync 的父類。內部定義了 state 的一些操作。

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;
    // 移位數
    static final int SHARED_SHIFT   = 16;
    // 單位
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    // 最大數量 1 << 16 -> 65536
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    // 計算獨佔數使用 1 << 16 -> 65536
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    // 返回共享保留數
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    // 返回獨佔保留數 
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

}

在 AQS 中定義 state 為 int 類型,而在 ReentrantReadWriteLock 中,將 state 的 高 16 位和低 16 位拆開表示讀寫鎖。其中高 16 位表示讀鎖,低 16 位表示寫鎖。分別使用 sharedCount 和 exclusiveCount 方法獲取讀鎖和寫鎖的當前狀態。

下面分別從讀鎖和寫鎖的角度來看如何進行加鎖和釋放鎖的?

ReadLock.lock


public static class ReadLock 
    implements Lock, java.io.Serializable {
    /**
     * 獲取讀取鎖。
     * 如果寫鎖沒有被另一個執行緒持有,則獲取讀鎖並立即返回。
     * 如果寫鎖由另一個執行緒持有,則出於執行緒調度目的,
     * 當前執行緒將被禁用,並處於休眠狀態,直到獲取讀鎖為止。
     */
    public void lock() {
        // 調用 AQS 獲取共享資源
        sync.acquireShared(1);
    }
}

ReentrantReadWriteLock-AQS-Share-gTrD2e

獲取共享資源,這塊使用的 AQS 的邏輯,其中 tryAcquireShared(arg) 是在 ReentrantReadWriteLock.Sync 中實現的。並且 AQS 中有規定,tryAcquireShared 分為三種返回值:

  1. 小於 0: 表示失敗;
  2. 等於 0: 表示共享模式獲取資源成功,但後續的節點不能以共享模式獲取成功;
  3. 大於 0: 表示共享模式獲取資源成功,後續節點在共享模式獲取也可能會成功,在這種情況下,後續等待執行緒必須檢查可用性。
abstract static class Sync extends AbstractQueuedSynchronizer {
    protected final int tryAcquireShared(int unused) {
     
        Thread current = Thread.currentThread();
        // 獲取 state 值
        int c = getState();
        // 獨佔計數不為 0 且 不是當前執行緒, 說明已經有寫鎖
        if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
            return -1;
        // 獲取共享計數(讀鎖計數)
        int r = sharedCount(c);
        // 不需要阻塞讀鎖 && 共享計數小於最大值 && state 更新成功
        if (!readerShouldBlock() && r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                // 當前讀鎖計數為 0
                // firstReader是獲得讀鎖的第一個執行緒
                // firstReaderHoldCount是firstReader的保持計數
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                // 讀鎖重入
                firstReaderHoldCount++;
            } else {
                // 當前快取計數
                HoldCounter rh = cachedHoldCounter;
                // 當前執行緒沒有計數 或者 沒有創建計數器
                if (rh == null || rh.tid != getThreadId(current))
                    // 創建計數,基於 ThreadLocal
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0) 
                    readHolds.set(rh);
                // 計數累加
                rh.count++;
            }
            return 1;
        }
        // 完整地獲取共享鎖方法,作為tryAcquireShared方法因CAS獲取鎖失敗後的處理。
        // 因為前面可能失敗 CAS 失敗, 隊列策略失敗等原因。
        return fullTryAcquireShared(current);
    }
}
  1. 先獲取 state ,通過 exclusiveCount 方法獲取到寫鎖的計數值,不為 0 且 不是當前執行緒, 說明已經有寫鎖。返回 -1 失敗。
  2. 通過 sharedCount 獲取讀鎖計數,判斷是否需要阻塞以及是否超過上限後,使用 CAS 更新 讀鎖計數。
  3. 設置或更新 firstReader、firstReaderHoldCount、 cachedHoldCounter。
  4. 最後會進行完整的獲取共享鎖方法,作為之前獲取失敗的後續處理方法。

firstReader:firstReader是獲得讀鎖的第一個執行緒;
firstReaderHoldCount:firstReaderHoldCount是firstReader的保持計數。即獲得讀鎖的第一個執行緒的重入次數。
cachedHoldCounter:最後一個獲得讀鎖的執行緒獲得讀鎖的重入次數。

final int fullTryAcquireShared(Thread current) {
 
    HoldCounter rh = null;
    // 無限循環
    for (;;) {
        int c = getState();
        // 是否有寫鎖
        if (exclusiveCount(c) != 0) {
            // 有寫鎖,但是不是當前執行緒,直接返回失敗
            if (getExclusiveOwnerThread() != current)
                return -1;
        } else if (readerShouldBlock()) {
            // 需要阻塞
            // 沒有寫鎖,確保沒有重新獲取讀鎖
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                // 當前執行緒的讀鎖計數 ThreadLocal 中
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        // 計數結束,remove 掉
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 為 0 直接失敗
                if (rh.count == 0)
                    return -1;
            }
        }
        // 到達上限 拋出異常
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // CAS 設置讀鎖
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}
  1. 首先會一直循環
  2. 有寫鎖,但是不是當前執行緒,直接返回失敗。但是,有寫鎖,如果是當前執行緒,是會繼續執行的。
  3. 設置或更新 firstReader、firstReaderHoldCount、 cachedHoldCounter。

當存在寫鎖(獨佔鎖)時,方法會返回 -1 失敗,後續會調用 AQS 的 doAcquireShared 方法,循環獲取資源。doAcquireShared 方法會不斷循環,嘗試獲取讀鎖,一旦獲取到讀鎖,當前節點會立即喚醒後續節點,後續節點開始嘗試獲取讀鎖,依次傳播。

ReentrantReadWriteLock-1-rl0DjC

ReadLock.unlock

public static class ReadLock 
    implements Lock, java.io.Serializable {
    public void unlock() {
        sync.releaseShared(1);
    }
}

調用 AQS 的 releaseShared 釋放共享資源方法。

ReadLock-unlock-LE7vUH

其中 tryReleaseShared 有 ReadLock 實現。

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // 第一個執行緒是當前執行緒
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        // 第一個執行緒不是當前執行緒,更新自己的 ThreadLocal 裡面的計數
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    // 循環
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        // 使用 CAS 更新 state
        if (compareAndSetState(c, nextc))
            // 但是如果現在讀和寫鎖都已釋放,
            // 它可能允許等待的寫程式繼續進行。
            return nextc == 0;
    }
}
  1. 如果是第一個執行緒,直接更新技術,不是則更新自己 ThreadLocal 裡面保存的計數。
  2. 循環,使用 CAS 更新 state 的值。
  3. 如果 state 更新後的值為 0,說明沒有執行緒持有讀鎖或者寫鎖了。
  4. 當 state 為 0,此時會調用 AQS 的 doReleaseShared 方法。此時隊列如果有寫鎖,那就會被寫鎖獲取的鎖。

WriteLock.lock

public static class WriteLock 
    implements Lock, java.io.Serializable {
    /**
     * 獲取寫入鎖。
     * 如果沒有其他執行緒持有讀鎖或寫鎖,會直接返回,並將寫鎖計數設置為1。
     * 如果當前執行緒持有寫鎖,則將寫鎖計數 +1,然後返回。
     * 如果鎖正在被其他執行緒持有,則當前執行緒用於執行緒調度目的,
     * 當前執行緒將被禁用,並處於休眠狀態,直到獲取讀鎖並將寫鎖計數設置為1。
     */
    public void lock() {
        sync.acquire(1);
    }
}

WriteLock.lock-wBuvUA

tryAcquire 方法由 Write 自己實現,方式和 ReentrantLock 類似。

protected final boolean tryAcquire(int acquires) {
    
    // 如果讀鎖計數為非零或寫鎖計數為非零,並且所有者是另一個執行緒,則失敗。
    // 如果計數飽和,則失敗。只有在count不為零時,才可能發生這種情況。
    // 否則,如果該執行緒是可重入獲取或隊列策略允許的話,則有資格進行鎖定。
    // 如果是這樣,請更新狀態並設置所有者。
    Thread current = Thread.currentThread();
    int c = getState();
    // 寫鎖計數
    int w = exclusiveCount(c);
    // c != 0 說明有有執行緒獲取鎖了
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        // 判斷是不是自己,不是自己 返回 false
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 判斷有沒有超過上限
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 重入
        setState(c + acquires);
        return true;
    }
    // 不需要阻塞,或者 CAS 更新 state 失敗
    if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
  1. 獲取 state , 如果 state 不為 0 則判斷是否為當前執行緒重入獲取。
  2. state 為 0 ,則當前執行緒 CAS 更新 state,獲取鎖。
  3. 更新成功之後綁定當前執行緒。
  4. 如果失敗會繼續調用 AQS 的 acquireQueued,將當前阻塞放在 AQS 隊列中。AQS 會不斷循環,等待上一個鎖釋放後,嘗試獲得鎖。

ReentrantReadWriteLock-2-mQAgGL

WriteLock.unlock

public static class WriteLock 
    implements Lock, java.io.Serializable {
    // 如果當前執行緒是此鎖的持有者,則保持計數遞減。 
    // 如果保持現在的計數為零,則解除鎖定。 
    // 如果當前執行緒不是此鎖的持有者則IllegalMonitorStateException異常。
    public void unlock() {
        sync.release(1);
    }
}

Write-unlock-bwHAcw

同樣這塊程式碼是使用 AQS 的邏輯,tryRelease 部分由 WriteLock 自己實現。

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}
  1. 如果是當前執行緒重入,扣減重入次數。
  2. 扣減後如果為 0,則設置鎖持有執行緒為 null,更新 state 值。AQS 會喚醒後續節點獲取鎖。

總結

問題

Q:在 ReentrantReadWriteLock 中 state 代表什麼?

A:state 代表鎖的狀態。state 為 0 ,沒有執行緒持有鎖,state 的高 16 為代表讀鎖狀態,低 16 為代表寫鎖狀態。通過位運算可以獲取讀寫鎖的實際值。

Q:執行緒獲取鎖的流程是怎麼樣的?

A:可以參考上面的源碼筆記,以及後面的流程圖。

Q:讀鎖和寫鎖的可重入性是如何實現的?

A:在加鎖的時候,判斷是否為當前執行緒,如果是當前執行緒,則直接累加計數。值得注意的是:讀鎖重入計數使用的 ThreadLocal 在執行緒中快取計數,而寫鎖則直接用的 state 進行累加(其實和 state 低 16 位進行累加一樣)。

Q:當前執行緒獲取鎖失敗,被阻塞的後續操作是什麼?

A:獲取失敗,會放到 AQS 等待隊列中,在隊列中不斷循環,監視前一個節點是否為 head ,是的話,會重新嘗試獲取鎖。

Q:鎖降級是怎麼降級的?

A: write-to-read-koAuqm
如圖,在圈出部分 fullTryAcquireShared 程式碼中,可以看出來,在獲取讀鎖的時候,如果當前執行緒持有寫鎖,是可以獲取讀鎖的。這塊就是指鎖降級,比如執行緒 A 獲取到了寫鎖,當執行緒 A 執行完畢時,它需要獲取當前數據,假設不支援鎖降級,就會導致 A 釋放寫鎖,然後再次請求讀鎖。而在這中間是有可能被其他阻塞的執行緒獲取到寫鎖的。從而導致執行緒 A 在一次執行過程中數據不一致。

小結

  1. ReentrantReadWriteLock 讀寫鎖,內部實現是 ReadLock 讀鎖 和 WriteLock 寫鎖。讀鎖,允許共享;寫鎖,是獨佔鎖。
  2. 讀寫鎖都支援重入,讀鎖的重入次數記錄在執行緒維護的 ThreadLocal 中,寫鎖維護在 state 上(低 16 位)。
  3. 支援鎖降級,從寫鎖降級為讀鎖,防止臟讀。
  4. ReadLock 和 WriteLock 都是通過 AQS 來實現的。獲取鎖失敗後會放到 AQS 等待隊列中,後續不斷嘗試獲取鎖。區別在讀鎖只有存在寫鎖的時候才放到等待隊列,而寫鎖是只要存在非當前執行緒鎖(無論寫鎖還是讀鎖)都會放到等待隊列。!read-write-different-oI9wB1
  5. 通過源碼分析,可以得出讀寫鎖適合在讀多寫少的場景中使用。

相關資料

[1] Java Api://docs.oracle.com/javase/8/docs/api/overview-summary.html