ReentrantReadWriteLock 源碼分析以及 AQS 共享鎖 (二)

  • 2020 年 3 月 17 日
  • 筆記

前言

上一篇講解了 AQS 的獨佔鎖部分(參看:ReentrantLock 源碼分析以及 AQS (一)),這一篇將介紹 AQS 的共享鎖,以及基於共享鎖實現讀寫鎖分離的 ReentrantReadWriteLock。(若是遇到之前講過的方法,將不再贅述)

先思考一下,為什麼我們用讀寫鎖分離?

我們知道 ReentrantLock 用的是獨佔鎖,不管線程是讀還是寫狀態,都會阻塞,這無疑會降低並發量。

但是,我們知道多個線程同時去讀數據的時候,並不會產生線程安全的問題,因為它們互不干擾。那麼為什麼不設計一種方案,讓所有的讀線程可以共享,一起同時讀數據呢,只需要阻塞寫的線程就可以了。提高並發的同時,也不會產生數據不一致的現象。

同樣的,如果有線程在寫數據,那麼也會阻塞其它讀線程(同樣阻塞其它寫線程),數據寫完之後才可以讀數據,這樣保證讀到的數據都是最新的。

因此,我們可以用讀、寫兩把鎖,分別控制數據的讀和寫。實現讀讀共享、讀寫互斥,寫寫互斥。這也是 ReentrantReadWriteLock 讀寫分離鎖的由來。它非常適合用在讀多寫少的場景。

ReentrantReadWriteLock

它和 ReentrantLock 一樣,也是一個可重入的鎖,並基於 AQS 共享鎖實現了讀寫分離。其內部結構也大同小異,支持公平鎖和非公平鎖。我們看下它的構造函數,

public ReentrantReadWriteLock() {      //默認非公平      this(false);  }    public ReentrantReadWriteLock(boolean fair) {      sync = fair ? new FairSync() : new NonfairSync();      readerLock = new ReadLock(this);      writerLock = new WriteLock(this);  }

它定義了兩個內部類來表示讀鎖和寫鎖,並且都通過內部類 Sync 來實現加鎖,釋放鎖等功能。

public static class ReadLock implements Lock, java.io.Serializable {      private static final long serialVersionUID = -5992448646407690164L;      private final Sync sync;        protected ReadLock(ReentrantReadWriteLock lock) {          sync = lock.sync;      }      ...  }    public static class WriteLock implements Lock, java.io.Serializable {      private static final long serialVersionUID = -4992448646407690164L;      private final Sync sync;        protected WriteLock(ReentrantReadWriteLock lock) {          sync = lock.sync;      }      ...  }    abstract static class Sync extends AbstractQueuedSynchronizer {  }

我們再看下公平鎖和非公平鎖,其中有兩個比較重要的方法,用來判斷讀鎖和寫鎖是否應該被阻塞,後面加鎖的時候會用到(其實,實際情況是否真的應該阻塞,還需要斟酌,後面會說)。

static final class FairSync extends Sync {      private static final long serialVersionUID = -2274990926593161451L;      //公平鎖的讀和寫都需要判斷,在它前面是否已經有線程在等待。      //有的話,當前線程就需要阻塞,這也體現了公平性。      final boolean writerShouldBlock() {          return hasQueuedPredecessors();      }      final boolean readerShouldBlock() {          return hasQueuedPredecessors();      }  }    static final class NonfairSync extends Sync {      private static final long serialVersionUID = -8159625535654395037L;      //非公平鎖,寫的時候不需要阻塞,直接返回false      final boolean writerShouldBlock() {          return false; // writers can always barge      }      final boolean readerShouldBlock() {          //為了避免寫線程飢餓,需要判斷同步隊列中第一個排隊的(head.next)是否是獨佔鎖(寫線程)          //如果是的話,當前讀線程就需要阻塞,這是 AQS 中的方法          return apparentlyFirstQueuedIsExclusive();      }  }    final boolean apparentlyFirstQueuedIsExclusive() {      Node h, s;      return (h = head) != null &&          (s = h.next)  != null &&          !s.isShared()         &&          s.thread != null;  }

思考:

我們知道 ReentrantLock 的同步狀態和重入次數,是直接用 state 值來表示的。那麼,現在我需要讀和寫兩把鎖,怎麼才能用一個 int 類型的值來表示兩把鎖的狀態呢?並且,鎖是可重入的,重入的次數怎麼記錄呢?

別急,下面一個一個說。

怎麼用一個 state 值表示讀、寫兩把鎖?

state 是一個 32 位的 int 值,讀寫鎖中,把它一分為二,高 16 位用來表示讀狀態,其值代表讀鎖的線程數,如圖中為 3 個,低 16位表示寫狀態,其值代表寫鎖的重入次數(因為是獨佔鎖)。 這樣,就可以分別計算讀鎖和寫鎖的個數了。其相關的屬性和方法定義在 Sync 類中。

static final int SHARED_SHIFT   = 16;  //表明讀鎖每增加一個,state的實際值增加 2^16  static final int SHARED_UNIT    = (1 << SHARED_SHIFT);  //寫鎖的最大重入次數,讀鎖的最大個數  static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;  static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;    //持有讀鎖的線程個數,參數如的 c 代表 state值  //state 的32位二進制位,無符號右移 16位之後,其實就是高16位的值  static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }  //寫鎖數量,即寫鎖的重入次數  static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

讀鎖的個數計算比較簡單,直接無符號右移 16 位即可。我們看下寫鎖的重入次數是怎麼計算的。先看下 EXCLUSIVE_MASK 這個值,是 (1 << 16) – 1,我們用二進制表示計算過程為:

// 1的二進制  0000 0000 0000 0000 0000 0000 0000 0001  // 1左移 16位  0000 0000 0000 0001 0000 0000 0000 0000  //再減 1  0000 0000 0000 0000 1111 1111 1111 1111  //任何一個 32位二進制數 c,和以上值做 「與」 運算都為它本身 c 的低 16 位值  //這個不用解釋了吧,這個不會的話,需要好好補充一下基礎知識了。。。

鎖的重入次數是怎麼計算的?

寫鎖比較簡單,直接用計算出來的低16位值就可以代表寫鎖的重入次數。

讀鎖,就比較複雜了,因為高16位只能表示持有共享鎖的線程個數,實在是分身乏術啊。所以,在 Sync 內部,維護了一個類,用來表示每個線程重入的次數,

static final class HoldCounter {      int count = 0;      // Use id, not reference, to avoid garbage retention      final long tid = getThreadId(Thread.currentThread());  }

這裡邊定義了一個計數器來表示重入次數,tid 來表示當前的線程 id 。但是,這樣還不夠,我們需要把 HoldCounter 和 線程綁定,這樣才可以區分出來每個線程分別持有的鎖個數(重入次數),這就需要用到 ThreadLocal 了。

static final class ThreadLocalHoldCounter      extends ThreadLocal<HoldCounter> {      //重寫此方法,可以在 ThreadLocal 沒有當前線程計數的情況下,      //直接使用 的 get 方法,初始化一個,而不必 new 一個對象      public HoldCounter initialValue() {          return new HoldCounter();      }  }

除此之外,Sync 中還定義了一些其他和讀鎖相關的屬性,

//保存了當前線程重入的讀鎖次數,當重入次數減到 0 時移除  //移除應該是為了性能着想,因為可以隨時通過 get 方法初始化 HoldCounter  private transient ThreadLocalHoldCounter readHolds;    //保存了最近一個獲取讀鎖成功的線程計數,這個變量的目的是:  //如果最後一個獲取到讀鎖的線程重複獲取讀鎖,那麼就可以直接拿來用,而不用更新。  //相當於緩存,提高效率  private transient HoldCounter cachedHoldCounter;    //第一個獲取讀鎖的線程  private transient Thread firstReader = null;  //第一個獲取讀鎖的線程計數  private transient int firstReaderHoldCount;  //這兩個參數,是為了效率問題,當只有一個線程獲得讀鎖時,就避免了查找 readHolds

基本知識講完啦,那麼接下來就是鎖的獲取和釋放了。先說下寫鎖吧,因為有上一篇獨佔鎖的基礎了,理解起來比較容易。

寫鎖的獲取

寫鎖的獲取從 lock 方法開始,

//ReentrantReadWriteLock.WriteLock.lock  public void lock() {      sync.acquire(1);  }  //AbstractQueuedSynchronizer.acquire  public final void acquire(int arg) {      if (!tryAcquire(arg) &&          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))          selfInterrupt();  }  //公平鎖和非公平鎖調用的是同一個方法,在 Sync 類中定義  //ReentrantReadWriteLock.Sync.tryAcquire  protected final boolean tryAcquire(int acquires) {      Thread current = Thread.currentThread();      //獲取同步狀態 state      int c = getState();      //寫鎖狀態      int w = exclusiveCount(c);      //如果同步狀態不為 0,說明有線程獲得了讀鎖或寫鎖      if (c != 0) {          //如果同步狀態不為 0 ,並且寫鎖狀態為 0,說明了讀鎖被佔用,因讀寫鎖互斥,故返回 false          //若寫鎖狀態不為 0,並且不是當前線程獲得了寫鎖,則不能重入,返回 false          if (w == 0 || current != getExclusiveOwnerThread())              return false;          //如果超過了最大寫鎖數量,則拋出異常          if (w + exclusiveCount(acquires) > MAX_COUNT)              throw new Error("Maximum lock count exceeded");          //若走到這一步,說明當前線程重入了,則計算重入次數,返回true          setState(c + acquires);          return true;      }      //到這說明 c 為 0,讀鎖和寫鎖都沒有被佔用      //如果寫鎖應該被阻塞或者 CAS 獲取寫鎖失敗,則返回false      if (writerShouldBlock() ||          !compareAndSetState(c, c + acquires))          return false;      //把當前線程設為獨佔鎖的所有者      setExclusiveOwnerThread(current);      return true;  }   

寫鎖的釋放

同理,寫鎖的釋放從 unlock 方法開始,

public void unlock() {      sync.release(1);  }    public final boolean release(int arg) {      if (tryRelease(arg)) {          Node h = head;          if (h != null && h.waitStatus != 0)              unparkSuccessor(h);          return true;      }      return false;  }    protected final boolean tryRelease(int releases) {      //若獨佔鎖的持有者不是當前線程,則拋出異常      if (!isHeldExclusively())          throw new IllegalMonitorStateException();      //每次釋放,state 減 1      int nextc = getState() - releases;      boolean free = exclusiveCount(nextc) == 0;      if (free)          setExclusiveOwnerThread(null);      setState(nextc);      return free;  }

可以看到,寫鎖的獲取和釋放和 ReentrantLock 的基本思想是差不多的。下面,着重講解讀鎖的獲取和釋放,相對比較複雜。

讀鎖的獲取

tryAcquireShared

從 ReadLock.lock 方法開始,

public void lock() {      //調用 AQS 的方法      sync.acquireShared(1);  }    public final void acquireShared(int arg) {      //如果 tryAcquireShared 方法返回小於 0,說明獲取讀鎖失敗      if (tryAcquireShared(arg) < 0)          //以共享模式加入同步隊列,再自旋搶鎖          doAcquireShared(arg);  }    protected final int tryAcquireShared(int unused) {      Thread current = Thread.currentThread();      int c = getState();      //如果有線程獲取到了寫鎖,並且不是當前線程,則返回 -1 。      //這是因為,如果線程先獲得了寫鎖,是可以重入再次獲取讀鎖的,此為鎖降級。      //否則不可重入。      if (exclusiveCount(c) != 0 &&          getExclusiveOwnerThread() != current)          return -1;      //讀鎖數量      int r = sharedCount(c);      //如果同時滿足以下三個條件(讀線程不應該被阻塞,讀鎖數量小於最大數量限制,CAS成功),      //則說明獲取讀鎖成功,返回 1。然後再設置相關屬性的值。      if (!readerShouldBlock() &&          r < MAX_COUNT &&          compareAndSetState(c, c + SHARED_UNIT)) {          //如果讀鎖狀態為 0,說明還沒有其他線程獲取到讀鎖          if (r == 0) {              //就把當前線程設置為第一個獲取到讀鎖的線程              firstReader = current;              //第一個讀線程計數設置為 1              firstReaderHoldCount = 1;          } else if (firstReader == current) {              //如果當前線程是第一個獲取讀鎖的線程,則重入,計數加 1              firstReaderHoldCount++;          } else { //讀鎖狀態不為 0,並且當前線程不是 firstReader              //最近一個成功獲取到讀鎖的線程計數器              HoldCounter rh = cachedHoldCounter;              //如果計數器為空,或者計數器的 tid不是當前線程 id,說明有兩種情況              //1.rh 還未被任何線程設置,此時只有 firstReader 一個線程獲取到了讀鎖。              //2.rh 已經被設置了,並且不是當前線程,說明在當前線程之前,除了 firstReader,              //還有其他線程獲取到了讀鎖,那麼當前線程就是第三個獲取到讀鎖的(至少)。              if (rh == null || rh.tid != getThreadId(current))                  //不管哪種情況,都需要創建並初始化當前線程的計數器,並賦值給 cachedHoldCounter                  //因為,當前線程是此時最後一個獲取到讀鎖的線程,需要緩存下來                  cachedHoldCounter = rh = readHolds.get();              //如果當前線程是最近一個獲取到讀鎖的線程,並且計數為0,              else if (rh.count == 0)                  //就把 rh 線程持有鎖的次數信息,放入到本地線程 readHolds                  readHolds.set(rh);              //最後把計數加 1              rh.count++;          }          return 1;      }      //若以上三個條件任意一個不滿足,則調用此方法,再次全力嘗試獲取鎖      return fullTryAcquireShared(current);  }

fullTryAcquireShared 這個方法和 tryAcquireShared 方法非常相似,只是多了一個自旋的過程,直到返回一個確定值(-1或1),才結束。

final int fullTryAcquireShared(Thread current) {      HoldCounter rh = null;      //自旋,直到返回一個確定值(1或 -1)      for (;;) {          int c = getState();          //如果寫鎖狀態不為0,說明有線程獲取到了寫鎖          if (exclusiveCount(c) != 0) {              //獲取到寫鎖的線程不是當前線程,則返回 -1              if (getExclusiveOwnerThread() != current)                  return -1;              //這裡省略了else,到這裡說明了當前線程獲取到了寫鎖,因此需要做鎖降級處理,              //把寫鎖降級為讀鎖。因為如果不這樣做的話,線程就會阻塞到這,會導致死鎖。              //然後跳轉到 ①處繼續執行              //===========//          } else if (readerShouldBlock()) {  //寫鎖空閑,並且讀鎖應該阻塞,說明 head.next正在等待獲取寫鎖              //儘管讀鎖應該阻塞,但是此處也不應該立即阻塞,因為有可能存在讀鎖重入,需要再確認一下。              if (firstReader == current) {//當前線程是第一個讀鎖,可重入                  // 將跳轉到 ①處              } else {                  if (rh == null) { //第一次循環進來時肯定為 null                      rh = cachedHoldCounter;  //取到緩存中最後一次獲取到讀鎖的計數器                      if (rh == null || rh.tid != getThreadId(current)) {                          rh = readHolds.get();                          //計數為 0,說明當前線程沒有獲取到過讀鎖                          if (rh.count == 0)                              //為了性能考慮,如果計數為 0,需要把它移除掉                              readHolds.remove();                      }                  }                  //走到這,說明當前線程不是 firstReader,也沒有獲取到過讀鎖,不符合重入條件,                  //那麼就確定需要阻塞,只能去排隊了,返回 -1 。                  if (rh.count == 0)                      return -1;              }          }          // ①處          //如果讀鎖數量達到了 MAX_COUNT,則拋出異常          if (sharedCount(c) == MAX_COUNT)              throw new Error("Maximum lock count exceeded");          //CAS獲取讀鎖,和 tryAcquireShared 的處理邏輯一樣,不再贅述          if (compareAndSetState(c, c + SHARED_UNIT)) {              if (sharedCount(c) == 0) {                  firstReader = current;                  firstReaderHoldCount = 1;              } else if (firstReader == current) {                  firstReaderHoldCount++;              } else {                  if (rh == null)                      rh = cachedHoldCounter;                  if (rh == null || rh.tid != getThreadId(current))                      rh = readHolds.get();                  else if (rh.count == 0)                      readHolds.set(rh);                  rh.count++;                  cachedHoldCounter = rh; // cache for release              }              return 1;          }      }  }

doAcquireShared

如果 tryAcquireShared 最終還是失敗了,那麼就執行 doAcquireShared 方法。

private void doAcquireShared(int arg) {      //以共享模式加入同步隊列      final Node node = addWaiter(Node.SHARED);      boolean failed = true;      try {          boolean interrupted = false;          for (;;) {              final Node p = node.predecessor();              if (p == head) {                  //如果當前節點的前驅節點是頭結點,再次嘗試獲取讀鎖                  int r = tryAcquireShared(arg);                  if (r >= 0) {                      //把當前節點設置為頭結點,並把共享狀態傳播下去                      setHeadAndPropagate(node, r);                      p.next = null; // help GC                      if (interrupted)                          selfInterrupt();                      failed = false;                      return;                  }              }              //獲取讀鎖失敗,判斷是否可掛起當前線程              if (shouldParkAfterFailedAcquire(p, node) &&                  parkAndCheckInterrupt())                  interrupted = true;          }      } finally {          if (failed)              cancelAcquire(node);      }  }

setHeadAndPropagate

private void setHeadAndPropagate(Node node, int propagate) {      //舊的頭結點      Node h = head;      //把當前 node 設置為新的頭結點      setHead(node);        //propagate 是 tryAcquireShared 方法的返回值      //若大於0,或者舊的頭結點為空,或者頭結點的 ws 小於0      //又或者新的頭結點為空,或者新頭結點的 ws 小於0,則獲取後繼節點      if (propagate > 0 || h == null || h.waitStatus < 0 ||          (h = head) == null || h.waitStatus < 0) {          Node s = node.next;          //沒有後繼節點或者後繼節點是共享節點,就執行喚醒          if (s == null || s.isShared())              //釋放掉資源,並喚醒後繼節點,稍後講解              doReleaseShared();      }  }

讀鎖的釋放

tryReleaseShared

從 ReadLock.unlock方法開始,

public void unlock() {      sync.releaseShared(1);  }    public final boolean releaseShared(int arg) {      if (tryReleaseShared(arg)) {          doReleaseShared();          return true;      }      return false;  }    protected final boolean tryReleaseShared(int unused) {      Thread current = Thread.currentThread();      //當前線程為第一個讀線程      if (firstReader == current) {          //若 firstReader 的計數為1,則把它置為 null          if (firstReaderHoldCount == 1)              firstReader = null;          else          //否則,計數減 1,說明重入次數減 1              firstReaderHoldCount--;      } else {          HoldCounter rh = cachedHoldCounter;          if (rh == null || rh.tid != getThreadId(current))              rh = readHolds.get();          int count = rh.count;          if (count <= 1) {              //如果當前線程的計數小於等於 1,則移除              readHolds.remove();              if (count <= 0)                  //若計數小於等於 0,則拋出異常                  throw unmatchedUnlockException();          }          //計數減 1          --rh.count;      }      for (;;) {          int c = getState();          //讀鎖狀態減 1,其實就是state值減 65536          //因為高16位的讀鎖實際值,在state中的表現就是相差 65536          int nextc = c - SHARED_UNIT;          // CAS 設置 state 最新狀態          if (compareAndSetState(c, nextc))              //如果讀鎖狀態減為 0,就返回true              //釋放讀鎖對其它讀線程沒有任何影響,              //但是如果讀、寫鎖都空閑,就可以允許等待的寫線程繼續執行              return nextc == 0;      }  }

doReleaseShared

如果 tryReleaseShared 方法返回 true,說明讀鎖釋放成功,需要喚醒後繼節點,

private void doReleaseShared() {      for (;;) {          //頭結點          Node h = head;          //說明隊列中至少有兩個節點          if (h != null && h != tail) {              int ws = h.waitStatus;              if (ws == Node.SIGNAL) {                  //如果頭結點的 ws 為 -1 ,則 CAS 把它設置為 0,因為喚醒後繼節點後,                  //它就不需要做什麼了。失敗繼續自旋嘗試                  if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                      continue;                       // loop to recheck cases                  // CAS 成功,則喚醒後繼節點                  unparkSuccessor(h);              }              //如果 ws 為 0,則把它設置為 -3 ,表明共享狀態可向後傳播,失敗則繼續自旋嘗試              //後來我一直在想,為什麼需要設置一個 PROPAGATE 這樣的狀態呢,但是還沒頭緒              //可以看下這篇文章分析,或許有一定的參考價值:              //https://www.cnblogs.com/micrari/p/6937995.html              //只能說 Doug Lea 大神的邏輯真是太縝密了,等我以後想明白了,再補充吧。              //可以暫時先理解為,這就是一個無條件傳播的標誌              else if (ws == 0 &&                       !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                  continue;                // loop on failed CAS          }          //如果此刻 h 等於頭結點,說明頭結點未改變,則跳出整個循環          //否則,說明頭結點被其他線程修改過了,則繼續下一次的循環判斷          if (h == head)                   // loop if head changed              break;      }  }

結語

關於獨佔鎖,比較簡單。而讀鎖,涉及到了很多臨界點和瞬時狀態。其實細想,並不像表面上看起來那麼簡單,理解的會比較淺顯,畢竟 Doug Lea 大神的思想不是常人能揣摩透的。

本篇只是我的一些個人理解,如有講解不到位的地方,歡迎拍磚。

其實,還有很多細節問題,本文並沒有展開。例如, setHeadAndPropagate 方法為什麼判斷兩次新舊節點的 ws 狀態,意義何為。 doReleaseShared 方法最後為什麼需要設計 h == head 這樣的判斷,有什麼含義。包括為什麼要設計 PROPAGATE 狀態,沒有這個狀態又如何。

看來路阻且長啊。。。以後再來補坑吧,這篇只能叫淺析了。 ̄□ ̄||

如果本文對你有用,歡迎點贊,評論,轉發。

學習是枯燥的,也是有趣的。我是「煙雨星空」,歡迎關注,可第一時間接收文章推送。