ReentrantReadWriteLock 源碼分析以及 AQS 共享鎖 (二)
- 2020 年 3 月 17 日
- 筆記
前言
上一篇講解了 AQS 的獨佔鎖部分(參看:ReentrantLock 源碼分析以及 AQS (一)),這一篇將介紹 AQS 的共享鎖,以及基於共享鎖實現讀寫鎖分離的 ReentrantReadWriteLock。(若是遇到之前講過的方法,將不再贅述)
先思考一下,為什麼我們用讀寫鎖分離?
我們知道 ReentrantLock 用的是獨佔鎖,不管線程是讀還是寫狀態,都會阻塞,這無疑會降低並發量。
但是,我們知道多個線程同時去讀數據的時候,並不會產生線程安全的問題,因為它們互不干擾。那麼為什麼不設計一種方案,讓所有的讀線程可以共享,一起同時讀數據呢,只需要阻塞寫的線程就可以了。提高並發的同時,也不會產生數據不一致的現象。
同樣的,如果有線程在寫數據,那麼也會阻塞其它讀線程(同樣阻塞其它寫線程),數據寫完之後才可以讀數據,這樣保證讀到的數據都是最新的。
因此,我們可以用讀、寫兩把鎖,分別控制數據的讀和寫。實現讀讀共享、讀寫互斥,寫寫互斥。這也是 ReentrantReadWriteLock 讀寫分離鎖的由來。它非常適合用在讀多寫少的場景。
ReentrantReadWriteLock
它和 ReentrantLock 一樣,也是一個可重入的鎖,並基於 AQS 共享鎖實現了讀寫分離。其內部結構也大同小異,支持公平鎖和非公平鎖。我們看下它的構造函數,
public ReentrantReadWriteLock() { //默認非公平 this(false); } public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
它定義了兩個內部類來表示讀鎖和寫鎖,並且都通過內部類 Sync 來實現加鎖,釋放鎖等功能。
public static class ReadLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -5992448646407690164L; private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } ... } public static class WriteLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -4992448646407690164L; private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } ... } abstract static class Sync extends AbstractQueuedSynchronizer { }
我們再看下公平鎖和非公平鎖,其中有兩個比較重要的方法,用來判斷讀鎖和寫鎖是否應該被阻塞,後面加鎖的時候會用到(其實,實際情況是否真的應該阻塞,還需要斟酌,後面會說)。
static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; //公平鎖的讀和寫都需要判斷,在它前面是否已經有線程在等待。 //有的話,當前線程就需要阻塞,這也體現了公平性。 final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } } static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; //非公平鎖,寫的時候不需要阻塞,直接返回false final boolean writerShouldBlock() { return false; // writers can always barge } final boolean readerShouldBlock() { //為了避免寫線程飢餓,需要判斷同步隊列中第一個排隊的(head.next)是否是獨佔鎖(寫線程) //如果是的話,當前讀線程就需要阻塞,這是 AQS 中的方法 return apparentlyFirstQueuedIsExclusive(); } } final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }
思考:
我們知道 ReentrantLock 的同步狀態和重入次數,是直接用 state 值來表示的。那麼,現在我需要讀和寫兩把鎖,怎麼才能用一個 int 類型的值來表示兩把鎖的狀態呢?並且,鎖是可重入的,重入的次數怎麼記錄呢?
別急,下面一個一個說。
怎麼用一個 state 值表示讀、寫兩把鎖?
state 是一個 32 位的 int 值,讀寫鎖中,把它一分為二,高 16 位用來表示讀狀態,其值代表讀鎖的線程數,如圖中為 3 個,低 16位表示寫狀態,其值代表寫鎖的重入次數(因為是獨佔鎖)。 這樣,就可以分別計算讀鎖和寫鎖的個數了。其相關的屬性和方法定義在 Sync 類中。
static final int SHARED_SHIFT = 16; //表明讀鎖每增加一個,state的實際值增加 2^16 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //寫鎖的最大重入次數,讀鎖的最大個數 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //持有讀鎖的線程個數,參數如的 c 代表 state值 //state 的32位二進制位,無符號右移 16位之後,其實就是高16位的值 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } //寫鎖數量,即寫鎖的重入次數 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
讀鎖的個數計算比較簡單,直接無符號右移 16 位即可。我們看下寫鎖的重入次數是怎麼計算的。先看下 EXCLUSIVE_MASK 這個值,是 (1 << 16) – 1,我們用二進制表示計算過程為:
// 1的二進制 0000 0000 0000 0000 0000 0000 0000 0001 // 1左移 16位 0000 0000 0000 0001 0000 0000 0000 0000 //再減 1 0000 0000 0000 0000 1111 1111 1111 1111 //任何一個 32位二進制數 c,和以上值做 「與」 運算都為它本身 c 的低 16 位值 //這個不用解釋了吧,這個不會的話,需要好好補充一下基礎知識了。。。
鎖的重入次數是怎麼計算的?
寫鎖比較簡單,直接用計算出來的低16位值就可以代表寫鎖的重入次數。
讀鎖,就比較複雜了,因為高16位只能表示持有共享鎖的線程個數,實在是分身乏術啊。所以,在 Sync 內部,維護了一個類,用來表示每個線程重入的次數,
static final class HoldCounter { int count = 0; // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); }
這裡邊定義了一個計數器來表示重入次數,tid 來表示當前的線程 id 。但是,這樣還不夠,我們需要把 HoldCounter 和 線程綁定,這樣才可以區分出來每個線程分別持有的鎖個數(重入次數),這就需要用到 ThreadLocal 了。
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { //重寫此方法,可以在 ThreadLocal 沒有當前線程計數的情況下, //直接使用 的 get 方法,初始化一個,而不必 new 一個對象 public HoldCounter initialValue() { return new HoldCounter(); } }
除此之外,Sync 中還定義了一些其他和讀鎖相關的屬性,
//保存了當前線程重入的讀鎖次數,當重入次數減到 0 時移除 //移除應該是為了性能着想,因為可以隨時通過 get 方法初始化 HoldCounter private transient ThreadLocalHoldCounter readHolds; //保存了最近一個獲取讀鎖成功的線程計數,這個變量的目的是: //如果最後一個獲取到讀鎖的線程重複獲取讀鎖,那麼就可以直接拿來用,而不用更新。 //相當於緩存,提高效率 private transient HoldCounter cachedHoldCounter; //第一個獲取讀鎖的線程 private transient Thread firstReader = null; //第一個獲取讀鎖的線程計數 private transient int firstReaderHoldCount; //這兩個參數,是為了效率問題,當只有一個線程獲得讀鎖時,就避免了查找 readHolds
基本知識講完啦,那麼接下來就是鎖的獲取和釋放了。先說下寫鎖吧,因為有上一篇獨佔鎖的基礎了,理解起來比較容易。
寫鎖的獲取
寫鎖的獲取從 lock 方法開始,
//ReentrantReadWriteLock.WriteLock.lock public void lock() { sync.acquire(1); } //AbstractQueuedSynchronizer.acquire public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //公平鎖和非公平鎖調用的是同一個方法,在 Sync 類中定義 //ReentrantReadWriteLock.Sync.tryAcquire protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); //獲取同步狀態 state int c = getState(); //寫鎖狀態 int w = exclusiveCount(c); //如果同步狀態不為 0,說明有線程獲得了讀鎖或寫鎖 if (c != 0) { //如果同步狀態不為 0 ,並且寫鎖狀態為 0,說明了讀鎖被佔用,因讀寫鎖互斥,故返回 false //若寫鎖狀態不為 0,並且不是當前線程獲得了寫鎖,則不能重入,返回 false if (w == 0 || current != getExclusiveOwnerThread()) return false; //如果超過了最大寫鎖數量,則拋出異常 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //若走到這一步,說明當前線程重入了,則計算重入次數,返回true setState(c + acquires); return true; } //到這說明 c 為 0,讀鎖和寫鎖都沒有被佔用 //如果寫鎖應該被阻塞或者 CAS 獲取寫鎖失敗,則返回false if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //把當前線程設為獨佔鎖的所有者 setExclusiveOwnerThread(current); return true; }
寫鎖的釋放
同理,寫鎖的釋放從 unlock 方法開始,
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { //若獨佔鎖的持有者不是當前線程,則拋出異常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //每次釋放,state 減 1 int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
可以看到,寫鎖的獲取和釋放和 ReentrantLock 的基本思想是差不多的。下面,着重講解讀鎖的獲取和釋放,相對比較複雜。
讀鎖的獲取
tryAcquireShared
從 ReadLock.lock 方法開始,
public void lock() { //調用 AQS 的方法 sync.acquireShared(1); } public final void acquireShared(int arg) { //如果 tryAcquireShared 方法返回小於 0,說明獲取讀鎖失敗 if (tryAcquireShared(arg) < 0) //以共享模式加入同步隊列,再自旋搶鎖 doAcquireShared(arg); } protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); //如果有線程獲取到了寫鎖,並且不是當前線程,則返回 -1 。 //這是因為,如果線程先獲得了寫鎖,是可以重入再次獲取讀鎖的,此為鎖降級。 //否則不可重入。 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //讀鎖數量 int r = sharedCount(c); //如果同時滿足以下三個條件(讀線程不應該被阻塞,讀鎖數量小於最大數量限制,CAS成功), //則說明獲取讀鎖成功,返回 1。然後再設置相關屬性的值。 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //如果讀鎖狀態為 0,說明還沒有其他線程獲取到讀鎖 if (r == 0) { //就把當前線程設置為第一個獲取到讀鎖的線程 firstReader = current; //第一個讀線程計數設置為 1 firstReaderHoldCount = 1; } else if (firstReader == current) { //如果當前線程是第一個獲取讀鎖的線程,則重入,計數加 1 firstReaderHoldCount++; } else { //讀鎖狀態不為 0,並且當前線程不是 firstReader //最近一個成功獲取到讀鎖的線程計數器 HoldCounter rh = cachedHoldCounter; //如果計數器為空,或者計數器的 tid不是當前線程 id,說明有兩種情況 //1.rh 還未被任何線程設置,此時只有 firstReader 一個線程獲取到了讀鎖。 //2.rh 已經被設置了,並且不是當前線程,說明在當前線程之前,除了 firstReader, //還有其他線程獲取到了讀鎖,那麼當前線程就是第三個獲取到讀鎖的(至少)。 if (rh == null || rh.tid != getThreadId(current)) //不管哪種情況,都需要創建並初始化當前線程的計數器,並賦值給 cachedHoldCounter //因為,當前線程是此時最後一個獲取到讀鎖的線程,需要緩存下來 cachedHoldCounter = rh = readHolds.get(); //如果當前線程是最近一個獲取到讀鎖的線程,並且計數為0, else if (rh.count == 0) //就把 rh 線程持有鎖的次數信息,放入到本地線程 readHolds readHolds.set(rh); //最後把計數加 1 rh.count++; } return 1; } //若以上三個條件任意一個不滿足,則調用此方法,再次全力嘗試獲取鎖 return fullTryAcquireShared(current); }
fullTryAcquireShared 這個方法和 tryAcquireShared 方法非常相似,只是多了一個自旋的過程,直到返回一個確定值(-1或1),才結束。
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; //自旋,直到返回一個確定值(1或 -1) for (;;) { int c = getState(); //如果寫鎖狀態不為0,說明有線程獲取到了寫鎖 if (exclusiveCount(c) != 0) { //獲取到寫鎖的線程不是當前線程,則返回 -1 if (getExclusiveOwnerThread() != current) return -1; //這裡省略了else,到這裡說明了當前線程獲取到了寫鎖,因此需要做鎖降級處理, //把寫鎖降級為讀鎖。因為如果不這樣做的話,線程就會阻塞到這,會導致死鎖。 //然後跳轉到 ①處繼續執行 //===========// } else if (readerShouldBlock()) { //寫鎖空閑,並且讀鎖應該阻塞,說明 head.next正在等待獲取寫鎖 //儘管讀鎖應該阻塞,但是此處也不應該立即阻塞,因為有可能存在讀鎖重入,需要再確認一下。 if (firstReader == current) {//當前線程是第一個讀鎖,可重入 // 將跳轉到 ①處 } else { if (rh == null) { //第一次循環進來時肯定為 null rh = cachedHoldCounter; //取到緩存中最後一次獲取到讀鎖的計數器 if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); //計數為 0,說明當前線程沒有獲取到過讀鎖 if (rh.count == 0) //為了性能考慮,如果計數為 0,需要把它移除掉 readHolds.remove(); } } //走到這,說明當前線程不是 firstReader,也沒有獲取到過讀鎖,不符合重入條件, //那麼就確定需要阻塞,只能去排隊了,返回 -1 。 if (rh.count == 0) return -1; } } // ①處 //如果讀鎖數量達到了 MAX_COUNT,則拋出異常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //CAS獲取讀鎖,和 tryAcquireShared 的處理邏輯一樣,不再贅述 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
doAcquireShared
如果 tryAcquireShared 最終還是失敗了,那麼就執行 doAcquireShared 方法。
private void doAcquireShared(int arg) { //以共享模式加入同步隊列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { //如果當前節點的前驅節點是頭結點,再次嘗試獲取讀鎖 int r = tryAcquireShared(arg); if (r >= 0) { //把當前節點設置為頭結點,並把共享狀態傳播下去 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //獲取讀鎖失敗,判斷是否可掛起當前線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) { //舊的頭結點 Node h = head; //把當前 node 設置為新的頭結點 setHead(node); //propagate 是 tryAcquireShared 方法的返回值 //若大於0,或者舊的頭結點為空,或者頭結點的 ws 小於0 //又或者新的頭結點為空,或者新頭結點的 ws 小於0,則獲取後繼節點 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //沒有後繼節點或者後繼節點是共享節點,就執行喚醒 if (s == null || s.isShared()) //釋放掉資源,並喚醒後繼節點,稍後講解 doReleaseShared(); } }
讀鎖的釋放
tryReleaseShared
從 ReadLock.unlock方法開始,
public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //當前線程為第一個讀線程 if (firstReader == current) { //若 firstReader 的計數為1,則把它置為 null if (firstReaderHoldCount == 1) firstReader = null; else //否則,計數減 1,說明重入次數減 1 firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { //如果當前線程的計數小於等於 1,則移除 readHolds.remove(); if (count <= 0) //若計數小於等於 0,則拋出異常 throw unmatchedUnlockException(); } //計數減 1 --rh.count; } for (;;) { int c = getState(); //讀鎖狀態減 1,其實就是state值減 65536 //因為高16位的讀鎖實際值,在state中的表現就是相差 65536 int nextc = c - SHARED_UNIT; // CAS 設置 state 最新狀態 if (compareAndSetState(c, nextc)) //如果讀鎖狀態減為 0,就返回true //釋放讀鎖對其它讀線程沒有任何影響, //但是如果讀、寫鎖都空閑,就可以允許等待的寫線程繼續執行 return nextc == 0; } }
doReleaseShared
如果 tryReleaseShared 方法返回 true,說明讀鎖釋放成功,需要喚醒後繼節點,
private void doReleaseShared() { for (;;) { //頭結點 Node h = head; //說明隊列中至少有兩個節點 if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //如果頭結點的 ws 為 -1 ,則 CAS 把它設置為 0,因為喚醒後繼節點後, //它就不需要做什麼了。失敗繼續自旋嘗試 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // CAS 成功,則喚醒後繼節點 unparkSuccessor(h); } //如果 ws 為 0,則把它設置為 -3 ,表明共享狀態可向後傳播,失敗則繼續自旋嘗試 //後來我一直在想,為什麼需要設置一個 PROPAGATE 這樣的狀態呢,但是還沒頭緒 //可以看下這篇文章分析,或許有一定的參考價值: //https://www.cnblogs.com/micrari/p/6937995.html //只能說 Doug Lea 大神的邏輯真是太縝密了,等我以後想明白了,再補充吧。 //可以暫時先理解為,這就是一個無條件傳播的標誌 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //如果此刻 h 等於頭結點,說明頭結點未改變,則跳出整個循環 //否則,說明頭結點被其他線程修改過了,則繼續下一次的循環判斷 if (h == head) // loop if head changed break; } }
結語
關於獨佔鎖,比較簡單。而讀鎖,涉及到了很多臨界點和瞬時狀態。其實細想,並不像表面上看起來那麼簡單,理解的會比較淺顯,畢竟 Doug Lea 大神的思想不是常人能揣摩透的。
本篇只是我的一些個人理解,如有講解不到位的地方,歡迎拍磚。
其實,還有很多細節問題,本文並沒有展開。例如, setHeadAndPropagate 方法為什麼判斷兩次新舊節點的 ws 狀態,意義何為。 doReleaseShared 方法最後為什麼需要設計 h == head 這樣的判斷,有什麼含義。包括為什麼要設計 PROPAGATE 狀態,沒有這個狀態又如何。
看來路阻且長啊。。。以後再來補坑吧,這篇只能叫淺析了。 ̄□ ̄||
如果本文對你有用,歡迎點贊,評論,轉發。
學習是枯燥的,也是有趣的。我是「煙雨星空」,歡迎關注,可第一時間接收文章推送。