ReentrantLock 源碼分析以及 AQS (一)

  • 2020 年 3 月 13 日
  • 筆記

前言

JDK1.5 之後發佈了JUC(java.util.concurrent),用於解決多線程並發問題。AQS 是一個特別重要的同步框架,很多同步類都藉助於 AQS 實現了對線程同步狀態的管理。

AQS 中最主要的就是獨佔鎖和共享鎖的獲取和釋放,以及提供了一些可中斷的獲取鎖,超時等待鎖等方法。

ReentranLock 是基於 AQS 獨佔鎖的一個實現。ReentrantReadWriteLock 是基於 AQS 共享鎖的一個讀寫鎖實現。本來打算一篇文章裏面寫完獨佔鎖和共享鎖,但是發現篇幅太長了,也不易於消化。

因此,本篇就先結合 ReentrantLock 源碼,分析 AQS 的獨佔鎖獲取和釋放。以及 ReentrantLock 的公平鎖和非公平鎖實現。

下一篇再寫 ReentrantReadWriteLock 讀寫鎖源碼,以及 AQS 共享鎖的獲取和釋放。

在正式講解源碼之前,牆裂建議讀者做一些準備工作,最好對以下知識有一定的了解,這樣閱讀起來源碼會比較輕鬆(因為,我當初剛開始接觸多線程時,直接看 AQS 簡直是一臉懵逼,就像讀天書一樣。。)。

  1. 了解雙向鏈表的數據結構,以及隊列的入隊出隊等操作。
  2. LockSupport 的 park,unpark 方法,以及對線程的 interrupt 幾個方法了解(可參考:LockSupport的 park 方法是怎麼響應中斷的?)。
  3. 對 CAS 和自旋機制有一定的了解。

AQS 同步隊列

AQS 內部維護了一個 FIFO(先進先出)的雙向隊列。它的內部是用雙向鏈表來實現的,每個數據節點(Node)中都包含了當前節點的線程信息,還有它的前後兩個指針,分別指向前驅節點和後繼節點。下邊看一下 Node 的屬性和方法:

static final class Node {      //可以認為是一種標記,表明了這個 node 是以共享模式在同步隊列中等待      static final Node SHARED = new Node();      //也是一種標記,表明這個 node 是以獨佔模式在同步隊列中等待      static final Node EXCLUSIVE = null;        /** waitStatus 常量值 */      //說明當前節點被取消,原因有可能是超時,或者被中斷。      //節點被取消的狀態是不可逆的,也就是說此節點會一直停留在取消狀態,不會轉變。      static final int CANCELLED =  1;      //說明後繼節點的線程被 park 阻塞,因此當前線程需要在釋放鎖或者被取消時,喚醒後繼節點      static final int SIGNAL    = -1;      //說明線程在 condition 條件隊列等待      static final int CONDITION = -2;      //在共享模式中用,表明下一個共享線程應該無條件傳播      static final int PROPAGATE = -3;          //當前線程的等待狀態,除了以上四種值,還有一個值 0 為初始化狀態(條件隊列的節點除外)。      //注意這個值修改時是通過 CAS ,以保證線程安全。      volatile int waitStatus;        //前驅節點      volatile Node prev;        //後繼節點      volatile Node next;        //當前節點中的線程,通過構造函數初始化,出隊時會置空(這個後續說,重點強調)      volatile Thread thread;        //有兩種情況。1.在 condition 條件隊列中的後一個節點      //2. 一個特殊值 SHARED 用於表明當前是共享模式(因為條件隊列只存在於獨佔模式)      Node nextWaiter;        //是否是共享模式,理由同上      final boolean isShared() {          return nextWaiter == SHARED;      }        //返回前驅節點,如果為空拋出空指針      final Node predecessor() throws NullPointerException {          Node p = prev;          if (p == null)              throw new NullPointerException();          else              return p;      }        Node() {    // Used to establish initial head or SHARED marker      }        Node(Thread thread, Node mode) {     // Used by addWaiter          this.nextWaiter = mode;          this.thread = thread;      }        Node(Thread thread, int waitStatus) { // Used by Condition          this.waitStatus = waitStatus;          this.thread = thread;      }  }

另外,在 AQS 類中,還會記錄同步隊列的頭結點和尾結點:

    //同步隊列的頭結點,是懶加載的,即不會立即創建一個同步隊列,      //只有當某個線程獲取不到鎖,需要排隊的時候,才會初始化頭結點      private transient volatile Node head;        //同步隊列的尾結點,同樣是懶加載。      private transient volatile Node tail;

獨佔鎖

這部分就結合 ReentrantLock 源碼分析 AQS 的獨佔鎖是怎樣獲得和釋放鎖的。

非公平鎖

首先,我們從 ReentrantLock 開始分析,它有兩個構造方法,一個構造,可以傳入一個 boolean 類型的參數,表明是用公平鎖還是非公平鎖模式。另一個構造方法,不傳入任何參數,則默認用非公平鎖。

public ReentrantLock() {      sync = new NonfairSync();  }    public ReentrantLock(boolean fair) {      sync = fair ? new FairSync() : new NonfairSync();  }

NonfairSync 和 FairSync 都繼承自 Sync ,它們都是 ReentranLock 的內部類。 而Sync 類又繼承自 AQS (AbstractQueuedSynchronizer)。

static final class NonfairSync extends Sync {  }    static final class FairSync extends Sync {  }    abstract static class Sync extends AbstractQueuedSynchronizer {  }

知道了它們之間的繼承關係,我們就從非公平鎖的加鎖方法作為入口,跟蹤源碼。因為非公平鎖的流程講明白之後,公平鎖大致流程都一樣,只是多了一個條件判斷(這個,一會兒後邊細講,會做對比)。

NonfairSync.lock

我們看下公平鎖的獲取鎖的方法:

final void lock() {      //通過 CAS 操作把 state 設置為 1      if (compareAndSetState(0, 1))          //如果設值成功,說明加鎖成功,保存當前獲得鎖的線程          setExclusiveOwnerThread(Thread.currentThread());      else          //如果加鎖失敗,則執行 AQS 的acquire 方法          acquire(1);  }    public final void acquire(int arg) {      if (!tryAcquire(arg) &&          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))          selfInterrupt();  }

acquire

這個方法的邏輯是:

  1. 通過 tryAcquire 方法,嘗試獲取鎖,如果成功,則返回 true,失敗返回 false 。
  2. tryAcquire 失敗之後,會先調用 addWaiter 方法,把當前線程封裝成 node 節點,加入同步隊列(獨佔模式)。
  3. acquireQueued 方法會把剛加入隊列的 node 作為參數,通過自旋去獲得鎖。

tryAcquire

這是一個模板方法,具體的實現需要看它的子類,這裡對應的就是 ReentrantLock.NonfairSync.tryAcquire 方法。我們看一下:

protected final boolean tryAcquire(int acquires) {      return nonfairTryAcquire(acquires);  }    final boolean nonfairTryAcquire(int acquires) {      //當前線程      final Thread current = Thread.currentThread();      //獲取當前的同步狀態,若為 0 ,表示無鎖狀態。若大於 0,表示已經有線程搶到了鎖。      int c = getState();      if (c == 0) {          //然後通過 CAS 操作把 state 的值改為 1。          if (compareAndSetState(0, acquires)) {              // CAS 成功之後,保存當前獲得鎖的線程              setExclusiveOwnerThread(current);              return true;          }      }      // 如果 state 大於0,則判斷當前線程是否是獲得鎖的線程,是的話,可重入。      else if (current == getExclusiveOwnerThread()) {          //由於 ReentrantLock 是可重入的,所以每重入一次 state 就加 1 。          int nextc = c + acquires;          if (nextc < 0)              throw new Error("Maximum lock count exceeded");          setState(nextc);          return true;      }      return false;  }

addWaiter

如果獲取鎖失敗之後,就會調用 addWaiter 方法,把當前線程加入同步隊列。

private Node addWaiter(Node mode) {      //把當前線程封裝成 Node ,並且是獨佔模式      Node node = new Node(Thread.currentThread(), mode);      //嘗試快速入隊,如果失敗,則會調用 enq 入隊方法。enq 會初始化隊列。      Node pred = tail;      //如果 tail 不為空,說明當前隊列中已經有節點      if (pred != null) {          //把當前 node 的 prev 指針指向 tail          node.prev = pred;          //通過 CAS 把 node 設置為 tail,即添加到隊尾          if (compareAndSetTail(pred, node)) {              //把舊的 tail 節點的 next 指針指向當前 node              pred.next = node;              return node;          }      }      //當 tail 為空時,把 node 添加到隊列,如果需要的話,先進行隊列初始化      enq(node);      //入隊成功之後,返回當前 node      return node;  }

enq

通過自旋,把當前節點加入到隊列中

private Node enq(final Node node) {      for (;;) {          Node t = tail;          //如果 tail為空,說明隊列未初始化          if (t == null) {              //創建一個空節點,通過 CAS把它設置為頭結點              if (compareAndSetHead(new Node()))                  //此時只有一個 head頭節點,因此把 tail也指向它                  tail = head;          } else {              //第二次自旋時,tail不為空,於是把當前節點的 prev指向 tail節點              node.prev = t;              //通過 CAS把 tail節點設置為當前 node節點              if (compareAndSetTail(t, node)) {                  //把舊的 tail節點的 next指向當前 node                  t.next = node;                  return t;              }          }      }  }

acquireQueued

入隊成功之後,就會調用 acquireQueued 方法自旋搶鎖。

final boolean acquireQueued(final Node node, int arg) {      boolean failed = true;      try {          boolean interrupted = false;          for (;;) {              //獲取當前節點的前驅節點              final Node p = node.predecessor();              //如果前驅節點就是 head 節點,就調用 tryAcquire 方法搶鎖              if (p == head && tryAcquire(arg)) {                  //如果搶鎖成功,就把當前 node 設置為頭結點                  setHead(node);                  p.next = null; // help GC                  failed = false;                  //搶鎖成功後,會把線程中斷標誌返回出去,終止for循環                  return interrupted;              }              //如果搶鎖失敗,就根據前驅節點的 waitStatus 狀態判斷是否需要把當前線程掛起              if (shouldParkAfterFailedAcquire(p, node) &&                  //線程被掛起時,判斷是否被中斷過                  parkAndCheckInterrupt())                  //注意此處,如果被線程被中斷過,需要把中斷標誌重新設置一下                  interrupted = true;          }      } finally {          if (failed)              //如果拋出異常,則取消鎖的獲取,進行出隊操作              cancelAcquire(node);      }  }

setHead

通過代碼,我們可以看到,當前的同步隊列中,只有第二個節點才有資格搶鎖。如果搶鎖成功,則會把它設置為頭結點。

private void setHead(Node node) {      head = node;      node.thread = null;      node.prev = null;  }

需要注意的是,這個方法,會把頭結點的線程設置為 null 。想一下,為什麼?

因為,此時頭結點的線程已經搶鎖成功,需要出隊了。自然的,隊列中也就不應該存在這個線程了。

PS:由 enq 方法,還有 setHead 方法,我們可以發現,頭結點的線程總是為 null。這是因為,頭結點要麼是剛初始化的空節點,要麼是搶到鎖的線程出隊了。因此,我們也常常把頭結點叫做虛擬節點(不存儲任何線程)。

shouldParkAfterFailedAcquire

以上是搶鎖成功的情況,那麼搶鎖失敗了呢?這時,我們需要判斷是否應該把當前線程掛起。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {      //獲取當前節點的前驅節點的 waitStatus      int ws = pred.waitStatus;      if (ws == Node.SIGNAL)          //如果 ws = -1 ,說明當前線程可以被前驅節點正常喚醒,於是就可以安全的 park了          return true;      if (ws > 0) {          //如果 ws > 0,說明前驅節點被取消,則會從當前節點依次向前查找,          //直到找到第一個沒有被取消的節點,把那個節點的 next 指向當前 node          //這一步,是為了找到一個可以把當前線程喚起的前驅節點          do {              node.prev = pred = pred.prev;          } while (pred.waitStatus > 0);          pred.next = node;      } else {          //如果 ws 為 0,或者 -3(共享鎖狀態),則把它設置為 -1          //返回 false,下次自旋時,就會判斷等於 -1,返回 true了          compareAndSetWaitStatus(pred, ws, Node.SIGNAL);      }      return false;  }   

parkAndCheckInterrupt

如果 shouldParkAfterFailedAcquire 返回 true,說明當前線程需要被掛起。因此,就執行此方法,同時檢查線程是否被中斷。

private final boolean parkAndCheckInterrupt() {      //把當前線程掛起,則 acquireQueued 方法的自旋就會暫停,等待前驅節點 unpark      LockSupport.park(this);      //返回當前節點是否被中斷的標誌,注意此方法會把線程的中斷標誌清除。      //因此,返回上一層方法時,需要設置 interrupted = true 把中斷標誌重新設置,以便上層代碼可以處理中斷      return Thread.interrupted();  }

想一下,為什麼搶鎖失敗後,需要判斷是否把線程掛起?

因為,如果搶不到鎖,並且還不把線程掛起,acquireQueued 方法就會一直自旋下去,這樣你的CPU能受得了嗎。

cancelAcquire

當不停的自旋搶鎖時,若發生了異常,就會調用此方法,取消正在嘗試獲取鎖的線程。node 的位置分為三種情況,見下面注釋,

private void cancelAcquire(Node node) {        if (node == null)          return;        // node 不再指向任何線程      node.thread = null;        Node pred = node.prev;      //從當前節點不斷的向前查找,直到找到一個有效的前驅節點      while (pred.waitStatus > 0)          node.prev = pred = pred.prev;        Node predNext = pred.next;        //把 node 的 ws 設置為 -1      node.waitStatus = Node.CANCELLED;        // 1.如果 node 是 tail,則把 tail 更新為 node,並把 pred.next 指向 null      if (node == tail && compareAndSetTail(node, pred)) {          compareAndSetNext(pred, predNext, null);      } else {          int ws;          //2.如果 node 既不是 tail,也不是 head 的後繼節點,就把 node的前驅節點的 ws 設置為 -1          //最後把 node 的前驅節點的 next 指向 node 的後繼節點          if (pred != head &&              ((ws = pred.waitStatus) == Node.SIGNAL ||               (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&              pred.thread != null) {              Node next = node.next;              if (next != null && next.waitStatus <= 0)                  compareAndSetNext(pred, predNext, next);          } else {              //3.如果 node是 head 的後繼節點,則直接喚醒 node 的後繼節點。              //這個也很好理解,因為 node 是隊列中唯一有資格嘗試獲取鎖的節點,              //它放棄了資格,當然有義務把後繼節點喚醒,以讓後繼節點嘗試搶鎖。              unparkSuccessor(node);          }            node.next = node; // help GC      }  }

unparkSuccessor

這個喚醒方法就比較簡單了,

private void unparkSuccessor(Node node) {        int ws = node.waitStatus;      if (ws < 0)          compareAndSetWaitStatus(node, ws, 0);        Node s = node.next;      if (s == null || s.waitStatus > 0) {          s = null;          //從尾結點向前依次遍歷,直到找到距離當前 node 最近的一個有效節點          for (Node t = tail; t != null && t != node; t = t.prev)              if (t.waitStatus <= 0)                  s = t;      }      if (s != null)          //把這個有效節點的線程喚醒,          //喚醒之後,當前線程就可以繼續自旋搶鎖了,(回到 park 的地方)          LockSupport.unpark(s.thread);  }

下面畫一個流程圖更直觀的查看整個獲取鎖的過程。

公平鎖

公平鎖和非公平鎖的整體流程大致相同,只是在搶鎖之前先判斷一下是否已經有人排在前面,如果有的話,就不執行搶鎖。我們通過源碼追蹤到 FairSync.tryAcquire 方法。會發現,多了一個 hasQueuedPredecessors 方法。

hasQueuedPredecessors

這個方法判斷邏輯稍微有點複雜,有多種情況。

public final boolean hasQueuedPredecessors() {      Node t = tail;      Node h = head;      Node s;      return h != t &&              ((s = h.next) == null || s.thread != Thread.currentThread());  }
  1. 如果 h == t,說明 h 和 t 都為空(此時隊列還未初始化)或者它們是同一個節點(說明隊列已經初始化,並且只有一個節點,此時為 enq 方法第一次自旋成功後)。此時,返回false。
  2. 如果 h != t,則判斷 head.next == null 是否成立,如果成立,則返回 true。這種情況發生在有其他線程第一次入隊時。在 AQS 的 enq 入隊方法,設置頭結點成功之後 compareAndSetHead(new Node()) ,還未執行 tail = head 時(仔細想一想為什麼?)。此時 tail = null , head = new Node(),head.next = null。
  3. 如果 h != t,並且 head.next != null,說明此時隊列中至少已經有兩個節點,則判斷 head.next 是否是當前線程。如果是,返回 false(注意是 false哦,因為用了 !),否則返回 true 。

總結:以上幾種情況,只有最終返回 false 時,才會繼續往下執行。因為 false,說明沒有線程排在當前線程前面,於是通過 CAS 嘗試把 state 值設置為 1。若成功,則方法返回。若失敗,同樣需要去排隊。

公平鎖和非公平鎖區別

舉個例子來對比公平鎖和非公平鎖。比如,現在到飯點了,大家都到食堂打飯。把隊列中的節點比作排隊打飯的人,每個打飯窗口都有一個管理員,只有排隊的人從管理員手中搶到鎖,才有資格打飯。打飯的過程就是線程執行的過程。

如果,你發現前面沒有人在排隊,那麼就可以直接從管理員手中拿到鎖,然後打飯。對於公平鎖來說,如果你前面有人在打飯,那麼你就要排隊到他後面(圖中B),等他打完之後,把鎖還給管理員。那麼,你就可以從管理員手中拿到鎖,然後打飯了。後面的人依次排隊。這就是FIFO先進先出的隊列模型。

對於非公平鎖來說,如果你是圖中的 B,當 A 把鎖還給管理員後,有可能有另外一個 D 插隊過來直接把鎖搶走。那麼,他就可以打飯,你只能繼續等待了。

所以,可以看出來。公平鎖是嚴格按照排隊的順序來的,先來後到嘛,你來的早,就可以早點獲取鎖。優點是,這樣不會造成某個線程等待時間過長,因為大家都是中規中矩的在排隊。而缺點呢,就是會頻繁的喚起線程,增加 CPU的開銷。

非公平鎖的優點是吞吐量大,因為有可能正好鎖可用,然後線程來了,直接搶到鎖了,不用排隊了,這樣也減少了 CPU 喚醒排隊線程的開銷。 但是,缺點也很明顯,你說我排隊排了好長時間了,終於輪到我打飯了,憑什麼其他人剛過來就插到我前面,比我還先打到飯,也太不公平了吧,後邊一大堆排隊的人更是怨聲載道。這要是每個人來了都插到我前面去,我豈不是要餓死了。

獨佔鎖的釋放

我們從 ReentrantLock 的 unlock 方法看起:

public void unlock() {      //調用 AQS 的 release 方法      sync.release(1);  }    public final boolean release(int arg) {      if (tryRelease(arg)) {          Node h = head;          //如果頭結點不為空,並且 ws 不為 0,則喚起後繼節點          if (h != null && h.waitStatus != 0)              unparkSuccessor(h);          return true;      }      return false;  }

這段邏輯比較簡單,當線程釋放鎖之後,就會喚醒後繼節點。 unparkSuccessor 已講,不再贅述。然後看下 tryRelease 方法,公平鎖和非公平鎖走的是同一個方法。

protected final boolean tryRelease(int releases) {      //每釋放一次鎖,state 值就會減 1,因為之前可能有鎖的重入      int c = getState() - releases;      //如果當前線程不是搶到鎖的線程,則拋出異常      if (Thread.currentThread() != getExclusiveOwnerThread())          throw new IllegalMonitorStateException();      boolean free = false;      if (c == 0) {          //只有 state 的值減到 0 的時候,才會全部釋放鎖          free = true;          setExclusiveOwnerThread(null);      }      setState(c);      return free;  }

因為,ReentrantLock 支持鎖的重入,所以每次重入 state 值都會加 1,相應的每次釋放鎖, state 的值也會減 1 。所以,這也是為什麼每個 lock 方法最後都要有一個 unlock 方法釋放鎖,它們的個數需要保證相同。

當 state 值為 0 的時候,說明鎖完全釋放。其他線程才可以有機會搶到鎖。

結語

以上已經講解了獨佔鎖主要的獲取方法 acquire ,另外還有一些其他相關方法,不再贅述,因為主要邏輯都是一樣的,只有部分稍有不同,只要理解了 acquire ,這些都是相通的。如 acquireInterruptibly 方法,它可以在獲取鎖的時候響應中斷。還有超時獲取鎖的方法 doAcquireNanos 可以設定獲取鎖的超時時間,超時之後就返回失敗。

下篇預告:分析 ReentrantReadWriteLock 讀寫鎖源碼,以及 AQS 共享鎖的獲取和釋放,敬請期待。

如果本文對你有用,歡迎點贊,評論,轉發。

學習是枯燥的,也是有趣的。我是「煙雨星空」,歡迎關注,可第一時間接收文章推送。