通過一個生活中的案例場景,揭開並發包底層AQS的神秘面紗

  • 2019 年 10 月 29 日
  • 筆記

本文導讀

  • 生活中案例場景介紹
  • 聯想到 AQS 到底是什麼
  • AQS 的設計初衷
  • 揭秘 AQS 底層實現
  • 最後的總結

當你在學習某一個技能的時候,是否曾有過這樣的感覺,就是同一個技能點學完了之後,過了一段時間,如果你沒有任何總結,或者是不經常回顧,遺忘的速度是非常之快的。

忘記了之後,然後再重新學,因為已經間隔了一段時間,再次學習又當做了新的知識點來學。這種狀態如此反覆,浪費了相同的時間,但學習效果卻收效甚微。

每當遇到這種情況,我們可以停下來,思考一下。對於某一個技術知識點理解起來不是那麼好懂的時候,或者是學習起來有點吃力的時候,咱們可以嘗試找找生活中的例子來聯想下。

因為技術源於生活。

找到一個合適的生活案例,然後結合你自己做筆記總結和動手實踐的過程。定期的去回顧一下,慢慢的就會理解的更加透徹。

1、生活中案例場景介紹

今天我們就舉一個生活中的例子來理解下並發底層的AQS。

大家如果去過某些大醫院的話,就能知道,由於互聯網的快速發展,醫院的挂號、交費、取葯的流程都是比較方便的,交費也可以使用支付寶、微信支付了,而不用帶現金了。

醫生開完單子,交費完成 ,單子上都會有一個長條二維碼,可以直接在取葯的地方自助掃碼,叫號系統自動分配取葯窗口,然後你在關注下指定窗口等待着叫號就可以了,叫到你的時候再過去取葯,而不需要一直在等待着。

我們用一張圖來直觀的感受下:

file

這裏面涉及到了幾個角色:

1)藥房,提供取葯窗口的,內部有自助取葯機或人工取葯

2)取葯叫號系統,當用戶掃碼藥單後,自動錄入到該系統中

3)取藥用戶

接下來咱們細化下取葯流程。

當取藥用戶在自助機器上掃碼時,可以直觀的看下下面的流程圖:

取葯流程圖1

第一個用戶是程序猿,因為有多個自助掃碼機,他一看二維碼就知道咋回事了,所以第一個在自助機上掃碼完成,可以優先第一個去取葯窗口(State窗口)。

此時叫號系統的藥單隊列中還沒有其他人,程序猿掃碼後,就可以直接去窗口等待着取葯了。

接下來,本來是張大爺和王大媽看着先前程序猿的操作,也跟着在自助機上來回掃碼一把,由於不大懂掃哪裡,掃了半天也沒有個反應,老頭此時有點懵 : (。

後來熱心的程序猿看到了,給指點了一下 : ),幫助順利的掃碼完成。

再看下面這個流程圖:

取葯流程圖2

正好,張大爺和王大媽的取藥單,也被分配到跟程序猿同一個取葯窗口中 ,此時只能排隊了,按照他們的掃碼順序排隊,如上圖所示。

當程序猿取葯完成,叫號系統會自動呼叫下一位用戶,即隊列中的排在首節點的張大爺,自助取葯機收到消息會自動給張大爺取葯。此時,王大媽還是要等一會。後面的用戶 CCC 掃碼完成後,會繼續放到藥單隊列中,藥單隊列是按照 FIFO,也就是誰先掃碼誰就在前面,所以 CCC 排在王大媽的後面。

再看下面的流程圖:

取葯流程圖3

張大爺還在等待取葯過程中,王大媽也知道下一個可能就是她了,所以王大媽會時不時的,抬頭看看叫號窗口是否顯示了自己的名字。
此時,王大媽可以稍微在等待區休息一會,等待系統叫號就可以了。

2、聯想到 AQS 到底是什麼

其實,上面的場景介紹中,在醫院裏是很常見的。那麼這個場景對應的,我們可以聯想到 Java 中的並發編程。

如果沒有中間的叫號系統來做控制,如果醫院沒有限制,很多用戶要麼一擁而上沒有秩序的亂擠,要麼就有秩序的都在窗口站着排成長隊等待着。

所以中間的叫號系統解決了很多問題,解決了很多取藥用戶的有序性、安全性,而且不需要用戶一直等着,用戶線程無阻塞,當收到系統通知信號後,用戶再繼續執行取葯動作。

這個生活中的例子,可以很好的聯想到 Java 中我們常用的,並發包的底層技術:AQS (AbstractQueuedSynchronizer)隊列同步器(簡稱同步器)。

就像我們舉得例子中的提到的幾個角色,有很多用戶(理解為用戶線程),有共享資源(取葯窗口)。在用戶線程和共享資源之間,是通過中間系統來協調控制的,這裏面就會涉及的概念。

是用來控制多個線程訪問共享資源的方式。一個鎖能防止多個線程對共享資源的同時訪問,有些鎖也允許多個線程並發訪問共享資源,比如讀寫鎖。

在 Java 中經常使用的鎖是 synchronized,synchronized 會隱式的獲得鎖,但它必須是先獲得鎖再釋放鎖。這種方式簡化了同步的管理,但擴展性不如 Lock 顯示的獲得鎖和釋放鎖更加靈活。

synchronized 和 Lock 鎖之間的區別:

synchronized和Lock鎖區別

從性能上來講,當並發量高、競爭激烈的場景下,Lock 鎖會較 synchronized 性能上表現的
更穩定些。反之,當並發量不高的情況下,synchronized 有分級鎖的優勢,因此兩者性能差不多,synchronized 相對來說使用上更加簡單,不用考慮手工釋放鎖。

直觀感受下兩者的性能對比:

性能對比

Lock 顯示的鎖使用,因為使用上更加靈活,這得益於其底層基礎同步框架的實現機制,它就是 AQS。

如下圖所示:

多線程訪問共享資源

上述圖中列出了多個並發包中的類,每一個並發工具類解決的問題場景不同,但是其底層同步框架基本都是使用的 AQS 來實現的。

3、AQS 的設計初衷

Java 大佬考慮並發底層使用 AQS 的設計思想初衷,就是為了能夠抽象出來統一的同步協調處理器,設計好頂層結構,作為並發包構建的基本骨架,該骨架里封裝了多線程的入隊/出隊、線程阻塞/喚醒等一系列複雜的操作。Java SDK 中面向開發者針對不同需求場景提供了多個並發包工具。

儘管,提供的這些並發包的實現方式是不一樣的,但都是基於頂層抽象出來的 AQS 所定義的統一接口基礎上,然後部分定製邏輯延遲到子類去自行實現。同時,部分定義的方法中是按照既定的順序執行的,由此,我們也能夠想到,AQS 使用了模板方法模式。

在上一節圖中提到的幾個並發包中,我們來簡單介紹下實現場景。

多線程獨佔式並發工具:

1)ReentrantLock

可重入鎖,同一時刻僅允許一個線程訪問,所以可以稱作 獨佔鎖,線程可以重複獲取同一把鎖。

多線程共享式並發工具:

1)ReentrantReadWriteLock

可重入的讀寫鎖,允許多個讀線程同時進行,但不允許寫-讀、寫-寫線程同時訪問。

適用於讀多寫少的場景下。

2)CountDownLatch

主要用來解決一個線程等待 N 個線程的場景。

就像短跑運動員比賽,等到所有運動員全部都跑完才算競賽結束。

3)CycliBarrier

主要用於 N 個線程之間互相等待。

就像幾個驢友約好爬山,要等待所有驢友都到齊後才能統一出發。

4)Semaphore

限流場景使用,限定最多允許N個線程可以訪問某些資源。

就像車輛行駛到路口,必須要看紅綠燈指示,要等到綠燈才能通行。

基於上述這些並發包工具,我們可以根據多線程的不同使用場景去選擇。JDK 提供的這些並發包基本能夠滿足了大部分的開發者的使用需求。

4、揭秘 AQS 底層實現

在用戶取葯的這個例子中,我們可以把多個用戶掃碼取藥行為,聯想為多線程共用爭搶一個窗口的鎖,窗口就作為共享資源來看待。所以,哪個用戶先掃碼,這個用戶就優先有機會能提前取葯。

對應聯想到 AQS 內部結構,如下圖所示:

AQS內部結構模擬圖

我們根據用戶取葯的流程,對應畫出來的一個 AQS 底層的大致結構圖。經過舉例分析,多個用戶(線程)掃碼取葯會爭搶一把鎖(同一個取葯窗口,共享資源),所以用 Java 並發包里的 ReentrantLock 鎖的使用來描繪一下也更加貼切,因為 ReentrantLock 是一個獨佔鎖,同一個時刻只允許一個用戶執行。

結構圖中的 AQS 里,包含了幾個關鍵的屬性:

  • state 變量:表示同步狀態
  • exclusiveOwnerThread 變量:表示當前加鎖的線程
  • Node:CLH 隊列,是一個 FIFO 的雙端雙向鏈表隊列

啥是CLH?在 AQS 源碼中你能找到一段話,The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue,看上去像是三個人的名字,他們來發明的自旋算法,沒具體查資料。

AQS 隊列同步器主要包括:

  • 獨佔式同步狀態獲取和釋放,如:ReentrantLock
  • 共享式同步狀態獲取和釋放,如:Semaphore、CountDownLatch、CycliBarrier

接下來,我們就用獨佔式 ReentrantLock 可重入鎖來分析下 AQS 底層到底了做了哪些事情。

使用 ReentrantLock 顯示加鎖解鎖代碼很簡單,如下所示:

Lock lock = new ReentrantLock();  lock.lock();    // doSomething...    lock.unlock();

先來一張類圖:

類圖

列出了 Lock 接口和 ReentrantLock 實現類里的核心方法,其中 ReentrantLock 里的有個非常核心的屬性是 Sync ,它才是最最關鍵的組件,繼承了 AbstractQueuedSynchronizer 抽象類,作為子類實現了加鎖和解鎖。

再看一張全景類圖:

file

這張類圖中列出了 ReentrantLock 類里的 Sync 及其兩個子類 FairSync 公平鎖 和 NonfairSync 非公平鎖的核心方法,AQS 類里的核心屬性和方法。

AQS 中的 Node同步隊列關鍵屬性介紹:

waitStatus 等待狀態:

CANCELLED:值為1,等待的線程等待超時或被中斷,需從同步隊列中取消等待,節點進入該狀態不在變化。

SIGNAL:值為 -1,後繼節點的狀態處於等待狀態,而當前節點線程如果釋放了同步狀態或被取消,將會通知後繼節點,使得後繼節點的線程得以運行。

CONDITION:值為 -2,節點在等待隊列中,節點線程等待在 Condition 上,當其他線程對 Condition 調用了 signal() 方法後,該節點將會從等待隊列轉移到同步隊列中,獲取同步狀態。

PROPAGATE:值為 -3,表示下一次共享式同步狀態獲取將會無條件的被傳播下去。

INITAL:值為 0,初始狀態,當你創建新的節點時,默認就是這個狀態值。

雙向雙端隊列:

在 AQS 結構圖中已經有所描述,Node 是一個雙端雙向鏈表的隊列,雙端表示有 head (頭節點)和 tail(尾節點)。

雙向鏈表表示有 prev (指向前驅節點)和 next (指向後繼節點)兩個指針來標識 ,在上述 AbstractQueuedSynchronizer.Node 類圖中也能夠看得到。
此外,Node 中還有 thread 屬性表示當前的線程。

介紹完了類圖中的關鍵屬性和數據結構,我們來分析下,ReentrantLock 對象調用了 lock() 方法加鎖的過程。

找到 ReentrantLock 類里的 lock() 方法如下:

public void lock() {          sync.lock();  }

看到沒,sync 變量就是 Sync 剛提到的 AQS 的子類,調用了 sync 的 lock() 方法。

當我們點擊進去 sync#lock() 方法時,發現是個抽象方法,可以找到兩個實現類,如下所示:

lock方法

此時,如果不經常看源碼的同學,可能有點懵,到底是走那個方法?一種方式,你可以在 NonfairSync 和 FairSync 兩個類的 lock() 方法上都打上斷點,直接調試看到底是哪個類;另外,你可以猜測下,這個實現類應該是在對象初始化時創建的,所以你就直接去找構造方法。

public ReentrantLock() {          sync = new NonfairSync();  }

我們是通過默認構造方法創建的 ReentrantLock,跟進去看到的是創建的 NonfairSync,即默認創建的是非公平鎖方式。

看下 NonfairSync#lock() 方法實現:

final void lock() {      if (compareAndSetState(0, 1))              setExclusiveOwnerThread(Thread.currentThread());      else              acquire(1);  }

有兩條執行路徑:

1)直接通過 compareAndSetState(0, 1) 方法,使用了 CAS 可以無鎖化的保證一個數值修改的原子性,判斷下如果 state 變量是 0,說明沒有線程加鎖,可以把 state 設置為 1。設置成功後,

調用 setExclusiveOwnerThread(Thread.currentThread()) 方法,將當前線程設置為加鎖線程,即將 exclusiveOwnerThread 變量賦值為當前線程。

protected final boolean compareAndSetState(int expect, int update) {          // See below for intrinsics setup to support this          return unsafe.compareAndSwapInt(this, stateOffset, expect, update);  }

compareAndSetState(int expect, int update) 底層調用了 UnSafe 類的 compareAndSwapInt(this, stateOffset, expect, update) 方法,該方法為 JDK 內部使用的API,進行的是指針操作,基於 CPU 指令實現的原子性的 CAS。

圖示如下:

線程1獲得鎖

2)如果 state 變量不是 0,說明有線程已經加鎖了,compareAndSetState(0, 1) 方法返回 false,執行 acquire(1) 方法。

當我們點擊 acquire(1) 方法後,就進入到了 AbstractQueuedSynchronizer 類裏面了。

acquire(int arg) 方法源碼:

public final void acquire(int arg) {      if (!tryAcquire(arg) &&              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))              selfInterrupt();  }

其他線程加入同步隊列,圖示如下:

線程加入AQS同步隊列

上述代碼完成如下幾個步驟:

1)首先調用 tryAcquire(int arg) 方法,保證線程安全的獲取同步狀態,如果同步狀態獲取失敗,進入步驟2)。

2)調用 addWaiter(Node node) 方法,參數為構建的獨佔式 Node.EXCLUSIVE 節點,將構建好的節點通過 CAS 無鎖化方式添加到同步隊列的尾部,並返回該節點。

3)最後調用 acquireQueued(Node node, int arg) 方法,使得該節點按「死循環」方式獲取同步狀態。如果節點獲取不到同步狀態,則會調用 LockSupport#park() 方法掛起,阻塞節點中的線程,被阻塞的線程等待喚醒,喚醒方式主要是前驅節點出隊或被中斷來實現的。

下面結合源碼具體剖析下上述的幾個步驟。

當調用 tryAcquire(int arg) 方法,注意 AQS 里的 方法是這樣的:

protected boolean tryAcquire(int arg) {      throw new UnsupportedOperationException();  }

1)tryAcquire(int arg) 嘗試獲取同步狀態分析:

這就是 AQS 提供的模板方法,由於子類自定義同步器去實現的。

所以,會跳轉到 NonfairSync 里的 tryAcquire(int arg) 方法:

protected final boolean tryAcquire(int acquires) {          return nonfairTryAcquire(acquires);  }

內部調用了 nonfairTryAcquire(int acquires) 方法,該方法是 Sync 父類的,如下所示:

final boolean nonfairTryAcquire(int acquires) {    // 獲取當前線程      final Thread current = Thread.currentThread();    //  獲取同步狀態      int c = getState();      // 如果同步狀態是0,沒人加鎖      if (c == 0) {        // 通過CAS方式設置同步狀態,嘗試將0修改為1          if (compareAndSetState(0, acquires)) {                  // 設置當前加鎖的線程,給exclusiveOwnerThread變量賦值                  setExclusiveOwnerThread(current);                  return true;          }      }      // 當前線程等於當前加鎖線程      else if (current == getExclusiveOwnerThread()) {          // 計算新的同步狀態值,nextc = 1 + 1 = 2          int nextc = c + acquires;          // 判斷下nextc,防止溢出          if (nextc < 0) // overflow                  throw new Error("Maximum lock count exceeded");          // 更新同步狀態值          setState(nextc);          return true;      }      return false;  }

即使是多線程訪問,同一時刻總是僅有一個線程能夠獲得同步狀態,就會走上述的 c == 0 里的邏輯。

如果是在同一個線程中進行了第二次調用 ReentrantLock#lock() 和 unlock() 方法呢?此時 c = 1,所以會走到 current == getExclusiveOwnerThread() 判斷當前線程是等於加鎖線程的,那麼就會計算 nextc 新的同步狀態 ,如果該值不會溢出,則調用 setState(int newState) 更新同步狀態值,state 同步狀態值變為 2。

** 2)addWaiter(Node node) 添加到同步隊列分析:**

addWaiter(Node node) 主要是將節點加入到同步隊列隊尾,源碼如下所示:

private Node addWaiter(Node mode) {    // mode傳進來的參數為Node.EXCLUSIVE,構建Node節點      Node node = new Node(Thread.currentThread(), mode);      // Try the fast path of enq; backup to full enq on failure      Node pred = tail;      // 尾節點不為空      if (pred != null) {              node.prev = pred;              // 1. 將當前節點作為尾節點添加到同步隊列              // 2. 原尾節點作為當前節點的前驅節點              if (compareAndSetTail(pred, node)) {                      pred.next = node;                      return node;              }      }      // 同步隊列為空,調用enq(node)方法      enq(node);      return node;  }

繼續看 enq(Node node) 方法源碼:

private Node enq(final Node node) {      for (;;) {          Node t = tail;          // 第一次循環,尾節點為空          if (t == null) { // Must initialize                  // 創建空Node節點作為Head頭節點                  if (compareAndSetHead(new Node()))                          tail = head;          } else {               // 第二次循環過來,只有一個節點,就是頭結點                  node.prev = t;                  // 將當前節點作為尾節點添加到同步隊列中                  if (compareAndSetTail(t, node)) {                       // 當前節點作為頭結點的後繼節點                          t.next = node;                          return t;                  }          }      }  }

也是使用了 CAS 無鎖化保證節點,可以正確的添加到同步隊列中。

第一次循環,尾節點為空,調用了 compareAndSetHead(new Node()) 方法,底層調用了 unsafe.compareAndSwapObject(this, headOffset, null, update) 如果 head 變量所在位置為 null,則更新為空 Node 節點。

第二次循環,尾節點不空,調用了 compareAndSetTail(t, node) 方法,底層調用了 unsafe.compareAndSwapObject(this, tailOffset, expect, update) ,此時 tail 變量所在位置為空 Node 節點,更新為當前節點,即 Node.EXCLUSIVE 獨佔式節點。

** 3)acquireQueued(Node node, int arg) 獲得同步狀態分析:**

節點加入到同步隊列後,就進入到了自旋的過程,每個節點都在不斷的觀察,是否可以獲得同步狀態,成功獲得同步狀態,就會從這個自旋過程中退出。如下所示是自旋過程的實現代碼。

acquireQueued() 方法源碼如下:

final boolean acquireQueued(final Node node, int arg) {      boolean failed = true;      try {          boolean interrupted = false;          for (;;) {                  // 獲得當前節點的前驅節點                  final Node p = node.predecessor();                  // 如果p是頭節點,則嘗試獲得同步狀態                  if (p == head && tryAcquire(arg)) {                          // 成功獲得同步狀態,把自己作為Head頭節點                          setHead(node);                          // 原頭節點從同步隊列移除,不需要CAS操作                          p.next = null; // help GC                          failed = false;                          return interrupted;                  }                  // 1. 如果不是頭節點,失敗獲得同步狀態,判斷下是否可以掛起                  // 2. 允許掛起 ,調用 LockSupport#park() 方法完成線程掛起,釋放鎖                  if (shouldParkAfterFailedAcquire(p, node) &&                          parkAndCheckInterrupt())                          interrupted = true;          }      } finally {          if (failed)                  cancelAcquire(node);      }  }

當前線程掛起過程,先調用 shouldParkAfterFailedAcquire(Node pred, Node node) 方法,如下所示:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {      int ws = pred.waitStatus;      // 前驅節點狀態為SIGNAL,返回true      if (ws == Node.SIGNAL)              return true;      if (ws > 0) {          // 跳過 CACALLED 狀態的節點              do {                      node.prev = pred = pred.prev;              } while (pred.waitStatus > 0);              pred.next = node;      } else {              // 前驅節點的狀態小於0,則更新為SIGNAL狀態              compareAndSetWaitStatus(pred, ws, Node.SIGNAL);      }      return false;  }

圖示如下:

AQS同步隊列節點自旋過程

線程1首先獲得了同步狀態,線程2、線程3發現 AQS 類里的 state 不為 0,所以都被添加到 AQS 的同步隊列尾部。

此時,同步隊列中的線程2和線程3的節點會進行自旋過程,線程2的前驅節點是頭節點,滿足這個條件,然後調用 tryAcquire(int arg) 方法嘗試獲得同步狀態。

當線程1業務處理完成,需要釋放同步狀態,是的後續節點線程能夠獲得同步狀態。示例中會使用 ReentrantLock#unlock() 方法來解鎖。

繼續來分析 unlock() 方法,如下代碼所示:

public void unlock() {      sync.release(1);  }

在 unlock() 方法中,調用的 Sync 類的 release(int arg),進入到該方法中。

public final boolean release(int arg) {    // 釋放同步狀態      if (tryRelease(arg)) {              Node h = head;              if (h != null && h.waitStatus != 0)                  // 喚醒後繼節點                      unparkSuccessor(h);              return true;      }      return false;  }

這個 release(int arg) 是在 AQS 類里的了,其內部會調用 tryRelease(int arg) 方法嘗試釋放同步狀態,如果成功釋放,獲得同步隊列里的 head 頭節點,頭節點不為空並且它的 waitStatus 狀態不為 0(即不為 INITAL 初始狀態),則會調用 unparkSuccessor(Node node) 喚醒後續節點。

當直接點擊進入 tryRelease(int arg) 方法,還是在 AQS 類里,如下所示:

protected boolean tryRelease(int arg) {          throw new UnsupportedOperationException();  }

AQS 類的該方法並沒有提供實現,跟 tryAcquire(int arg) 方法類似的,會由 Sync子類里的 tryRelease(int arg) 重寫該方法實現,如下所示:

protected final boolean tryRelease(int releases) {      // 獲得同步狀態為1,releases為1,所以c計算得到0      int c = getState() - releases;      // 當前線程不是加鎖線程,則拋出IllegalMonitorStateException異常      if (Thread.currentThread() != getExclusiveOwnerThread())              throw new IllegalMonitorStateException();      boolean free = false;      if (c == 0) {              free = true;              // 將加鎖線程變量設置為null              setExclusiveOwnerThread(null);      }      // 將state變量更新為計算得到的0,即更新同步狀態      setState(c);      return free;  }

如果釋放同步狀態成功,上述方法將會返回 true。完成的事情很簡單,就是將 state 變量的同步狀態更新一下,然後將加鎖線程 exclusiveOwnerThread 變量設置為 null。

然後,調用 unparkSuccessor(Node node) 方法通知後繼節點,源碼如下:

private void unparkSuccessor(Node node) {      int ws = node.waitStatus;    // 等待狀態小於0,則通過CAS更新等待狀態值為0      if (ws < 0)              compareAndSetWaitStatus(node, ws, 0);    // 獲得頭節點的後繼節點,即線程2      Node s = node.next;      // 如果後繼節點等待狀態大於0,說明是CACELLED失效節點      if (s == null || s.waitStatus > 0) {              s = null;              // 同步隊列從尾向頭遍歷,得到一個正常節點              for (Node t = tail; t != null && t != node; t = t.prev)                      if (t.waitStatus <= 0)                              s = t;      }      if (s != null)          // 喚醒後續節點              LockSupport.unpark(s.thread);  }

圖示如下:

線程1釋放鎖

通過圖示並結合源碼,相信大家理解起來就更加清晰了。

注意,線程1釋放同步狀態後,會通知 後繼節點是線程2,不是 Head 頭節點。

上述圖中,同步隊列中的線程2被喚醒後,我們回到 acquireQueued(final Node node, int arg) 這個節點自旋過程的源碼看下。可以在上面找一下這個方法的源碼,其中線程2調用了 parkAndCheckInterrupt() 方法將線程掛起着,如下所示:

private final boolean parkAndCheckInterrupt() {          LockSupport.park(this);          return Thread.interrupted();  }

喚醒之後,繼續執行,調用 Thread.interrupted() 方法檢測下當前線程中斷情況。如果沒有被中斷,則繼續循環,執行如下代碼:

final Node p = node.predecessor();  if (p == head && tryAcquire(arg)) {          setHead(node);          p.next = null; // help GC          failed = false;          return interrupted;  }

node 變量為線程2,調用 p = node.predecessor() 方法獲得前驅節點為頭節點,滿足 p == head 條件,然後調用 tryAcquire(int arg) 嘗試獲得同步狀態,經過上述分析,因為 state 為 0,說明沒有線程加鎖,所以獲得同步狀態成功,該方法返回 true。

調用 setHead(node) 方法,如下所示:

private void setHead(Node node) {          head = node;          node.thread = null;          node.prev = null;  }

將 node 作為頭節點,node 的 prev 前驅節點指針和 thread 線程變量設置為 null。

圖示如下所示:

AQS同步隊列節點喚醒

上述圖中,看到原來的頭節點,已經沒有任何引用了,將來會被 JVM 垃圾回收掉。

剛剛被喚醒的線程2當做了頭節點,但實際也是個空節點了, 因為該節點的 thread 設置為 null了。此時,線程3的節點還在自旋狀態,等線程2釋放鎖後,通知後繼節點,喚醒線程3。都會執行我們上面分析的同一個套路。

最後,經過對上述源碼和圖示的分析,咱們來兩張完整的流程圖,方便大家記憶。

ReentrantLock#lock() 方法獲得鎖流程圖:

ReentrantLock#lock()獲得鎖流程圖

ReentrantLock#unlock() 方法釋放鎖流程圖:

ReentrantLock#unlock()釋放鎖流程圖

5、最後的總結

本文以生活案例場景(醫院窗口取葯流程)介紹為例,聯想到 AQS 到底是什麼,接着介紹對 AQS 設計初衷, 並且以 ReentrantLock 獨佔式鎖為例,深入剖析了 AQS 底層數據結構,以及源碼的實現細節。

AQS 是 Java 並發包中很多同步組件的構建基石,它內部主要是由同步狀態 state 變量和一個 CLH 同步 FIFO 隊列協作來完成的,CLH是一個雙端雙向鏈表數據結構。

當新的線程節點無法獲得同步狀態,將會加入到同步隊列隊尾,此時會採用 CAS 無鎖化來確保該操作的線程安全,保證原子性。線程加入到同步隊列後會被掛起,等待釋放鎖喚醒後繼節點,使得繼續獲得同步狀態。

AQS 採用了模板方法設計模式,根據不同並發包組件同步需求場景,子類同步器只需重寫 tryAcquire(),tryAcquireShared(),tryRelease(),tryReleaseShared() 幾個方法來決定同步狀態的獲取和釋放,tryAcquire() 和 tryRelease() 方法同於獨佔式,tryAcquireShared() 和 tryReleaseShared() 用於共享式。

對於 Java 中很多並發包背後複雜的入隊/出隊,線程阻塞/喚醒,線程安全的保證等,全部都由 AQS 來幫助你完成了,Doug Lea 大神很是牛逼呀!

弄懂了 AQS,大部分並發包里的工具類都是很容易理解了。另外,對於共享式並發包的源碼,大家如果感興趣,可以藉助本文的源碼分析過程,去自行畫圖分析一下。

希望本文給大家能帶來一點點的幫助!能夠抵擋得住面試官的N個連環炮式發問。

歡迎關注我的公眾號,掃二維碼關注獲得更多精彩文章,與你一同成長~
Java愛好者社區