AQS源碼二探-JUC系列
本文已在公眾號上發布,感謝關注,期待和你交流。
AQS源碼二探-JUC系列
共享模式
doAcquireShared
這個方法是共享模式下獲取資源失敗,執行入隊和等待操作,等待的執行緒在被喚醒後也在這個方法中自旋執行,直到拿到資源。
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
// 新節點入隊,SHARED模式
final Node node = addWaiter(Node.SHARED);
// 標識是否獲取失敗
boolean failed = true;
try {
// 標識是否執行緒是否中斷
boolean interrupted = false;
for (;;) {
// 獲得當前節點的前節點
final Node p = node.predecessor();
// 前節點是頭節點
if (p == head) {
// 嘗試獲取資源【1】
int r = tryAcquireShared(arg);
// 返回值大於等於0算獲取成功
if (r >= 0) {
// 設置頭節點【2】
setHeadAndPropagate(node, r);
// 斷開前節點next引用
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 取消嘗試獲取鎖的節點
cancelAcquire(node);
}
}
-
【1】,這個入隊等待方法和獨佔模式下的實現邏輯基本是一致的,共享模式使用
tryAcquireShared
獲取資源方法返回值使用int來表示成功或失敗的。獲取成功,就會進行設置head操作,把原來的head移出隊列 -
【2】,獨佔模式的程式碼這裡獲取資源成功後是調用
setHead
,共享模式這裡是調用setHeadAndPropagate
方法,setHeadAndPropagate
方法執行了setHead後還調用了doReleaseShared()
觸發了嘗試喚醒節點的操作。在釋放資源的時候觸發喚醒節點的操作很好理解,而這裡是頭節點後繼節點獲取資源成功後為什麼需要觸發喚醒節點呢?對這點特別展開分析一下:
- release喚醒操作都是從head往後找節點,並且只會找到一個,並沒有一次找出多個去喚醒的能力
- 在獨佔模式時的release操作只需要判斷head不為null並且head的waitStatus不是取消狀態就會去喚醒後繼節點,因為獨佔模式下只有一個執行緒能獲取到資源,隊列里等待的節點也只有head後的最近一個有效節點需要喚醒,所以這個被喚醒的節點沒有必要再去判斷是否需要喚醒自己的後繼節點,只需要依靠自己的release就可以了
- 共享模式下多個執行緒同時可以獲取資源成功,也就意味著同時多個執行緒釋放資源,那麼仍然是依賴head往後找有一個有效節點喚醒必然不滿足要求了,假設已經有多個執行緒在在隊列等待資源釋放,此時瞬間釋放了兩個資源,釋放資源的時候會去喚醒head後繼節點,這兩個釋放執行緒拿到的head是同一個,所以這時候的喚醒就不能做到喚醒head後兩個有效節點,而期望的是喚醒兩個,這一點是理解獨佔和共享模式之間在喚醒操作上的不同處理方式的關鍵。
理解以上幾點後,再思考下只需要補充head節點發生變化時觸發喚醒後繼節點就可以滿足要求了,如果隊列中等待的節點依次被喚醒成為head然後繼續往後喚醒節點,形式上看起來像一個傳播(propagate)的動作。
setHeadAndPropagate
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);//設置頭節點【1】
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// 判斷是否需要進行喚醒操作【2】
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
-
【1】如前方法名所示,先設置head然後進行傳播(doReleaseShared)
-
【2】第一個if判斷分解如下:
- 1,
propagate > 0
,這個判讀為true表示還有資源可以獲取,直接進入if - 2,
h == null || h.waitStatus < 0
,這個h是設置head前存好的老的head,這個先判斷了老head的waitStatus如果是小於0的就進入if,先判空是防空指針 - 3,
(h = head) == null || h.waitStatus < 0
,前面兩個都不成立,最後會判斷新的head的waitStatus如果是小於0的就進入if
裡面第二個if判斷只需確定後繼節點是共享模式,就調用doReleaseShared方法。
- 1,
releaseShared
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
@ReservedStackAccess
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
releaseShared執行成功就調用doReleaseShared即可完成釋放資源,重點看doReleaseShared的程式碼。
doReleaseShared
這裡已經知道調用doReleaseShared的地方有兩處,一處是釋放資源的時候(releaseShared),一處是等待隊列頭節點的後繼節點獲得資源設置新的head後調用(setHeadAndPropagate)
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
// 循環內執行【1】
for (;;) {
Node h = head;
// 【2】
if (h != null && h != tail) {
int ws = h.waitStatus;
// 頭節點waitStatus為SIGNAL【3】
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 頭節點waitStatus為0【4】
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 出循環唯一判斷就是head節點沒有變化
if (h == head) // loop if head changed
break;
}
}
- 【1】這個方法的功能就是來喚醒head的後繼節點的,這裡卻來了個死循環,退出條件是執行到最後head沒有變化,也就是說在執行的時候無論是符合條件觸發了unparkSuccessor還是沒有觸發,只要最後判斷head變化了,那麼還得繼續循環執行喚醒head後繼節點的邏輯,也就是說並不是一個執行緒進入這個方法邏輯完成喚醒就結束了,只要head不斷在變化,那麼可能會有很多執行緒同時在執行這段邏輯而不會退出循環,直到
tryAcquireShared
返回負數,可以想像很多執行緒同時在執行這個循環程式碼,如果也剛好tryAcquireShared
能成功的情況,這樣是能夠讓那些等待的節點更快的被喚醒並且獲得資源。 - 【2】
h != null && h != tail
這個判斷都會認為是能夠判斷此時等待隊列是否有等待被喚醒節點,所以進入後面的邏輯。其實這裡還有一種場景就是初始化頭節點執行了cas設置head,tail還是為空狀態,在前篇中重點分析過。那麼在這裡嘗試喚醒head後繼節點的邏輯里是可以忽略這個場景的。 - 【3】如果head的waitStatus為SIGNAL,那麼意味著有資格喚醒後繼節點,不過需要先cas修改waitStatus到0成功才行,如果修改失敗,表示已經有執行緒搶先一步做了同樣的事情,所以沒有必要去下面判斷head是否變化了,直接繼續執行循環。
- 【4】如果head的waitStatus為0,cas修改為PROPAGATE,如果修改失敗,說明已經有執行緒搶先一步做了同樣的事情,和前面一樣直接繼續循環執行。關於PROPAGATE這個狀態後面詳細分析。
中斷模式
在AQS的注釋和API中發現有根據是否會拋出InterruptedException,分成了 uninterruptible mode 和 interruptible mode。比如acquire(int arg)
和acquireInterruptibly(int arg)
,acquireShared(int arg)
和acquireSharedInterruptibly(int arg)
本質區別就是在是否執行過程中對中斷的響應。
以acquireSharedInterruptibly為例
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
進入方法就先判斷執行緒中斷標誌,看起來是非常關心執行程式碼的執行緒是不是中斷了,哈哈。
doAcquireSharedInterruptibly方法:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();【1】
}
} finally {
if (failed)
cancelAcquire(node);【3】
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();【2】返回執行緒中斷標誌
}
-
【1】【2】【3】,執行緒等待的地方是在parkAndCheckInterrupt的
LockSupport.park(this);
這行程式碼,喚醒時繼續從這行程式碼下開始執行。這裡涉及到一個比較關鍵的資訊就是LockSupport.park(this);
裡面是調用UNSAFE.park
方法,這個方法是native方法,因為這裡沒有等待時間的設置,只有兩種可能會喚醒這個執行緒:1,調用了unPark方法;2,執行緒中斷。執行緒醒來的時候並不能確認是unPark喚醒的還是中斷喚醒的。所以無論如何都會在喚醒執行緒後第一步【2】中就是得到中斷標誌,如果執行緒已中斷parkAndCheckInterrupt返回true,就會拋出InterruptedException
異常。拋出異常就會退出自旋,也會執行到
cancelAcquire(node)
。關於這個方法前篇已經詳細介紹,它會把傳入的node的waitstatus設置為CANCELLED狀態。
而對於uninterruptible mode的實現就是在發現喚醒執行緒中斷標誌是true後,就執行Thread.currentThread().interrupt();
再次設置中斷,然後並不處理中斷。使用者可以在外面檢查執行緒中斷狀態進行處理。
超時機制
除了中斷以外,AQS還提供了有超時機制的API,這個能力的基礎是基於Unsafe的park(boolean isAbsolute, long time)方法。
比如AQS中doAcquireSharedNanos方法:
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)// 參數判斷
return false;
// 計算到期時間【1】
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
// 計算剩餘時間 【2】
nanosTimeout = deadline - System.nanoTime();
// 如果已經超時就返回【3】
if (nanosTimeout <= 0L)
return false;
// 對剩餘時間有限制【4】
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 處理中斷【5】
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 【1】注意,在進入for循環之前就計算出過期時間,這個過期時間是不會變的
- 【2】,【3】,【4】在原來的自旋邏輯里加入了自旋計算超時剩餘時間,每次循環是先使用deadline計算出剩餘超時時間,已經超時就返回,這個for循環可能會執行多次,在執行過程中時間就會消耗,有可能還未進入park操作,就已經超時了。而在執行park之前的條件是等待時間必須超過
spinForTimeoutThreshold
(1000微秒),如果連1000微秒都不到了,就沒必要把執行緒弄等待了,直接再執行一下循環程式碼消耗掉時間,從【3】判斷退出循環 - 【5】AQS中
doAcquireSharedNanos
和doAcquireNanos
方法都會在處理中斷
共享模式同步器
下面介紹下使用共享模式擴展的兩個同步器:Semaphore和CountDownLatch。
Semaphore
Semaphore經常用於限制訪問資源的執行緒數量。
以一個停車場的運作為例。簡單起見,假設停車場只有三個車位,一開始三個車位都是空的。這時如果同時來了五輛車,看門人允許其中三輛直接進入,然後放下車攔,剩下的車則必須在入口等待,此後來的車也都不得不在入口處等待。這時,有一輛車離開停車場,看門人得知後,打開車攔,放入外面的一輛進去,如果又離開兩輛,則又可以放入兩輛,如此往複。
在這個停車場系統中,車位是公共資源,每輛車好比一個執行緒,看門人起的就是訊號量的作用。
— 百度百科
和ReentrantLock實現相似,它內部也有一個Sync類,然後子類NonfairSync和FairSync分別實現非公平和公平模式。
NonfairSync.tryAcquireShared
Semaphore自定實現tryAcquireShared方法。
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
// Sync
final int nonfairTryAcquireShared(int acquires) {
for (;;) { // 自旋【1】
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) //cas【2】
return remaining;
}
}
- 【1】,【2】nonfairTryAcquireShared實現是一個循環+cas操作,它們操作的是AQS中的state欄位,先獲取當前state的值,然後使用請求的數量計算剩餘的數量,如果計算的剩餘是小於0,直接返回這個負數值,表示獲取失敗;如果是大於等於0,表示有機會獲取成功,那麼就cas嘗試更新state值,這裡沒有考慮ABA問題,因為出現ABA情況對於此時的邏輯也是正確的。當然,cas成功那就是獲取成功,如果cas失敗,沒事,循環繼續執行。方法只有兩種返回可能,一種cas成功,返回大於等於0的值;一種計算出剩餘小於0,返回小於0的值。
FairSync.tryAcquireShared
對於公平場景,Semaphore實現和ReentrantLock也是一致:
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
核心就是hasQueuedPredecessors方法,在前篇中已經詳細分析,其他程式碼和非公平模式程式碼一致。
Sync.tryReleaseShared
無論公平還是非公平,釋放資源操作都是執行AQS的releaseShared方法,從而執行到tryReleaseShared方法。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
和前面的獲取操作一樣,釋放操作需要修改state值也是使用循環+cas結合使用,對int值overflow的情況做了拋出異常處理。
Semaphore例子
以下是例子程式碼,Semaphore的初始化值是2,一共5個執行緒。首先兩個執行緒acquire成功,另三個個執行緒進入隊列等待,然後5秒後,兩個執行緒分別執行release,等待隊列中的執行緒被喚醒,隊列中前兩個執行緒獲取成功,最後一個執行緒繼續等待直到再有執行緒執行release。
public class SemaphoreTest {
static Semaphore semaphore = new Semaphore(2);
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
executorService.submit(new Task());
}
}
static class Task implements Runnable{
@Override
public void run() {
try {
semaphore.acquire();
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName()+" semaphore release");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
以下使用圖示例進行說明:
1,三個執行緒進入等待隊列等待,state值被前兩個執行緒獲取成功後從2變為了0,注意因為head指向的節點是作為虛節點所以實際等待隊列中此時為4個節點,最後一個節點因為沒有後繼節點所以它的waitstatus為0,其他節點的waitstatus都為SIGNAL。
2,5秒後,前面獲取成功的兩個執行緒先後執行release,會執行到doReleaseShared
方法喚醒head的後繼節點,我們假設一種並發場景:在doReleaseShared
方法程式碼中第一個執行緒執行了compareAndSetWaitStatus(h, Node.SIGNAL, 0)
成功,並且喚醒一個後繼節點執行緒T3,此時head的狀態是0,就符合了下面的判斷,所以第二個執行緒會執行compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
把head的狀態更新為PROPAGATE,然後假設被喚醒的執行緒還沒有更新head指向,那麼第二個執行緒在判斷h == head
為true後退出。被喚醒的執行緒就繼續執行doAcquireShared
的自旋程式碼,因為資源已經釋放,它能獲取成功,然後執行setHeadAndPropagate
方法設置head和繼續喚醒後繼節點,因為這個場景里老head狀態是PROPAGATE,新head狀態是SIGNAL,隨便哪一個都符合喚醒後繼節點的要求。喚醒T4後,也會獲取成功,執行setHeadAndPropagate
,新head狀態是SIGNAL所以會執行doReleaseShared
,並且符合喚醒後繼節點條件,就會喚醒T5,注意這裡就2個資源,但是會多喚醒一個執行緒,T5被喚醒後執行doAcquireShared
的自旋程式碼,但是資源不足獲取失敗,然後修改head節點的狀態為SIGNAL,執行緒繼續等待。
允許一個或多個執行緒等待一組操作在其他執行緒中執行完成。注釋中翻譯一下它作用,可以作為一個閂鎖,所有調用await的執行緒都會在鎖前面等待,直到它調用countDown;或者初始化為N的CountDownLatch可以用來讓一個執行緒等待,直到N執行緒完成某個動作,或者某個動作已經完成N次。
要實現CountDownLatch的功能,使用AQS的state計數,用同步隊列阻塞執行緒,實現應該也簡單。
源碼實現沒有公平和非公平,內部也實現了一個繼承AbstractQueuedSynchronizer的子類Sync。
Sync.tryAcquireShared
這個tryAcquireShared和前面同步器實現都不同,入參acquires不需要使用,也不會更改state值,而是判斷當state為0的時候,來獲取的操作能成功,否則獲取失敗。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
Sync.tryReleaseShared
自定義的釋放操作確保state值只能減到0,不會減到負數,仍然是循環+cas操作實現。
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
await和countDown
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
await是獲取,countDown是釋放。在初始化CountDownLatch的時候需要傳入一個countDown數值,表示執行緒通過await之前必須調用countDown的次數,初始化state為countDown,每次執行countDown後state值就減1。
從前面的實現程式碼可以看出CountDownLatch的初始化後,在state沒有變成0之前執行await就會進入到同步隊列等待,每次執行countDown,state就會減1,state變成0的時候觸發喚醒等待執行緒。
CountDownLatch類注釋上寫了它兩個典型用法:
- 1,某一個執行緒開始運行前需要等待一個或多個執行緒都完成。一個執行緒先執行await等待,然後根據初始化的值,執行這個數量的countDown次數,可以一個執行緒執行,也可以多個執行緒執行,state被減到小於等於0的時候觸發喚醒前面等待的一個執行緒。
- 2,可以實現多個執行緒並行執行的場景,多個執行緒開始執行前都在await上等待,然後用一個countDown方法就可以幾乎同時喚醒多個等待的執行緒。
關於PROPAGATE
在共享模式中waitStatus的PROPAGATE狀態顯得有點突兀,然而在網上一查,原來是有一個bug的故事,仔細查看這個bug的場景以及修復程式碼。我覺得非常有助於理解共享模式下釋放操作喚醒head後繼節點和被喚醒執行緒獲取成功後喚醒後繼節點時產生的競爭場景。
bug描述:鏈接
修復程式碼記錄:鏈接
在bug描述中有復現測試程式碼,也有詳細的描述,可以兩者結合查看幫助理解。
Doug Lea說:
There was a race condition that could prevent a releaseShared from being propagated through to a parked thread, leaving the thread parked while a permit existed.
這裡稍作解釋:
查看原來程式碼,對於release是否進行喚醒後繼節點的判斷:(h != null && h.waitStatus != 0
,對於setHeadAndPropagate是否進行喚醒後繼節點的判斷:propagate > 0 && node.waitStatus != 0
。
當一個release發生,被喚醒的節點獲取到最後一個資源(propagate=0),此時head的狀態是0,再有一個release發生,條件不滿足喚醒後繼節點退出,前面被喚醒的執行緒執行setHeadAndPropagate,也不會去喚醒後繼節點,而此時資源是有的,但是不能觸發喚醒後繼節點了。
所以引入了PROPAGATE狀態,這個狀態是在等待隊列中有等待節點的時候判斷到head節點是0的情況,意味著一定是head的後繼節點已經被喚醒在執行,就把head的節點改為PROPAGATE狀態,如果是最後一個資源(propagate=0),但是也是判斷h.waitStatus < 0依舊可以觸發喚醒。在unparkSuccessor方法中只要狀態小於0的都會重置成0,PROPAGATE也不影響其他流程。
總結
1,共享模式和獨佔模式在實現上的的區別是共享模式需要處理並發釋放和並發喚醒的場景,而獨佔模式只需要處理單個喚醒和單個喚醒的場景。所以在共享模式下獲取成功也會觸發後繼節點的喚醒。
2,AQS支援中斷模式和非中斷模式,另外超時的API也是會處理中斷的,而對於中斷的場景會使節點的狀態改為取消狀態,而不是當時就進行刪除,這些取消狀態的節點在喚醒的時候就會從隊列中切斷連接剔除出去。
3,上面介紹了基於AQS共享模式實現的同步器Semaphore和CountDownLatch,在AQS內部機制基礎上實現這兩個同步器非常簡單了,這也是AQS的厲害之處吧。
4,關於AQS源碼這是第二篇,後續預計還有2篇文章,下一篇主要介紹condition隊列的實現。