JAVA並發(3)-ReentrantReadWriteLock的探索

1. 介紹

本文我們繼續探究使用AQS的子類ReentrantReadWriteLock(讀寫鎖)。老規矩,先貼一下類圖
ReentrantReadWriteLock結構圖
ReentrantReadWriteLock這個類包含讀鎖和寫鎖,這兩種鎖都存在是否公平的概念,這個後面會細講。

此類跟ReentrantLock類似,有以下幾種性質:

  • 可選的公平性政策
  • 重入,讀鎖和寫鎖同一個線程可以重複獲取寫鎖可以獲取讀鎖,反之不能
  • 鎖的降級,重入還可以通過獲取寫鎖,然後獲取到讀鎖,通過釋放寫鎖的方式,從而寫鎖降級為讀鎖。 然而,從讀鎖升級到寫鎖是不可能的。
  • 獲取讀寫鎖期間,支持不可中斷

2. 源碼剖析

先講幾個必要的知識點,然後我們再對寫鎖的獲取與釋放讀鎖的獲取與釋放進行講解,中間穿插着講公平與非公平的實現。

知識點一:
內部類Sync中,將AQS中的state(private volatile int state;長度是32位)邏輯分成了兩份,高16位代表讀鎖持有的count,低16位代表寫鎖持有的count

Sync

    ...
        /*
         * Read vs write count extraction constants and functions.
         * Lock state is logically divided into two unsigned shorts:
         * The lower one representing the exclusive (writer) lock hold count,
         * and the upper the shared (reader) hold count.
         */
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** Returns the number of shared holds represented in count  */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** Returns the number of exclusive holds represented in count  */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
   ...

This lock supports a maximum of 65535 recursive write locks and 65535 read locks. Attempts to exceed these limits result in Error throws from locking methods.(讀鎖與寫鎖都最大支持65535個)

知識點二
HoldCounter的作用,一個計數器記錄每個線程持有的讀鎖count。使用ThreadLocal維護。緩存在cachedHoldCounter

        static final class HoldCounter {
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        }

使用ThreadLocal維護 👇

        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

維護最後一個使用HoldCounter的線程。簡言之就是,假如A線程持有讀鎖,A線程重入獲取讀鎖,在它之後沒有其他線程獲取讀鎖,那麼當獲取HoldCounter時,可以直接將cachedHoldCounter賦值給該線程,就不用從ThreadLocal中去查詢了(ThreadLocal內部維持一個 Map ,想獲取當前線程的值就需要去遍歷查詢),這樣做可以節約時間。

        private transient HoldCounter cachedHoldCounter;

當前線程持有的重入讀鎖count,當某個線程持有的count降至0,將被刪除。

        private transient ThreadLocalHoldCounter readHolds;

初始化在構造函數或readObject

        Sync() {
            readHolds = new ThreadLocalHoldCounter();
            setState(getState()); // ensures visibility of readHolds
        }
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            readHolds = new ThreadLocalHoldCounter();
            setState(0); // reset to unlocked state
        }

知識點三:
是否互斥 :

讀操作 寫操作
讀操作
寫操作

只有讀讀不互斥,其餘都互斥
獲取到了寫鎖,也有資格獲取讀鎖,反之不行.

知識點四
ReentranReadWriteLock,實現了ReadWriteLock接口。

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     *
     * @return the lock used for reading
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     *
     * @return the lock used for writing
     */
    Lock writeLock();
}

ReadWriteLock維護着寫鎖讀鎖寫鎖是排他的,而讀鎖可以同時由多個線程持有。與互斥鎖相比,讀寫鎖的粒度更細

有了上面的知識,等會理解下面的源碼就更容易了,故事從下面幾個變量開始~

    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock readLock = rwl.readLock();
    private final Lock writeLock = rwl.writeLock();

ReentrantReadWriteLock的構造器,默認是非公平模式


    /**
     * Creates a new {@code ReentrantReadWriteLock} with
     * default (nonfair) ordering properties.
     */
    public ReentrantReadWriteLock() {
        this(false);
    }

    /**
     * Creates a new {@code ReentrantReadWriteLock} with
     * the given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

2.1 寫鎖

寫鎖,排他鎖;一個線程獲取了寫鎖,其他線程只能等待

2.1.1 寫鎖的獲取

    writeLock.lock();

java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#lock

public void lock() {
            sync.acquire(1);
        }

又來到了AQSacquire方法

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

我們看ReentrantReadWriteLock是如何實現tryAcquire

 protected final boolean tryAcquire(int acquires) {

            /*
            * 對下面的條件做一個總述:
            * 1. 當讀鎖或者寫鎖的count不為零時,同時擁有者不是當前線程,返回false
            * 2. 當count達到飽和(超過最大的限制65535),返回false
            * 3. 如果上面的情況都不是,該線程有資格去獲取鎖,如果它是可重入獲取鎖或隊列策略允許它。那麼就更新state並且設置鎖的擁有者
            */
            // 結合總述看下面的代碼,很容易就看懂了

            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);

            // 存在讀鎖或寫鎖
            if (c != 0) {
                // 情況分析: 1. w = 0, 表示已經獲取了讀鎖(不管是自己還是其他線程),直接返回false
                // 2. 寫鎖不為零,且當前鎖的擁有者不是當前線程,返回false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }

            // 不存在讀鎖或寫鎖被佔用的情況
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

上面代碼中的writerShouldBlock方法就是tryAcquire控制公平與否的關鍵,我們分別看看公平與非公平是如何實現的

2.1.1.1 寫鎖的獲取(非公平)

默認情況下是非公平的

 static final class NonfairSync extends Sync {
        ...
        // 直接返回false
        final boolean writerShouldBlock() {
            return false;
        }
        ...

2.1.1.2 寫鎖的獲取(公平)

    static final class FairSync extends Sync {
        ...
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        ...

判斷是否該線程前面還有其他線程的結點,上一節有講到過。

這裡還貼一下,整個acquire的流程圖
acquire的流程圖

2.1.2 寫鎖的釋放

下面的這段代碼,記得一定放在finally中

    writeLock.unlock();
        public void unlock() {
            sync.release(1);
        }
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

又看到了熟悉的面孔,但我們主要看的還是tryRelease, 👇

        protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

很簡單,就貼一下release的流程圖
release流程圖

2.2 讀鎖

讀鎖與讀鎖並不互斥,可以存在多個持有讀鎖的線程📕

2.2.1 讀鎖的獲取

        readLock.lock();
        public void lock() {
            sync.acquireShared(1);
        }
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

之前的文章還沒有講解過tryAcquireShared在子類如何實現的。看看如何實現的 👇

  protected final int tryAcquireShared(int unused) {

            /*
             * 對下面的條件做一個總述:
             * 1. 如果寫鎖被其他線程持有,失敗
             * 2. 否則,如果此線程有資格去獲取寫鎖,首先查詢是否需要阻塞(readerShouldBlock).如果不需要,就通過CAS更新state。
             *
             * 3. 如果上面兩步都失敗了,可能是當前線程沒有資格(需要被阻塞),或CAS失敗,或者count數量飽和了,然後執行fullTryAcquireShared進行重試
             */
            Thread current = Thread.currentThread();
            int c = getState();
            // 這裡可以看出,若該線程持有寫鎖,同樣也可以去獲取讀鎖
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);

            // readerShouldBlock表示此線程是否被阻塞(後面細談)
            // 多個線程執行compareAndSetState(c, c + SHARED_UNIT), 只有一個線程可以執行成功,其餘線程去執行fullTryAcquireShared
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    // firstReader是第一個獲取到讀鎖的線程
                    // firstReaderHoldCount是firstReader持有的count
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    // 重入
                    firstReaderHoldCount++;
                } else {
                    // sync queue還存在其他線程持有讀鎖,且該線程不是第一個持有讀鎖的線程
                    // 結合上面的知識點二,理解下面的邏輯
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            // 到達這裡的線程,1. 沒有資格獲取(需要阻塞) 2. CAS失敗 3. 讀鎖count飽和
            return fullTryAcquireShared(current);
        }

Full version of acquire for reads, that handles CAS misses and reentrant reads not dealt with in tryAcquireShared.


        final int fullTryAcquireShared(Thread current) {

             // 主要邏輯跟上面的tryAcquireShared類似。tryAcquireShared的邏輯只有一個線程會成功CAS,
             // 其餘的線程都進入fullTryAcquireShared,進行重試(代碼不那麼複雜)
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) { // 需要被阻塞
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        // 此時, 如果該線程前面已經有線程獲取了讀鎖,且當前線程持有的讀鎖count為0,從readHolds除去。
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        // 這裡表示該線程還是一個new reader, 還沒有持有讀鎖
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 這裡只有一個線程會成功,若CAS失敗,一直重試直到成功,或者讀鎖count飽和,或者需要被阻塞為止
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

我們上面看到了readerShouldBlock,這個方法是控制讀鎖獲取公平與否,下面我們分別看看在非公平與公平模式下的實現 👇

2.2.1.1 讀鎖的獲取(非公平)

    static final class NonfairSync extends Sync {
        ...
        final boolean readerShouldBlock() {
            /* As a heuristic to avoid indefinite writer starvation,
             * block if the thread that momentarily appears to be head
             * of queue, if one exists, is a waiting writer.  This is
             * only a probabilistic effect since a new reader will not
             * block if there is a waiting writer behind other enabled
             * readers that have not yet drained from the queue.
             */
            return apparentlyFirstQueuedIsExclusive();
        }
        ...
    }

判斷sync queue的head的後繼結點是否是寫鎖(獨佔模式)

    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }

上面的方法是,獲取讀鎖時,避免導致寫鎖飢餓(indefinite writer starvation)的一個措施,下面我們對它進行詳細的解釋

寫鎖飢餓
結合上面的圖片,我們設想有一個這樣的情況,寫鎖沒有被獲取,線程A獲取到了讀鎖,此時另一個線程X想要獲取寫鎖,但是寫鎖與讀鎖互斥,所以此時將線程X代表的node添加到sync queue中,等待讀鎖被釋放,才有資格去獲取寫鎖

上面的情況 + 不存在判斷sync queue的head的後繼結點是否是寫鎖(apparentlyFirstQueuedIsExclusive)的方法時,我們看看會出什麼問題

時刻一: 線程B線程C線程D是新建線程想要去獲取讀鎖(new reader),此時因為不存在寫鎖被獲取,所以線程B線程C線程D都會在fullTryAcquireShared中不斷重試,最終都獲得讀鎖

時刻二: 線程A釋放,會執行unparkSuccessor,此時線程X被喚醒,但是執行到tryAcquire,又檢測到讀鎖被持有(不管是自己還是是其他線程)線程X又被阻塞。線程B釋放,還是會出現這種情況,只有等到最後一個讀鎖被釋放線程X才能獲取到寫鎖。但是想想如果後面一連串的讀鎖線程X不是早就被‘餓死了’

apparentlyFirstQueuedIsExclusive,就可以防止這種‘寫鎖飢餓’的情況發生。線程B線程C線程D只有被阻塞,等待線程X獲取到寫鎖,才有機會獲取讀鎖

2.2.1.2 讀鎖的獲取(公平)


    /**
     * Fair version of Sync
     */
    static final class FairSync extends Sync {
       ...

        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
       ...
    }

這裡關於讀鎖的獲取(公平與非公平)分析完了,貼一張整個acquireShared的流程圖
acquireShared流程圖

2.2.2 讀鎖的釋放

        readLock.unlock();
        public void unlock() {
            sync.releaseShared(1);
        }

AQS

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            // 喚醒head後驅結點
            doReleaseShared();
            return true;
        }
        return false;
    }
 // unused = 1
 protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                // 獲取讀鎖時,會執行 compareAndSetState(c, c + SHARED_UNIT),這裡就是減SHARED_UNIT
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // 釋放讀鎖,對需要讀鎖的線程沒有影響(讀讀不互斥)
                    // 但是,這裡如果返回true,可以喚醒被阻塞的想要持有寫鎖的線程
                    return nextc == 0;
            }
        }

3. 總結

  1. ReentrantReadWriteLock中的防止’寫鎖飢餓’的操作,值得一看
  2. AQS中的state(private volatile int state;),邏輯分為高16位(代表讀鎖的state),低16位(代表寫鎖的state),是一個值得學習的辦法
  3. 使用ThreadLocal維護每一個現成的讀鎖的重入數,使用cachedHoldCounter維護最後一個使用HoldCounter的讀鎖線程,節省在ThreadLocal中的查詢

使用讀寫鎖的情況,應該取決於與修改相比,讀取數據的頻率,讀取和寫入操作持續的時間。
例如:

  • 某個集合不經常修改,但是對其元素搜索很頻繁,使用讀寫鎖就是最佳選擇。(簡言之,就是讀多寫少)
  • 現在也是讀多寫少,但是讀操作時間很短,只有一小段代碼,而讀寫鎖比互斥鎖更加複雜,開銷可能大於互斥鎖,這種情況使用讀寫鎖可能不合適。此時就要通過性能分析,判斷使用讀寫鎖在系統中是否合適。

4. 參考