Java讀源碼之ReentrantLock

前言

ReentrantLock 可重入鎖,應該是除了 synchronized 關鍵字外用的最多的執行緒同步手段了,雖然JVM維護者瘋狂優化 synchronized 使其已經擁有了很好的性能。但 ReentrantLock 仍有其存在價值,例如可以感知執行緒中斷,公平鎖模式,可以指定超時時間的搶鎖等更細粒度的控制都是目前的 synchronized 做不到的。

如果不是很了解 Java 中執行緒的一些基本概念,可以看之前這篇:

Java讀源碼之Thread

案例

用一個最簡單的案例引出我們的主角

public class ReentrantLockDemo {        // 默認是非公平鎖和 synchronized 一樣      private static ReentrantLock reentrantLock = new ReentrantLock();        public void printThreadInfo(int num) {          reentrantLock.lock();          try {              System.out.println(num + " : " + Thread.currentThread().getName());              System.out.println(num + " : " + Thread.currentThread().toString());          } finally {              reentrantLock.unlock();          }      }        public static void main(String[] args) {          ExecutorService executorService = Executors.newCachedThreadPool();          IntStream.rangeClosed(0, 3)                  .forEach(num -> executorService                          .execute(() -> new ReentrantLockDemo().printThreadInfo(num))                  );      }        /**       * 輸出:       * 0 : pool-1-thread-1       * 0 : Thread[pool-1-thread-1,5,main]       * 3 : pool-1-thread-4       * 3 : Thread[pool-1-thread-4,5,main]       * 1 : pool-1-thread-2       * 1 : Thread[pool-1-thread-2,5,main]       * 2 : pool-1-thread-3       * 2 : Thread[pool-1-thread-3,5,main]       */  

可以看到使用起來也很簡單,而且達到了同步的效果。廢話不多說一起來瞅一瞅 lock() 和 unlock() 兩個同步方法是怎麼實現的。

源碼分析

公平鎖與非公平鎖

公平鎖顧名思義。就是每個執行緒排隊搶佔鎖資源。而非公平鎖執行緒什麼時候能執行更多的看緣分,例如一個執行緒需要執行臨界區程式碼,不管之前有多少執行緒在等,直接去搶鎖,說白了就是插隊。對於 ReentrantLock 的實現,從構造器看出,當我們傳入 true 代表選擇了公平鎖模式

public ReentrantLock(boolean fair) {      sync = fair ? new FairSync() : new NonfairSync();  }  

為什麼先看公平鎖實現,而不是默認的非公平鎖,因為 synchronized 就是非公平鎖,1.7開始 synchronized 的實現改變了,並且基本借鑒了 ReentrantLock 的實現,加入了自旋,偏向鎖減少系統調用,所以如果需要非公平鎖且不需要特別精細的控制,完全沒有必要因為性能選擇 ReentrantLock 了。

AQS 結構

從案例中的 lock 方法進入

  • ReentrantLock.FairSync#lock
final void lock() {      // 要一把鎖,向誰要鎖?      acquire(1);  }  

在繼續深入之前讓我們先熟悉一下 AbstractQueuedSynchronizer(AKA :AQS) 的結構

  • 首先繼承了 AbstractOwnableSynchronizer ,主要屬性:
// 保存當前持有鎖的執行緒  private transient Thread exclusiveOwnerThread;  
  • AQS 自身主要屬性:
// 阻塞隊列的頭  private transient volatile Node head;    // 阻塞隊列的尾  private transient volatile Node tail;    // 同步器的狀態  private volatile int state;  

從 head 和 tail 可以猜想到,AQS 應該是用一個鏈表作為等待隊列,給等待的執行緒排隊, status 欄位默認是0,一旦鎖被某個執行緒佔有就 +1,那為啥要用int呢? 如果當前持有鎖的這個執行緒(exclusiveOwnerThread)還要再來把鎖,那狀態還可以繼續 +1,也就實現了可重入。

  • 上面的 Node 節點長啥樣呢,不要被注釋中 CLH 鎖高大上的名稱嚇到,其實就是雙向鏈表,主要屬性:
// 標識次節點是共享模式  static final Node SHARED = new Node();  // 標識次節點是獨佔模式  static final Node EXCLUSIVE = null;  // 節點裡裝著排隊的執行緒  volatile Thread thread;  // 節點裡裝的執行緒放棄了,不搶鎖了,可能超時了,可能中斷了  static final int CANCELLED =  1;  // 下一個節點裡的執行緒等待被通知出隊  static final int SIGNAL    = -1;  // 節點裡裝的執行緒在等待執行條件,結合 Condition 使用  static final int CONDITION = -2;  // 節點狀態需要被傳播到下一個節點,主要用在共享模式  static final int PROPAGATE = -3;  // 標識節點的等待狀態,初始0,取值是上面的 -3 ~ 1  volatile int waitStatus;  // 前一個節點  volatile Node prev;  // 後一個節點  volatile Node next;  // 指向下一個等待條件 Condition  Node nextWaiter;  

去掉一些普通情況不會涉及的屬性,如果有四個執行緒競爭,結構如下圖所示:

可以看到就是一個標準的頭節點為空的雙鏈表,為什麼頭節點是空?

公平鎖加鎖

  • AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {      // 如果嘗試拿鎖沒成功,那就進等待隊列      if (!tryAcquire(arg) &&          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))          // 檢測到執行緒被中斷了,因為重置了中斷訊號但沒做處理,再設置下中斷位,讓用戶去處理,中斷標準操作          selfInterrupt();  }    static void selfInterrupt() {      Thread.currentThread().interrupt();  }  
  • ReentrantLock.FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {      final Thread current = Thread.currentThread();      // 取AQS的 state 值      int c = getState();      // 當前沒有執行緒持有鎖      if (c == 0) {          // 如果沒有其他執行緒在排隊(公平)          if (!hasQueuedPredecessors() &&              // 這裡可能存在競爭 CAS 試著去搶一次鎖              compareAndSetState(0, acquires)) {              // 搶到鎖了,把鎖持有者改成自己,其他執行緒往後稍稍              setExclusiveOwnerThread(current);              return true;          }      }      // 鎖已經被持有了,但如果鎖主人就是自己,那歡迎光臨(可重入)      else if (current == getExclusiveOwnerThread()) {          int nextc = c + acquires;          if (nextc < 0)              throw new Error("Maximum lock count exceeded");          // 因為其他執行緒進不來,這裡不存在競爭,直接改鎖狀態          setState(nextc);          return true;      }      return false;  }  
  • AbstractQueuedSynchronizer#hasQueuedPredecessors
// 返回 false 代表不需要排隊,true 代表要排隊  public final boolean hasQueuedPredecessors() {      Node t = tail;      Node h = head;      Node s;      // h == t 頭等於尾只可能是剛初始的狀態或者已經沒有節點等待了      // h.next == null ? 下面介紹進隊的過程中,如果其他執行緒與此同時 tryAcquire 成功了,會把之前的head.next置為空,說明被捷足先登了,差一點可惜      // 如果到最後一個判斷了,也就是隊列中至少有一個等待節點,直接看第一個等待節點是不是自己,如果不是自己就乖乖排隊去      return h != t &&          ((s = h.next) == null || s.thread != Thread.currentThread());  }  
  • AbstractQueuedSynchronizer#addWaiter

tryAcquire如果沒有拿到鎖,就需要進等待隊列了,變成一個 Node 實例

// 這裡 mode 為獨佔模式  private Node addWaiter(Node mode) {      Node node = new Node(Thread.currentThread(), mode);      Node pred = tail;      // 如果尾節點不為空,說明等待隊列已經初始化過      if (pred != null) {          node.prev = pred;          // 嘗試把自己放到隊尾          if (compareAndSetTail(pred, node)) {              pred.next = node;              return node;          }      }      // 進來這裡說明,等待隊列沒有被初始化過,或者嘗試失敗了      enq(node);      return node;  }  
  • AbstractQueuedSynchronizer#enq
private Node enq(final Node node) {      for (;;) {          Node t = tail;          // 如果尾節點是空,說明隊列沒有初始化          if (t == null) {              // 初始化一個空節點(延遲載入),head ,tail都指向它              if (compareAndSetHead(new Node()))                  tail = head;          }          // 一直嘗試把自己塞到隊尾(自旋)          else {              node.prev = t;              if (compareAndSetTail(t, node)) {                  t.next = node;                  return t;              }          }      }  }  
  • AbstractQueuedSynchronizer#acquireQueued

addWaiter方法已經把等待執行緒包裝成節點放到等待隊列了,為啥要返回中斷標識呢?主要是為了給一些需要處理中斷的方式復用,例如 ReentrantLock#lockInterruptibly,以及帶超時的鎖

final boolean acquireQueued(final Node node, int arg) {      boolean failed = true;      try {          boolean interrupted = false;          // 這邊邏輯開始繞起來了          for (;;) {              // 拿前一個節點              final Node p = node.predecessor();              // 前一個節點是head,說明自己排在第一個              if (p == head &&                  // 在讓出cpu前再試一次,此時可能鎖持有者已經讓位了                  tryAcquire(arg)) {                  // 搶到鎖了                  setHead(node);                  // 把之前沒用的頭節點釋放                  p.next = null; // help GC                  failed = false;                  return interrupted;              }              // 兩次嘗試都失敗了,只能準備被掛起,讓出cpu了(調了內核,重量級)              if (shouldParkAfterFailedAcquire(p, node) &&                  parkAndCheckInterrupt())                  interrupted = true;          }      } finally {          // 普通的鎖不處理中斷異常,不會進這個方法          if (failed)              cancelAcquire(node);      }  }    private void setHead(Node node) {      // 把頭節點設為自己      head = node;      // 因為已經搶到鎖了,不需要記錄這個執行緒在等待了,保持了頭節點中執行緒永遠為 null      node.thread = null;      node.prev = null;  }  
  • AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {      int ws = pred.waitStatus;      if (ws == Node.SIGNAL)          // 已經告訴前一個節點自己需要被通知了          return true;      if (ws > 0) {          // 只有 CANCELLED 這個狀態大於0,如果前面的節點不排隊了,就一直找到一個沒 CANCELLED 的          do {              node.prev = pred = pred.prev;          } while (pred.waitStatus > 0);          pred.next = node;      }      // 進到這裡,只剩下PROPAGATE(共享鎖時候才有) CONDITION(本文不涉及) 和 未賦值狀態也就是0,      else {          // 這裡把 默認狀態0 置為 -1,也就代表著後面有執行緒在等著被喚醒了          compareAndSetWaitStatus(pred, ws, Node.SIGNAL);      }      // 返回false,就暫時不會讓執行緒掛起,繼續自旋,直到返回true      return false;  }  
  • AbstractQueuedSynchronizer#parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {      // 掛起,標準用法this充當blocker      LockSupport.park(this);      // 一旦恢復,返回執行緒在掛起階段是否被中斷,此方法會重置中斷位      return Thread.interrupted();  }  

到這裡加鎖流程就介紹差不多了,用一個最簡單流程的圖來總結一下:

公平鎖解鎖

  • AbstractQueuedSynchronizer#release
public final boolean release(int arg) {      // 嘗試釋放鎖      if (tryRelease(arg)) {          Node h = head;          // 如果等待隊列已經被初始化過,並且後面有節點等待操作          if (h != null && h.waitStatus != 0)              // 恢復掛起的執行緒              unparkSuccessor(h);          return true;      }      return false;  }  
  • ReentrantLock.FairSync#tryRelease
protected final boolean tryRelease(int releases) {      int c = getState() - releases;      // 能執行釋放鎖的肯定是鎖的持有者,除非虛擬機魔怔了      if (Thread.currentThread() != getExclusiveOwnerThread())          throw new IllegalMonitorStateException();      boolean free = false;      // 考慮可重入      if (c == 0) {          free = true;          // 鎖現在沒有持有者了          setExclusiveOwnerThread(null);      }      setState(c);      return free;  }  
  • ReentrantLock.FairSync#unparkSuccessor
// node 是頭節點  private void unparkSuccessor(Node node) {      int ws = node.waitStatus;      // 如果狀態不是 CANCELED,就把狀態置為初始狀態      if (ws < 0)          compareAndSetWaitStatus(node, ws, 0);        Node s = node.next;      // s == null 這個條件成立主要是在共享模式下自旋釋放。      if (s == null || s.waitStatus > 0) {          // 把 CANCELED 狀態的節點置為空          s = null;          // 因為 head 這條路已經斷了,從尾巴開始找到第一個排隊的節點,然後把隊列接上          for (Node t = tail; t != null && t != node; t = t.prev)              if (t.waitStatus <= 0)                  s = t;      }      if (s != null)          // 把第一個排隊的節點中的執行緒喚醒,          LockSupport.unpark(s.thread);  }  

執行緒從加鎖程式碼里介紹的 AbstractQueuedSynchronizer#parkAndCheckInterrupt 方法中醒來,繼續自旋拿鎖。如果此時後面還有人排隊就一定能拿到鎖了。如圖所示:

非公平鎖加鎖

  • ReentrantLock.NonfairSync#lock
final void lock() {      // 不管三七二十一,直接搶鎖,如果運氣好,鎖正好被釋放了,就不排隊了      if (compareAndSetState(0, 1))          setExclusiveOwnerThread(Thread.currentThread());      else          // 和上面介紹的公平鎖一樣,只是 tryAcquire 實現不一樣          acquire(1);  }  
  • ReentrantLock.Sync#nonfairTryAcquire

上面公平鎖我們已經知道,執行緒真正掛起前會嘗試兩次,由於不考慮別人有沒有入隊,實現非常簡單

final boolean nonfairTryAcquire(int acquires) {      final Thread current = Thread.currentThread();      int c = getState();      // 如果沒有執行緒持有鎖,直接搶鎖      if (c == 0) {          if (compareAndSetState(0, acquires)) {              setExclusiveOwnerThread(current);              return true;          }      }      // 如果是重入,狀態累加      else if (current == getExclusiveOwnerThread()) {          int nextc = c + acquires;          if (nextc < 0)              throw new Error("Maximum lock count exceeded");          setState(nextc);          return true;      }      return false;  }  

非公平鎖解鎖

因為都是獨佔鎖模式,解鎖和公平鎖邏輯一樣。

總結

至此,總算看完了 ReentrantLock 常規的加鎖解鎖源碼,好好體會下 AQS 的結構,還是能看懂的,且很有收穫,總之 Doug Lea 大神牛B。AbstractQueuedSynchronizer#acquireQueued

本文還是挖了很多坑的:

  • 帶超時的鎖是如何實現的?
  • 檢測中斷的鎖是如何實現的?
  • 各種 Condition 是如何實現的?
  • 鎖的共享模式是如何實現的?

以後有時間再一探究竟吧。