深入理解Java並發框架AQS系列(五):條件隊列(Condition)
- 2021 年 4 月 28 日
- 筆記
一、前言
AQS中的條件隊列相比較前文中的「獨佔鎖」、「共享鎖」等比較獨立,即便沒有條件隊列也絲毫不影響諸如ReentrantLock
、Semaphore
類的實現,那如此說來條件隊列是否就是一個可有可無的產物?答案是否定的,我們來看下直接或間接用到條件隊列的JDK並發類:
ReentrantLock
獨佔鎖經典類ReentrantReadWriteLock
讀寫鎖ArrayBlockingQueue
基於數組的阻塞隊列CyclicBarrier
循環柵欄,解決線程同步問題DelayQueue
延時隊列LinkedBlockingDeque
雙向阻塞隊列PriorityBlockingQueue
支持優先級的無界阻塞隊列ThreadPoolExecutor
線程池構造器ScheduledThreadPoolExecutor
可基於時間調度的線程池構造器StampedLock
郵戳鎖,1.8後引入,更高效的讀寫鎖
如此豪華的陣容,可見Condition
的地位不可小覷
我們簡單描述下條件隊列實現的功能:有3個線程A、B、C,分別調用wait/await
方法後,線程進入阻塞,在沒有其他線程去喚醒的情況下,3個線程將永遠處於阻塞狀態。此時如果有另外線程調用notify/signal
,那麼A、B、C線程中的某一個將被激活(根據其進入條件隊列的順序而定),從而執行後續的邏輯;如果調用notifyAll/signalAll
的話,那麼3個線程都將被激活,這可能是我們對條件隊列的簡單認識。這樣的描述是否準確呢?可能不太嚴謹,我們引入JDK的條件隊列來做說明
統一話術:其實語法層面支持的wait/notify
與AQS都屬於JDK的範疇,但為了區分兩者,我們定義如下:
- JDK條件隊列:語法層面提供支持的
wait/notify
,即Object
類中的wait()/notify()
方法 - AQS條件隊列:AQS提供的條件隊列,即AQS內部的
ConditionObject
類
二、JDK中的條件隊列(wait/notify)
眾所周知,在JDK中,wait/notify/notifyAll
是根對象Object
中內置的方法,且方法均被定義為native
本地方法
// 等待
public final native void wait(long timeout) throws InterruptedException;
// 喚醒
public final native void notify();
// 喚醒所有等待線程
public final native void notifyAll();
2.1、wait
// 步驟1
synchronized (obj) {
// 步驟2
before();
// 步驟3
obj.wait();
// 步驟4
after();
}
相信大家對上述代碼並不陌生,我們將JDK的條件隊列抽象為4步,逐一闡述
- 步驟1:
synchronized (obj)
- 在jdk中如果想調用
Object.wait()
方法,必須首先獲取該對象的synchronized
鎖,當前步驟,如果成功獲取到鎖,那麼將進入「步驟2」,如果存在並發,當前線程將會進入阻塞(線程狀態為BLOCKED),知道獲取到鎖為止
- 在jdk中如果想調用
- 步驟2:
before()
- 我們知道
synchronized
是獨佔鎖,所以在執行步驟2代碼時,程序是不存在並發的,即同一時刻,只有一個線程正在執行,此處也相對好理解
- 我們知道
- 步驟3:
obj.wait()
- 此步驟是將當前線程放入條件隊列,同時釋放
obj
的同步鎖。此處跟我們對synchronized
的認知有悖,我們一般認為synchronized (obj) {......}
在大括號中的代碼會一直持有鎖,而事實情況卻是,當程序執行wait()
方法時,會釋放obj
的同步鎖
- 此步驟是將當前線程放入條件隊列,同時釋放
- 步驟4:
after()
- 此步驟是並發執行還是串行執行?假設我們現在有3個線程A、B、C都已經執行完畢
wait()
方法,並進入了條件隊列,等待其他線程喚醒;此時另外一個線程執行了notifyAll()
時,後續的激活流程是怎麼樣的?- 錯誤觀點:有很多同學直觀感受是,線程A、B、C同時被激活,所以步驟4是並發執行的;就像是百米賽跑,所有同學都準備就緒(wait),一聲槍響後(notifyAll),所有人開始賽跑,並跑到終點(步驟4)
- 正確觀點:其實「步驟4」是串行執行的,大家再檢查下代碼後便可發現,「步驟4」處於
synchronized
的大括號之間;還是拿上述賽跑舉例,如果認為從聽到槍響至跑到終點是「步驟4」的話,那真實的場景應該是這樣的:一聲槍響後,A起跑,B、C原地不動;A跑到終點後,B開始起跑,C原地不動;最後是C跑到終點
- 此步驟是並發執行還是串行執行?假設我們現在有3個線程A、B、C都已經執行完畢
由此我們斷定,obj.wait()
雖然是native方法,但其內部經歷了釋放鎖、重新搶鎖的兩個大環節
2.2、notify
synchronized (obj) {
obj.notify();
// obj.notifyAll();
}
所有因obj.wait()
阻塞的線程,都要通過notify
來喚醒
notify()
喚醒條件隊列中,隊首節點notifyAll()
喚醒條件隊列中所有節點
三、AQS中的條件隊列(await/signal)
我們初看AQS中的條件隊列時,發現其提供了與JDK條件隊列幾乎一致的功能
JDK | AQS |
---|---|
wait |
await |
notify |
singal |
notifyAll |
singalAll |
用法上也及其相似:
await
:
// 初始化
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
try {
lock.lock();
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
singal
:
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
try {
lock.lock();
condition.signal();
} finally {
lock.unlock();
}
3.1、條件隊列
我們知道在AQS內部維護了一個阻塞隊列,數據結構如下:
上圖描述的是一個長度為 3 的FIFO阻塞隊列,因為頭結點常駐內存,所以不算在內;我們可以發現阻塞隊列中每個節點都包含了前、後引用
那AQS內部的另一個條件隊列又是什麼樣的數據結構呢?
可見,條件隊列為單向列表,只有指向下一個節點的引用;沒有被喚醒的節點全部存儲在條件隊列上。上圖描述的是一個長度為 5 的條件隊列,即有5個線程執行了await()
方法;與阻塞隊列不同,條件隊列沒有常駐內存的「head結點」,且一個處於正常狀態節點的waitStatus
為 -2 。當有新節點加入時,將會追加至隊列尾部
3.2、喚醒
當我們調用signal()
方法時,會發生什麼?我們還是拿長度為 5 的條件隊列舉例說明,在AQS內部會經歷隊列轉移,即由條件隊列轉移至阻塞隊列
而signalAll()
執行時,具體執行流程與signal()
類似,即會將條件隊列中的所有節點全部轉移至阻塞隊列(並發度為1,按順序依次激活)中,依靠阻塞隊列自身依次喚醒的機制,達到激活所有線程的目的
四、JDK vs AQS
經過上文的介紹,似乎AQS做了與wait/notify
相同的功能,相比較而言,甚至JDK的寫法更簡潔;那他們在性能上的表現如何呢?讓我們來做個對比
4.1、對比
我們模擬這樣的一個場景:啟動10個線程,分別調用wait()
方法,當所有線程都進入阻塞後,調用notifyAll()
,10個線程均被喚醒並執行完畢後,方法結束。 上述方法執行10000次,對比JDK與AQS耗時
JDK測試代碼:
public class ConditionCompareTest {
@Test
public void runTest() throws InterruptedException {
long begin = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
if (i % 1000 == 0) {
System.out.println(i);
}
jdkTest();
}
long cost = System.currentTimeMillis() - begin;
System.out.println("耗時: " + cost);
}
public void jdkTest() throws InterruptedException {
Object lock = new Object();
List<Thread> list = Lists.newArrayList();
// 步驟一:啟動10個線程,並進入wait等待
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
try {
synchronized (lock) {
lock.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.start();
list.add(thread);
}
// 步驟二:等待10個線程全部進入wait方法
while (true) {
boolean allWaiting = true;
for (Thread thread : list) {
if (thread.getState() != Thread.State.WAITING) {
allWaiting = false;
break;
}
}
if (allWaiting) {
break;
}
}
// 步驟三:喚醒10個線程
synchronized (lock) {
lock.notifyAll();
}
// 步驟四:等待10個線程全部執行完畢
for (Thread thread : list) {
thread.join();
}
}
}
AQS測試代碼:
public class ConditionCompareTest {
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
@Test
public void runTest() throws InterruptedException {
long begin = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
if (i % 1000 == 0) {
System.out.println(i);
}
aqsTest();
}
long cost = System.currentTimeMillis() - begin;
System.out.println("耗時: " + cost);
}
@Test
public void aqsTest() throws InterruptedException {
AtomicInteger lockedNum = new AtomicInteger();
List<Thread> list = Lists.newArrayList();
// 步驟一:啟動10個線程,並進入wait等待
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
try {
lock.lock();
lockedNum.incrementAndGet();
condition.await();
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.start();
list.add(thread);
}
// 步驟二:等待10個線程全部進入wait方法
while (true) {
if (lockedNum.get() != 10) {
continue;
}
boolean allWaiting = true;
for (Thread thread : list) {
if (thread.getState() != Thread.State.WAITING) {
allWaiting = false;
break;
}
}
if (allWaiting) {
break;
}
}
// 步驟三:喚醒10個線程
lock.lock();
condition.signalAll();
lock.unlock();
// 步驟四:等待10個線程全部執行完畢
for (Thread thread : list) {
thread.join();
}
}
}
條件隊列 | 耗時1 | 耗時2 | 耗時3 | 耗時4 | 耗時5 | 平均耗時(ms) |
---|---|---|---|---|---|---|
JDK |
5000 | 5076 | 5054 | 5089 | 4942 | 5032 |
AQS |
5358 | 5440 | 5444 | 5473 | 5472 | 5437 |
4.2、基準測試Q&A
基於以上的測試我們還是有一些疑問的,不要小看這些疑問,通過這些疑問我們可以把之前的知識點全都串聯起來
- Q:AQS測試中的「步驟二」,為什麼在判斷「等待10個線程全部進入wait方法」時,要引入
lockedNum.get() != 10
的判斷?直接通過判斷所有線程是否均為waiting
方法不可以嗎? - A:如果真的刪除
lockedNum.get() != 10
的判斷,在多次並發測試時,會有較小的概率出現程序死鎖的情況(作者電腦的環境是平均5萬次調用會出現一次),為什麼會出現死鎖呢?我們追AQS源碼就會發現,不管是調用lock()
還是await
,掛起線程使用的方法均為LockSupport.park()
方法,此方法會將線程置為WAITING
狀態,也就是線程狀態是WAITING
狀態時,有可能線程剛進入lock()
方法,從而導致await
與thread.join()
的死鎖 - Q:既然是這樣,為什麼JDK的測試沒有出現死鎖?
- A:我們看到JDK的加鎖是通過
synchronized
關鍵字完成的,而當線程因為等待synchronized
資源而阻塞時,線程狀態將變為BLOCKED
,而進入wait()
方法後,狀態才會變為WAITING
- Q:那看來只有通過引入
AtomicInteger lockedNum
變量才能解決死鎖問題了 - A:其實解決問題的方式有很多種,我們甚至可以簡單將
ReentrantLock lock
置為公平鎖,也能解決上述死鎖問題;因為當前場景發生死鎖的情況是,singalAll()
先於await()
發生,而當所有線程都變成WAITING
狀態後,公平鎖則確保了singalAll()
一定是在所有線程都調用了await()
。但因為synchronized
本身是非公平鎖,故如果AQS使用公平鎖的話,性能偏差較大 - Q:那這樣看來,AQS中的阻塞隊列相對比JDK的沒有優勢可言啊,用法上沒有JDK簡潔,性能上還沒人家快
- A:的確,如果真是只是單純的使用阻塞、喚醒功能的話,還是建議使用JDK內置的方式;但AQS的優勢並不在此
五、再說AQS條件隊列
AQS的優勢在於,其提供了豐富的api可以查詢條件隊列的狀態;例如當我們想看一下在條件隊列中等待節點的個數時,使用JDK的wait/notify
時,是無法做的;AQS提供的api如下:
boolean hasWaiters()
阻塞隊列中是否有等待節點int getWaitQueueLength()
獲取阻塞隊列長度Collection<Thread> getWaitingThreads()
獲取阻塞隊列中線程對象
這些api為程序提供了更靈活的控制,條件隊列對於javaer已不是黑盒;當然使用AQS的條件隊列必然要引入獨佔鎖,例如ReentrantLock
,自然地我們還可以通過它查看條件隊列外圍的一些指標,例如:
Interrupted
響應中斷,藉助獨佔鎖,提供響應中斷能力;wait/notify
不提供,因為雖然wait
方法響應中斷,但是synchronized
關鍵字是會一直阻塞的boolean tryLock()
嘗試獲取鎖;wait/notify
不提供int getHoldCount()
獲取阻塞線程的數量boolean isLocked()
是否持有鎖fair/nonFair
提供公平/非公平鎖...
可見整個AQS體系相比較Object
的wait/notify
方法是相當靈活的,提供了很多監控條件隊列、阻塞隊列的指標
六、致謝
這裡要特別感謝一下神策數據的架構師金滿倉,同時也是我私下的摯友。他功力深厚,對程序有着自己獨到的見地,在整個AQS編寫期間,不厭其煩地給我提供了很多理論及數據上的支持,幫我拓寬視野,再次感謝!