­

ReentrantLock源碼探究

  • 2020 年 3 月 17 日
  • 筆記

ReentrantLock是一種可重入鎖,可重入是說同一個線程可以多次獲取同一個鎖,內部會有相應的字段記錄重入次數,它同時也是一把互斥鎖,意味着同時只有一個線程能獲取到可重入鎖。

1.構造函數

    public ReentrantLock() {          sync = new NonfairSync();      }        public ReentrantLock(boolean fair) {          sync = fair ? new FairSync() : new NonfairSync();      }

ReentrantLock提供了兩個構造函數,構造函數只是用來初始化sync字段,可以看到,默認情況下ReentrantLock使用的是非公平鎖,當然,也可以使用帶有布爾參數的構造函數來選擇使用公平鎖。公平鎖和非公平鎖的實現依賴於兩個內部類:FairSyncNonfairSync,接下來認識一下這兩個類:

    //非公平鎖      static final class NonfairSync extends Sync {          private static final long serialVersionUID = 7316153563782823691L;            /**           * Performs lock.  Try immediate barge, backing up to normal           * acquire on failure.           */          final void lock() {              if (compareAndSetState(0, 1))                  setExclusiveOwnerThread(Thread.currentThread());              else                  acquire(1);          }            protected final boolean tryAcquire(int acquires) {              return nonfairTryAcquire(acquires);          }      }        static final class FairSync extends Sync {          private static final long serialVersionUID = -3000897897090466540L;            final void lock() {              acquire(1);          }            /**           * Fair version of tryAcquire.  Don't grant access unless           * recursive call or no waiters or is first.           */          protected final boolean tryAcquire(int acquires) {              final Thread current = Thread.currentThread();              int c = getState();              if (c == 0) {                  if (!hasQueuedPredecessors() &&                      compareAndSetState(0, acquires)) {                      setExclusiveOwnerThread(current);                      return true;                  }              }              else if (current == getExclusiveOwnerThread()) {                  int nextc = c + acquires;                  if (nextc < 0)                      throw new Error("Maximum lock count exceeded");                  setState(nextc);                  return true;              }              return false;          }      }

這兩個內部類的代碼都很短,並且都繼承了另一個內部類Sync。這裡先不急着介紹Sync類,因為這個類本身也並不複雜,後續在需要用到其中的方法時順帶講解,目前只需要知道這個類繼承了AbstractQueuedSynchronizer(AQS)即可。

2.常用方法

  • lock()
    public void lock() {          sync.lock();      }

lock方法提供了加鎖的功能,公平鎖和非公平鎖的加鎖操作是不一樣的,先來看看非公平鎖的細節,接着再講解公平鎖。

  • 非公平鎖加鎖邏輯
    final void lock() {          //使用CAS操作,嘗試將state字段從0修改為1,如果成功修改該字段,則表示獲取了互斥鎖          //如果獲取互斥鎖失敗,轉入acquier()方法邏輯          if (compareAndSetState(0, 1))              setExclusiveOwnerThread(Thread.currentThread());          else              acquire(1);      }        //設置獲得了互斥鎖的線程      protected final void setExclusiveOwnerThread(Thread thread) {          exclusiveOwnerThread = thread;      }        public final void acquire(int arg) {          if (!tryAcquire(arg) &&              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))              selfInterrupt();      }

對於非公平鎖來講,使用lock()方法一上來就嘗試獲取互斥鎖,獲取成功就將exclusiveOwnerThread指向自己,代表當前是自己持有鎖,否則就執行acquire()方法的邏輯,下面對acquire()方法的邏輯進行逐個分析。
首先是tryAcquire()方法,非公平鎖重寫了該方法,並在內部調用Sync類的nonfairTryAcquire()

    //從上面的邏輯來看,這裡的acquires=1      protected final boolean tryAcquire(int acquires) {          return nonfairTryAcquire(acquires);      }        final boolean nonfairTryAcquire(int acquires) {          final Thread current = Thread.currentThread();          int c = getState();          //c==0,說明當前處於未加鎖狀態,鎖沒有被其他線程獲取          if (c == 0) {              //在鎖沒有被其他線程佔有的情況下,非公平鎖再次嘗試獲取鎖,獲取成功則將exclusiveOwnerThread指向自己              if (compareAndSetState(0, acquires)) {                  setExclusiveOwnerThread(current);                  return true;              }          }          //執行到這裡說明鎖已經被佔有,如果是被自己佔有,將state字段加1,記錄重入次數          else if (current == getExclusiveOwnerThread()) {              int nextc = c + acquires;              //當nextc達到int類型最大值時會溢出,因此可重入次數的最大值就是int類型的最大值              if (nextc < 0) // overflow                  throw new Error("Maximum lock count exceeded");              setState(nextc);              return true;          }          //執行到這裡說明:1)鎖未被佔有的情況下,搶鎖失敗,說明當前有其他線程搶到了鎖;2)鎖已經被其他線程佔有          //即只要當前線程沒有獲取到鎖,就返回false          return false;      }        //獲取state字段,該字段定義在AQS中      protected final int getState() {          return state;      }      //設置state字段      protected final void setState(int newState) {          state = newState;      }

當前線程沒有在tryAcquire()方法中獲取到鎖時,會先執行addWaiter(Node.EXCLUSIVE)方法,其中參數Node.EXCLUSIVE是一個常量,其定義是static final Node EXCLUSIVE = null,作用是標記鎖的屬性是互斥鎖。addWaiter()方法的作用是將當前線程包裝成一個Node節點,放入等待隊列的隊尾,該方法在介紹CountDownLatch類時詳細講解過,有興趣的朋友可以參考ConcurrentHashMap源碼探究(JDK 1.8),本文不再贅述。
將當前線程加入等待隊列之後,會接着執行acquireQueued()方法,其源碼如下:

    final boolean acquireQueued(final Node node, int arg) {          boolean failed = true;          try {              boolean interrupted = false;              //自旋              for (;;) {                  //獲取當前節點的前一個節點                  final Node p = node.predecessor();                  //如果前一個節點是頭節點,說明當前節點排在隊首,非公平鎖會則再次通過tryAcquire方法獲取鎖                  if (p == head && tryAcquire(arg)) {                      //將自己設置為頭節點                      setHead(node);                      //前一個頭結點沒用了,會被垃圾回收掉                      p.next = null; // help GC                      failed = false;                      //正常結束,返回false,注意該字段可能會在下面的條件語句中被改變                      return interrupted;                  }                  //如果前一個節點不是頭節點,或者當前線程獲取鎖失敗,會執行到這裡                  //shouldParkAfterFailedAcquire()方法只有在p的狀態是SIGNAL時才返回false,此時parkAndCheckInterrupt()方法才有機會執行                  //注意外層的自旋,for循環體會一直重試,因此只要執行到這裡,總會有機會將p設置成SIGNAL狀態從而將當前線程掛起                  //另外,如果parkAndCheckInterrupt()返回true,說明當前線程設置了中斷狀態,會將interrupted設置為true,代碼接着自旋,會在上一個條件語句中返回true                  if (shouldParkAfterFailedAcquire(p, node) &&                      parkAndCheckInterrupt())                      interrupted = true;              }          } finally {              //如果在自旋中線程被中斷或者發送異常,failed字段的值將會為true,這裡會處理這種情況,放棄讓當前線程獲取鎖,並拋出中斷異常              if (failed)                  cancelAcquire(node);          }      }      //方法邏輯是:只有在前置節點的狀態是SIGNAL時才返回true,其他情況都返回false      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {          int ws = pred.waitStatus;          if (ws == Node.SIGNAL)              return true;          //刪除當前節點之前連續狀態是CANCELLED的節點          if (ws > 0) {              do {                  node.prev = pred = pred.prev;              } while (pred.waitStatus > 0);              pred.next = node;          } else {              compareAndSetWaitStatus(pred, ws, Node.SIGNAL);          }          return false;      }      //線程在這裡阻塞,並在被喚醒後檢查中斷狀態      private final boolean parkAndCheckInterrupt() {          LockSupport.park(this);          return Thread.interrupted();      }        //      private void cancelAcquire(Node node) {          // Ignore if node doesn't exist          if (node == null)              return;            node.thread = null;            // Skip cancelled predecessors          Node pred = node.prev;          while (pred.waitStatus > 0)              node.prev = pred = pred.prev;            Node predNext = pred.next;            node.waitStatus = Node.CANCELLED;            // If we are the tail, remove ourselves.          if (node == tail && compareAndSetTail(node, pred)) {              compareAndSetNext(pred, predNext, null);          } else {              // If successor needs signal, try to set pred's next-link              // so it will get one. Otherwise wake it up to propagate.              int ws;              if (pred != head &&                  ((ws = pred.waitStatus) == Node.SIGNAL ||                   (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&                  pred.thread != null) {                  Node next = node.next;                  if (next != null && next.waitStatus <= 0)                      compareAndSetNext(pred, predNext, next);              } else {                  //喚醒後一個節點                  unparkSuccessor(node);              }                node.next = node; // help GC          }      }

注意acquireQueued()要麼會拋出中斷異常,要麼正常結束返回false,只有在線程被喚醒後設置了中斷狀態才會返回true。對比可以發現,acquireQueued()方法的邏輯與CountDownLatch中的doAcquireSharedInterruptibly()十分類似,許多方法在CountDownLatch這篇博客中講到過,本文不再對這些方法進行贅述。
介紹完了acquire()方法,回過頭來看看方法邏輯:

    public final void acquire(int arg) {          if (!tryAcquire(arg) &&              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))              selfInterrupt();      }

如果在tryAcquire()方法中沒有獲取鎖,那麼將當前線程加入到等待隊列隊尾,查看節點的前一個節點是否是頭結點,是的話當前線程可以繼續向下執行,否則就會阻塞掛起。當acquireQueued返回true時,說明線程設置了中斷狀態,就調用selfInterrupt()中斷該線程,其他情況selfInterrupt()方法沒機會執行。
到這裡非公平鎖的加鎖流程已經介紹完了,由於代碼邏輯比較長,且看源碼的過程中會在好幾個類中來回切換,思路很容易斷,閱讀代碼的時候要注意。(有必要補個流程圖)。

  • 公平鎖加鎖邏輯
    接下來看看公平鎖的加鎖邏輯:
    final void lock() {          acquire(1);      }

與非公平鎖相比,公平鎖沒有一上來就搶鎖的邏輯,這也是公平性的體現。兩種鎖的acquire()方法的框架相同,但是實現細節不同,來看看公平鎖的tryAcquire()方法:

    protected final boolean tryAcquire(int acquires) {          final Thread current = Thread.currentThread();          int c = getState();          //c=0表示當前沒有其他線程持有鎖          if (c == 0) {              //下面的代碼與非公平鎖相比,多了hasQueuedPredecessors()方法的處理邏輯,公平鎖只有在前面沒有其他線程排隊的情況下才會嘗試獲取鎖              if (!hasQueuedPredecessors() &&                  compareAndSetState(0, acquires)) {                  setExclusiveOwnerThread(current);                  return true;              }          }          //如果當前線程已經佔有公平鎖,則記錄重入次數          else if (current == getExclusiveOwnerThread()) {              int nextc = c + acquires;              if (nextc < 0)                  throw new Error("Maximum lock count exceeded");              setState(nextc);              return true;          }          //只要當前線程沒有獲取到鎖,就返回false          return false;      }        public final boolean hasQueuedPredecessors() {          // The correctness of this depends on head being initialized          // before tail and on head.next being accurate if the current          // thread is first in queue.          Node t = tail; // Read fields in reverse initialization order          Node h = head;          Node s;          //h != t表示等待隊列中有其他節點          //h.next == null可能會有點費解,按理說h!=t之後,h後面肯定會有節點才對,這種情況其實已經見過,在上文介紹acquireQueued()方法時說過,          //被喚醒的第一個等待節點會將自己設置為頭結點,如果這個節點是隊列中的唯一節點的話,它的下一個節點就是null          //至於s.thread != Thread.currentThread()這個條件暫時可以忽略,因為公平鎖執行到hasQueuedPredecessors方法時根本還沒有入隊,          //這也意味着,只要隊列中有其他節點在等候,公平鎖就要求其他線程排隊等待          return h != t &&              ((s = h.next) == null || s.thread != Thread.currentThread());      }
  • lockInterruptibly
    從名字可以看出,lockInterruptibly可以響應中斷,來看看該方法的實現:
    public void lockInterruptibly() throws InterruptedException {          sync.acquireInterruptibly(1);      }        public final void acquireInterruptibly(int arg)              throws InterruptedException {          if (Thread.interrupted())              throw new InterruptedException();          //先嘗試獲取鎖,獲取失敗才執行後面的邏輯          if (!tryAcquire(arg))              doAcquireInterruptibly(arg);      }        private void doAcquireInterruptibly(int arg)          throws InterruptedException {          final Node node = addWaiter(Node.EXCLUSIVE);          boolean failed = true;          try {              for (;;) {                  final Node p = node.predecessor();                  if (p == head && tryAcquire(arg)) {                      setHead(node);                      p.next = null; // help GC                      failed = false;                      return;                  }                  if (shouldParkAfterFailedAcquire(p, node) &&                      parkAndCheckInterrupt())                      throw new InterruptedException();              }          } finally {              if (failed)                  cancelAcquire(node);          }      }

lockInterruptibly()方法幾乎與acquire()方法完全一樣,唯一的區別是acquire()方法中,parkAndCheckInterrupt因為線程設置了中斷狀態而返回true時,只是簡單設置了一下interrupted字段的值,而lockInterruptibly()則是直接拋出異常。

  • unlock方法
    介紹完加鎖的邏輯,接下來看看解鎖的邏輯:
    public void unlock() {          sync.release(1);      }        public final boolean release(int arg) {          //如果成功釋放了鎖,則執行下面的代碼塊          if (tryRelease(arg)) {              Node h = head;              //如果頭節點不為null,請求節點狀態不是初始狀態,就釋放頭結點後第一個有效節點              //問題:這裡為什麼需要判斷頭結點的狀態呢???              if (h != null && h.waitStatus != 0)                  unparkSuccessor(h);              return true;          }          return false;      }        //      protected final boolean tryRelease(int releases) {          int c = getState() - releases;          //線程沒有持有鎖的情況下,不允許釋放鎖,否則會拋異常          if (Thread.currentThread() != getExclusiveOwnerThread())              throw new IllegalMonitorStateException();          boolean free = false;          //可重入性的判斷,如果釋放了一次鎖,使得c=0,就指針釋放鎖,做法是將記錄鎖的字段exclusiveOwnerThread重新指向null          //注意,只有最後一次釋放可重入鎖,才會返回true          if (c == 0) {              free = true;              setExclusiveOwnerThread(null);          }          setState(c);          return free;      }        //喚醒node節點的下一個有效節點,這裡的有效指的是狀態不是CANCELLED狀態的節點      private void unparkSuccessor(Node node) {            int ws = node.waitStatus;          if (ws < 0)              compareAndSetWaitStatus(node, ws, 0);            Node s = node.next;          if (s == null || s.waitStatus > 0) {              s = null;              for (Node t = tail; t != null && t != node; t = t.prev)                  if (t.waitStatus <= 0)                      s = t;          }          if (s != null)              LockSupport.unpark(s.thread);      }
  • newCondition()
    ReentrantLock可以實現綁定多個等待條件,這個功能是在newCondition()方法中實現的,每次調用newCondition()方法時,都會產生一個新的ConditionObject對象,這是AQS中的一個內部類,代碼很長,這裡就不詳細討論了。來簡單看看該方法的源碼:
    public Condition newCondition() {          return sync.newCondition();      }      final ConditionObject newCondition() {          return new ConditionObject();      }

3.總結

在多線程環境中,ReentrantLock的非公平鎖要比公平鎖擁有更高的性能,因為非公平鎖避免了線程掛起產生的上下文切換的開銷,但是公平鎖能夠避免線程飢餓問題,因此各有各的使用場景。從源碼來看,J.U.C包下的很多類都依賴AQS類,因此非常有必要搞懂AQS。提到ReentrantLock,總免不了與synchronized進行對比。synchronized也是可重入的,並且在JDK 1.6以後,synchronized的性能已經得到了很大的提升,因此選擇使用ReentrantLock一般是考慮使用它的三個優勢:可中斷、可實現公平鎖、可綁定多個條件,這些優勢是synchronized不具備的。