【原創】Java並發編程系列14 | AQS源碼分析

  • 2020 年 3 月 13 日
  • 筆記

AbstractQueuedSynchronizer是Java並發包java.util.concurrent的核心基礎組件,是實現Lock的基礎。

AQS實現了對同步狀態的管理,以及對阻塞執行緒進行排隊、等待通知等,本文將從源碼角度深入理解AQS的實現原理。

建議:本文涉及大量源碼,在源碼中加了很多詳細的注釋,用電腦閱讀會更方便。

1. AQS類結構

屬性

// 屬性  private transient volatile Node head;// 同步隊列頭節點  private transient volatile Node tail;// 同步隊列尾節點  private volatile int state;// 當前鎖的狀態:0代表沒有被佔用,大於0代表鎖已被執行緒佔用(鎖可以重入,每次重入都+1)  private transient Thread exclusiveOwnerThread; // 繼承自AbstractOwnableSynchronizer 持有當前鎖的執行緒

方法

// 鎖狀態  getState()// 返回同步狀態的當前值;  setState(int newState)// 設置當前同步狀態;  compareAndSetState(int expect, int update)// 使用CAS設置當前狀態,保證狀態設置的原子性;    // 獨佔鎖  acquire(int arg)// 獨佔式獲取同步狀態,如果獲取失敗則插入同步隊列進行等待;  acquireInterruptibly(int arg)// 與acquire(int arg)相同,但是該方法響應中斷;  tryAcquireNanos(int arg,long nanos)// 在acquireInterruptibly基礎上增加了超時等待功能,在超時時間內沒有獲得同步狀態返回false;  release(int arg)// 獨佔式釋放同步狀態,該方法會在釋放同步狀態之後,將同步隊列中頭節點的下一個節點包含的執行緒喚醒;    // 共享鎖  acquireShared(int arg)// 共享式獲取同步狀態,與獨佔式的區別在於同一時刻有多個執行緒獲取同步狀態;  acquireSharedInterruptibly(int arg)// 在acquireShared方法基礎上增加了能響應中斷的功能;  tryAcquireSharedNanos(int arg, long nanosTimeout)// 在acquireSharedInterruptibly基礎上增加了超時等待的功能;  releaseShared(int arg)// 共享式釋放同步狀態;    // AQS使用模板方法設計模式  // 模板方法,需要子類實現獲取鎖/釋放鎖的方法  tryAcquire(int arg)// 獨佔式獲取同步狀態;  tryRelease(int arg)// 獨佔式釋放同步狀態;  tryAcquireShared(int arg)// 共享式獲取同步狀態;  tryReleaseShared(int arg)// 共享式釋放同步狀態;

內部類

// 同步隊列的節點類  static final class Node {}

2. 同步隊列

AQS通過內置的FIFO同步隊列來完成資源獲取執行緒的排隊工作。

如果當前執行緒獲取鎖失敗時,AQS會將當前執行緒以及等待狀態等資訊構造成一個節點(Node)並將其加入同步隊列,同時會park當前執行緒;當同步狀態釋放時,則會把節點中的執行緒喚醒,使其再次嘗試獲取同步狀態。

隊列結構

同步隊列由雙向鏈表實現,AQS持有頭尾指針(head/tail屬性)來管理同步隊列。

節點的數據結構,即AQS的靜態內部類Node,包括節點對應的執行緒、節點的等待狀態等資訊。

節點類:
static final class Node {      volatile Node prev;// 當前節點/執行緒的前驅節點      volatile Node next;// 當前節點/執行緒的後繼節點      volatile Thread thread;// 每一個節點對應一個執行緒        volatile int waitStatus;// 節點狀態      static final int CANCELLED =  1;// 節點狀態:此執行緒取消了爭搶這個鎖      static final int SIGNAL = -1;// 節點狀態:當前node的後繼節點對應的執行緒需要被喚醒(表示後繼節點的狀態)      static final int CONDITION = -2;// 節點狀態:當前節點進入等待隊列中      static final int PROPAGATE = -3;// 節點狀態:表示下一次共享式同步狀態獲取將會無條件傳播下去        Node nextWaiter;// 共享模式/獨佔模式      static final Node SHARED = new Node();// 共享模式      static final Node EXCLUSIVE = null;// 獨佔模式  }
入隊操作
/**   * 1.執行緒搶鎖失敗後,封裝成node加入隊列   * 2.隊列有tail,可直接入隊。   *   2.1入隊時,通過CAS將node置為tail。CAS操作失敗,說明被其它執行緒搶先入隊了,node需要通過enq()方法入隊。   * 3.隊列沒有tail,說明隊列是空的,node通過enq()方法入隊,enq()會初始化head和tail。   */  private Node addWaiter(Node mode) {      Node node = new Node(Thread.currentThread(), mode);// 執行緒搶鎖失敗後,封裝成node加入隊列      Node pred = tail;      if (pred != null) {// 如果有tail,node加入隊尾          node.prev = pred;          if (compareAndSetTail(pred, node)) {// 通過CAS將node置為tail。CAS操作失敗,說明被其它執行緒搶先入隊了,node需要通過enq()方法入隊。              pred.next = node;              return node;          }      }      enq(node);// 如果沒有tail,node通過enq()方法入隊。      return node;  }    /**   * 1.通過自旋的方式將node入隊,只有node入隊成功才返回,否則一直循環。   * 2.如果隊列為空,初始化head/tail,初始化之後再次循環到else分支,將node入隊。   * 3.node入隊時,通過CAS將node置為tail。CAS操作失敗,說明被其它執行緒搶先入隊了,自旋,直到成功。   */  private Node enq(final Node node) {      for (;;) {// 自旋:循環入列,直到成功          Node t = tail;          if (t == null) {              // 初始化head/tail,初始化之後再次循環到else分支,將node入隊              if (compareAndSetHead(new Node()))                  tail = head;          } else {              // node入隊              node.prev = t;              if (compareAndSetTail(t, node)) {// 通過CAS將node置為tail。操作失敗,說明被其它執行緒搶先入隊了,自旋,直到成功。                  t.next = node;                  return t;              }          }      }  }

3. 獲取鎖

以獨佔鎖為例詳細講解獲取鎖及排隊等待的過程。直接在程式碼中加了詳細的注釋講解,耐心看一定可以看懂。

/**   * 1.當前執行緒通過tryAcquire()方法搶鎖。   * 2.執行緒搶到鎖,tryAcquire()返回true,結束。   * 3.執行緒沒有搶到鎖,addWaiter()方法將當前執行緒封裝成node加入同步隊列,並將node交由acquireQueued()處理。   */  public final void acquire(int arg) {      if (!tryAcquire(arg) && // 子類的搶鎖操作,下文有解釋          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 子類搶鎖失敗進入隊列中,重點方法,下文詳細講解          selfInterrupt();  }    /**   * 需要子類實現的搶鎖的方法   * 目前可以理解為通過CAS修改state的值,成功即為搶到鎖,返回true;否則返回false。   * 之後重入鎖ReentrantLock、讀寫鎖ReentrantReadWriteLock中會詳細講解。   */  protected boolean tryAcquire(int arg) {      throw new UnsupportedOperationException();  }    /**   * 上文介紹過的入隊操作,執行緒搶鎖失敗,將當前執行緒封裝成node加入同步隊列,並返回node   * Node.EXCLUSIVE-表示獨佔鎖,先不用關注   */  addWaiter(Node.EXCLUSIVE)    /**   * 重點方法!!   * 1.只有head的後繼節點能去搶鎖,一旦搶到鎖舊head節點從隊列中刪除,next被置為新head節點。   * 2.如果node執行緒沒有獲取到鎖,將node執行緒掛起。   * 3.鎖釋放時head節點的後繼節點喚醒,喚醒之後繼續for循環搶鎖。   */  final boolean acquireQueued(final Node node, int arg) {      boolean failed = true;      try {          boolean interrupted = false;          for (;;) {// 注意這裡是循環              /*               * 1.node的前置節點是head時,可以調用tryAcquire()嘗試去獲取鎖,獲取鎖成功則將node置為head               * 注意:只有head的後繼節點能去搶鎖,一旦搶到鎖舊head節點從隊列中刪除,next被置為新head節點               * 2.node執行緒沒有獲取到鎖,繼續執行下面另一個if的程式碼               *  此時有兩種情況:1)node不是head的後繼節點,沒有資格搶鎖;2)node是head的後繼節點但搶鎖沒成功               */              final Node p = node.predecessor();              if (p == head && tryAcquire(arg)) {                  setHead(node);                  p.next = null; // help GC                  failed = false;                  return interrupted;              }                /*               * shouldParkAfterFailedAcquire(p, node):通過前置節點pred的狀態waitStatus 來判斷是否可以將node節點執行緒掛起               * parkAndCheckInterrupt():將當前執行緒掛起               * 1.如果node前置節點p.waitStatus==Node.SIGNAL(-1),直接將當前執行緒掛起,等待喚醒。               *      鎖釋放時會將head節點的後繼節點喚醒,喚醒之後繼續for循環搶鎖。               * 2.如果node前置節點p.waitStatus<=0但是不等於-1,               *      1)shouldParkAfterFailedAcquire(p, node)會將p.waitStatus置為-1,並返回false;               *      2)進入一下次for循環,先嘗試搶鎖,沒獲取到鎖則又到這裡,此時p.waitStatus==-1,就會掛起當前執行緒。               *  3.如果node前置節點p.waitStatus>0,               *      1)shouldParkAfterFailedAcquire(p, node)為node找一個waitStatus<=0的前置節點,並返回false;               *      2)繼續for循環               */              if (shouldParkAfterFailedAcquire(p, node) &&                  parkAndCheckInterrupt())                  interrupted = true;          }      } finally {          if (failed)              cancelAcquire(node);      }  }    /**   * 通過前置節點pred的狀態waitStatus 來判斷是否可以將node節點執行緒掛起   * pred.waitStatus==Node.SIGNAL(-1)時,返回true表示可以掛起node執行緒,否則返回false   * @param pred node的前置節點   * @param node 當前執行緒節點   */  private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {      int ws = pred.waitStatus;      if (ws == Node.SIGNAL)          return true;      if (ws > 0) {          /*           * waitStatus>0 ,表示節點取消了排隊           * 這裡檢測一下,將不需要排隊的執行緒從隊列中刪除(因為同步隊列中保存的是等鎖的執行緒)           * 為node找一個waitStatus<=0的前置節點pred           */          do {              node.prev = pred = pred.prev;          } while (pred.waitStatus > 0);          pred.next = node;      } else {          // 此時pred.waitStatus<=0但是不等於-1,那麼將pred.waitStatus置為Node.SIGNAL(-1)          compareAndSetWaitStatus(pred, ws, Node.SIGNAL);      }      return false;  }    /**   * 將當前執行緒掛起   * LockSupport.park()掛起當前執行緒;LockSupport.unpark(thread)喚醒執行緒thread   */  private final boolean parkAndCheckInterrupt() {      LockSupport.park(this);// 將當前執行緒掛起      return Thread.interrupted();  }

AQS獲取鎖

4. 釋放鎖

/**   * 釋放鎖之後,喚醒head的後繼節點next。   * 回顧上文講的acquireQueued()方法,next節點會進入for循環的下一次循環去搶鎖   */  public final boolean release(int arg) {      if (tryRelease(arg)) {// 子類實現的釋放鎖的方法,下文有講解          Node h = head;          if (h != null && h.waitStatus != 0)              unparkSuccessor(h);// 喚醒node節點(也就是head)的後繼節點,下文有講解          return true;      }      return false;  }    /**   * 需要子類實現的釋放鎖的方法,對應於tryAcquire()   * 目前可以理解為將state的值置為0。   * 之後重入鎖ReentrantLock、讀寫鎖ReentrantReadWriteLock中會詳細講解。   */  protected boolean tryRelease(int arg) {      throw new UnsupportedOperationException();  }    /**   * 喚醒node節點(也就是head)的後繼節點   */  private void unparkSuccessor(Node node) {      int ws = node.waitStatus;      if (ws < 0)          compareAndSetWaitStatus(node, ws, 0);        Node s = node.next;// 正常情況,s就是head.next節點      /*       * 有可能head.next取消了等待(waitStatus==1)       * 那麼就從隊尾往前找,找到waitStatus<=0的所有節點中排在最前面的去喚醒       */      if (s == null || s.waitStatus > 0) {          s = null;          for (Node t = tail; t != null && t != node; t = t.prev)              if (t.waitStatus <= 0)                  s = t;      }      if (s != null)          LockSupport.unpark(s.thread);// 喚醒s節點的執行緒去搶鎖  }

5. 回顧整個過程

  1. 執行緒1來獲取鎖,此時沒有競爭,直接獲取到鎖。AQS隊列為空。
  2. 執行緒2來獲取鎖,因為執行緒1佔用鎖,執行緒2需要做兩件事:

1)執行緒2構造成Node到AQS的同步隊列中排隊。此時初始化同步隊列。

2)執行緒2阻塞,等待被喚醒之後再去搶鎖。

  1. 執行緒3來獲取鎖,鎖被佔用,同樣做兩件事:排隊並阻塞。此時的同步隊列結構:
  1. 執行緒1執行完同步程式碼之後釋放鎖,喚醒head的後繼節點(執行緒2),執行緒2獲取鎖,並把執行緒2對應的Node置為head。
  2. 執行緒2執行完同步程式碼之後釋放鎖,喚醒head的後繼節點(執行緒3),執行緒3獲取鎖,並把執行緒3對應的Node置為head。
  3. 執行緒3執行完同步程式碼之後釋放鎖,同步隊列中head之後沒有節點了,將head置為null即可。

總結

AQS結構:鎖狀態state、當前只有鎖的執行緒exclusiveOwnerThread以及雙向鏈表實現的同步隊列。

AQS使用模板方法設計模式,子類必須重寫AQS獲取鎖tryAcquire()和釋放鎖tryRelease()的方法,一般是對stateexclusiveOwnerThread的操作。

獲取鎖acquire()過程:

  1. 子類調用tryAcquire()嘗試獲取鎖,如果獲取鎖成功,完成。
  2. 如果獲取鎖失敗,當前執行緒會封裝成Node節點插入同步隊列中,並且將當前執行緒park()阻塞,等待被喚醒之後再搶鎖。

釋放鎖release()過程:當前執行緒調用子類的tryRelease()方法釋放鎖,釋放鎖成功後,會unpark(thread)喚醒head的後繼節點,讓其再去搶鎖。