詳解AQS中的condition源碼原理

摘要:condition用於顯式的等待通知,等待過程可以掛起並釋放鎖,喚醒後重新拿到鎖。

本文分享自華為雲社區《AQS中的condition源碼原理詳細分析》,作者:breakDawn。

condition的用法

condition用於顯式的等待通知,等待過程可以掛起並釋放鎖,喚醒後重新拿到鎖。

和直接用lock\unlock去做等待通知的區別在於,lock是不會釋放鎖的,但是利用的condition的await則可以,且喚醒後會自動重新拿回鎖。

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
 lock.lock();
 try {
 // if(xxxx)判斷不滿足條件,等待,釋放鎖
 condition.await();
 } finally {
 lock.unlock();
 }
}
public void conditionSignal() throws InterruptedException {
 lock.lock();
 try {
 // 做完事情了,通知condition上等待的開始搶佔
 condition.signal();
 } finally {
 lock.unlock();
 }
}

也提供了一些支援中斷、支援超時的等待方法

condition 和 object.wait/notify的區別

  1. object的wait依賴sync, 只能最多有一個等待隊列。 而通過newCondition可以製造多個等待隊列
  2. wait不支援中斷,而condition支援
  3. condition支援等待特定時間

condition原理分析

超大原理流程圖

  • await(), 簡單來講就是把當前執行緒放入condition的等待隊列中,然後調用LockSupport.park拉起執行緒。如果被其他執行緒通過signal喚醒,則放入同步隊列中競爭鎖,競爭成功則返回,否則繼續競爭。
  • signal方法,就是拿到condition的等待隊列頭節點,用cas修改節點狀態,改成功則喚醒執行緒。但有可能被別人搶先,所以需要cas操作。

程式碼結構部分:

​ Lock提供了newCondition介面給外部鎖調用

​ 而newCondition()返回的Condition是一個介面

​ 這個介面的實現類是ConditionObject,放在AQS抽象類的內部類中

原理實現部分

等待隊列

  • 每個condition都有一個屬於自己的等待隊列
  • 每次調用condition.await, 就插入到等待隊列尾部
  • 等待隊列插入封裝執行緒的節點時不需要在尾部CAS, 因為必須先獲取鎖,才能調用await,因此不用CAS競爭
  • 每個Lock只有一個同步隊列(用於lock()時阻塞和競爭用), 但是可能會有多個等待隊列(用於condition的await)

等待過程

  1. 添加執行緒到condition的等待隊列尾部
  2. 釋放佔用的鎖,並喚醒同步隊列的後繼節點
  3. 此時肯定不在aqs的同步隊列中了, 用park方法進入阻塞狀態
  4. 被喚醒,喚醒時可能是通過sign()被人放入了同步隊列, 也可能是被中斷喚醒,因此要做checkInterruptWhileWaiting檢查看是否繼續, 如果同意繼續,就繼續睡眠,直到進入同步隊列
  5. 嘗試acquireQueued競爭和搶佔state同步狀態
  6. 退出前,順帶用unlinkCancelledWaiters清理已經不是CONDITION狀態的等待隊列節點
public final void await() throws InterruptedException {
 if (Thread.interrupted())
 throw new InterruptedException();
 // 添加本執行緒到等待隊列尾部
 Node node = addConditionWaiter();
 // 釋放鎖,喚醒同步隊列中的後繼節點
 int savedState = fullyRelease(node);
 int interruptMode = 0;
 // 如果已經在同步隊列中了,說明被成功sign喚醒
 while (!isOnSyncQueue(node)) {
 // 阻塞掛起
 LockSupport.park(this);
 // 確認是否需要中斷時就退出
 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
 break;
 }
 // 在同步隊列中,那就按同步隊列的規則在隊列中用CAS競爭同步狀態
 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
 interruptMode = REINTERRUPT;
 // 清理已經不是CONDITION狀態的等待隊列節點
 if (node.nextWaiter != null) 
 unlinkCancelledWaiters();
 if (interruptMode != 0)
 reportInterruptAfterWait(interruptMode);
}

喚醒過程signal()

1.檢查調用signal時,是否當前執行緒獲取了鎖,不是則拋異常

if (!isHeldExclusively())
 throw new IllegalMonitorStateException();

2.獲取condition隊列中的第一個等待節點

Node first = firstWaiter;
if (first != null)
 doSignal(first);

3.用CAS清除CONDITION狀態

if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
 return false;

4.調用AQS的enq(firstWaitNode),將這個節點放入到同步隊列的隊尾(需要CAS支撐?因為可能是共享的,即使獲取了鎖也需要競爭)

Node p = enq(node);

5.移動入同步隊列成功後(可能經歷了幾次CAS),再用unpark方法喚醒,那個執行緒就進入了上面程式碼中Park之後的部分了

int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
 LockSupport.unpark(node.thread);

6.如果是signalAll方法,則等待隊列中每個節點都執行一次signal方法,全部移入同步隊列中並喚醒(喚醒後他們很可能還會因為搶不到資源而阻塞,但隊列位置不同了,也無法再通過sign喚醒了)

do {
 Node next = first.nextWaiter;
 first.nextWaiter = null;
 transferForSignal(first);
    first = next;
} while (first != null);

 

點擊關注,第一時間了解華為雲新鮮技術~