AQS (AbstractQueuedSynchronizer)源碼導讀:鎖的獲得與釋放

  • 2020 年 2 月 18 日
  • 筆記

AQS是什麼?

AbstractQueuedSynchronizer簡稱AQS是一個抽象同步框架,可以用來實現一個依賴狀態的同步器。

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. 提供一個框架,用於實現依賴先進先出(FIFO)等待隊列的阻塞鎖和相關同步器(訊號量,事件等)。該類被設計為大多數類型的同步器的有用依據,這些同步器依賴於單個原子int值來表示狀態。 This class supports either or both a default exclusive mode and a shared mode. 此類支援默認獨佔模式和共享模式。 Even though this class is based on an internal FIFO queue, it does not automatically enforce FIFO acquisition policies. The core of exclusive synchronization takes the form:

即使這個類基於內部FIFO隊列,它也不會自動執行FIFO採集策略。 排他同步的核心形式如下:

Acquire:       while (!tryAcquire(arg)) {          enqueue thread if it is not already queued;          possibly block current thread;       }  ​   Release:       if (tryRelease(arg))          unblock the first queued thread;  ​  (共享模式類似,但可能包含級聯訊號。)

AQS 結構

    /**       * 當前持有獨佔鎖的執行緒       */      private transient Thread exclusiveOwnerThread;  ​      /**       * 等待隊列的頭結點,一般為當前持有鎖的執行緒 (volatile)       */      private transient volatile Node head;  ​      /**       * 等待隊列的尾結點,每次新結點進來,都是加到尾部,形成了鏈表 (volatile)       */      private transient volatile Node tail;  ​      /**       * 鎖的狀態,0 表示沒有被佔用,具體由子類實現 (volatile)       */      private volatile int state;  ​      // 等待隊列的結點      static final class Node {          // 標識節點當前在共享模式下          static final Node SHARED = new Node();          // 標識節點當前在獨佔模式下          static final Node EXCLUSIVE = null;            // 大於等於0  表明這個結點的執行緒取消了爭搶這個鎖,不需要去喚醒          /**           * The values are arranged numerically to simplify use.           * Non-negative values mean that a node doesn't need to           * signal. So, most code doesn't need to check for particular           * values, just for sign.           *           * The field is initialized to 0 for normal sync nodes, and           * CONDITION for condition nodes.  It is modified using CAS           * (or when possible, unconditional volatile writes).           */          volatile int waitStatus;  ​          // 前一個結點          volatile Node prev;  ​          // 下一個結點          volatile Node next;  ​          // 等待的執行緒          volatile Thread thread;            ...      }

CLH隊列 — Craig, Landin, and Hagersten lock queue

CLH是一個非阻塞的 FIFO 隊列。也就是說往裡面插入或移除一個節點的時候,在並發條件下不會阻塞,而是通過自旋鎖和 CAS 保證節點插入和移除的原子性。

源碼導讀:ReentrantLock 公平鎖

 public static void main(String[] args) {      ReentrantLock lock = new ReentrantLock(true);        lock.lock();        lock.unlock();    }

我們先看一下lock, 然後是 unlock

獲得鎖的流程和源碼解讀

// ReentrantLock.FairSync  	final void lock() {              acquire(1);       }    // AQS  	public final void acquire(int arg) {          // 1) 當前執行緒嘗試獲得鎖,如果成功,結束          // 2)如果獲得鎖失敗,當前執行緒加入等待隊列,加入後不斷循環監視上一個結點狀態          //     如果上一個結點是頭結點head ,-嘗試獲得鎖,獲得成功,跳出循環          //     否則,根據上一個結點的waitStatus,進行調整,包括掛起當前執行緒,或調整上一個結點為非取消狀態的結點          // 3)最後當前執行緒發起中斷 Thread.currentThread().interrupt()          if (!tryAcquire(arg) &&              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))              selfInterrupt();      }    // ReentrantLock.FairSync  // 當前執行緒嘗試獲得鎖  // 返回true:1.沒有執行緒在等待鎖;2.重入鎖,執行緒本來就持有鎖,也就可以理所當然可以直接獲取  	protected final boolean tryAcquire(int acquires) {              final Thread current = Thread.currentThread();              int c = getState();              if (c == 0) {                  // 1)查看是否有等待已經在head後面等待鎖了,如果有當前執行緒放棄搶佔鎖                  // 2)如果沒有執行緒等待,CAS 嘗試修改 state, 如果成功,獲得鎖                  // 3)設置的當前執行緒為 當前持有獨佔鎖的執行緒                  if (!hasQueuedPredecessors() &&                      compareAndSetState(0, acquires)) {                      setExclusiveOwnerThread(current);                      return true;                  }              }          	// 4)如果state不為0, 說明為可重入,只需要判斷 當前執行緒 是否為 當前持有獨佔鎖的執行緒          	// 5)如果是,可重入,state增加              else if (current == getExclusiveOwnerThread()) {                  int nextc = c + acquires;                  if (nextc < 0)                      throw new Error("Maximum lock count exceeded");                  setState(nextc);                  return true;              }          	//6) 其他情況,獲得鎖失敗              return false;          }    // AQS  // 構造結點,採用CAS 加入等待隊列的尾部  	private Node addWaiter(Node mode) {          Node node = new Node(Thread.currentThread(), mode);          Node pred = tail;          //1) 如果等待隊列不為空,則CAS 將當前執行緒加入等待隊列的尾部          if (pred != null) {              node.prev = pred;              if (compareAndSetTail(pred, node)) {                  pred.next = node;                  return node;              }          }          // 2)如果等待隊列為空,則CAS 新建初始化CLH隊列,併當前執行緒加入等待隊列的尾部          enq(node);          return node;      }    // AQS  // 如果獲得鎖失敗,當前執行緒加入等待隊列      final boolean acquireQueued(final Node node, int arg) {          boolean failed = true;          try {              boolean interrupted = false;              // 注意,這是一個循環,只有獲得鎖,或拋異常才會退出              // 如果上一個節點是頭結點 head,則嘗試獲得鎖              // 否則,如果當前執行緒需要掛起,則掛起等待鎖的釋放              for (;;) {                  // 1)查看當前結點的上一個結點,如果是頭結點head,嘗試獲得鎖                  final Node p = node.predecessor();                  if (p == head && tryAcquire(arg)) {                      setHead(node);                      p.next = null; // help GC                      failed = false;                      return interrupted;                  }                  // 2)如果當前結點的上一個結點不是頭結點head,或獲得鎖失敗,則判斷一下是否需求掛起當前執行緒?                  // 3)如果需要掛起,則 park 掛起當前執行緒                  if (shouldParkAfterFailedAcquire(p, node) &&                      parkAndCheckInterrupt())                      interrupted = true;              }          } finally {              // 4)如果獲得鎖失敗,並且拋異常,則當前執行緒取消搶佔              if (failed)                  cancelAcquire(node);          }      }    // AQS      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {          int ws = pred.waitStatus;          // 1)如果上一個結點的狀態是 -1, 則當前結點 需要掛起          if (ws == Node.SIGNAL)              return true;          // 2)如果上一個結點的狀態 大於0,表示取消搶佔了, 那麼循環往前找到一個結點的狀態 <=0,然後將當前結點的上一個結點 設置為 找到的結點          if (ws > 0) {              do {                  node.prev = pred = pred.prev;              } while (pred.waitStatus > 0);              pred.next = node;          }          // 3)如果上一個結點<=0且不等於-1, 那麼結點狀態為 0(加入等待隊列的初始狀態), -2,-3,          //		此時需要 用CAS將上一個節點的waitStatus設置為Node.SIGNAL(也就是-1)          else {              compareAndSetWaitStatus(pred, ws, Node.SIGNAL);          }          return false;      }    // AQS  // 掛起當前執行緒,並測試返回當前執行緒是否中斷狀態  	private final boolean parkAndCheckInterrupt() {          LockSupport.park(this);          return Thread.interrupted();      }

釋放鎖的流程和源碼解讀

// ReentrantLock  	final void unlock() {            sync.release(1);       }    // AQS  	public final boolean release(int arg) {          // 1)釋放鎖,嘗試修改state的值          if (tryRelease(arg)) {              Node h = head;              // 2)如果等待隊列有執行緒,則將頭部的結點狀態修正,並喚醒頭結點的下一個不是取消狀態的執行緒              if (h != null && h.waitStatus != 0)                  unparkSuccessor(h);              return true;          }          return false;      }    // ReentrantLock  // 判斷當前執行緒  是否為 當前持有獨佔鎖的執行緒  // 如果是,修改state的值,包括可重入鎖減一  // 如果不是,拋出異常  	protected final boolean tryRelease(int releases) {              int c = getState() - releases;              if (Thread.currentThread() != getExclusiveOwnerThread())                  throw new IllegalMonitorStateException();              boolean free = false;              if (c == 0) {                  free = true;                  setExclusiveOwnerThread(null);              }              setState(c);              return free;          }    // AQS      private void unparkSuccessor(Node node) {          /*           * If status is negative (i.e., possibly needing signal) try           * to clear in anticipation of signalling.  It is OK if this           * fails or if status is changed by waiting thread.           */          int ws = node.waitStatus;          // 如果head節點當前waitStatus<0, CAS 將其修改為0          if (ws < 0)              compareAndSetWaitStatus(node, ws, 0);            /*           * Thread to unpark is held in successor, which is normally           * just the next node.  But if cancelled or apparently null,           * traverse backwards from tail to find the actual           * non-cancelled successor.           */          // 喚醒後繼節點,但是有可能後繼節點取消了等待(waitStatus==1)     		// 從隊尾往前找,找到waitStatus<=0的所有節點中排在最前面的          Node s = node.next;          if (s == null || s.waitStatus > 0) {              s = null;              for (Node t = tail; t != null && t != node; t = t.prev)                  if (t.waitStatus <= 0)                      s = t;          }          if (s != null)               // 喚醒執行緒              LockSupport.unpark(s.thread);      }

by 斯武丶風晴 https://my.oschina.net/langxSpirit