詳解AQS中的condition源碼原理
摘要:condition用於顯式的等待通知,等待過程可以掛起並釋放鎖,喚醒後重新拿到鎖。
本文分享自華為雲社區《AQS中的condition源碼原理詳細分析》,作者:breakDawn。
condition的用法
condition用於顯式的等待通知,等待過程可以掛起並釋放鎖,喚醒後重新拿到鎖。
和直接用lock\unlock去做等待通知的區別在於,lock是不會釋放鎖的,但是利用的condition的await則可以,且喚醒後會自動重新拿回鎖。
Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void conditionWait() throws InterruptedException { lock.lock(); try { // if(xxxx)判斷不滿足條件,等待,釋放鎖 condition.await(); } finally { lock.unlock(); } } public void conditionSignal() throws InterruptedException { lock.lock(); try { // 做完事情了,通知condition上等待的開始搶佔 condition.signal(); } finally { lock.unlock(); } }
也提供了一些支援中斷、支援超時的等待方法
condition 和 object.wait/notify的區別
- object的wait依賴sync, 只能最多有一個等待隊列。 而通過newCondition可以製造多個等待隊列
- wait不支援中斷,而condition支援
- condition支援等待特定時間
condition原理分析
超大原理流程圖
- await(), 簡單來講就是把當前執行緒放入condition的等待隊列中,然後調用LockSupport.park拉起執行緒。如果被其他執行緒通過signal喚醒,則放入同步隊列中競爭鎖,競爭成功則返回,否則繼續競爭。
- signal方法,就是拿到condition的等待隊列頭節點,用cas修改節點狀態,改成功則喚醒執行緒。但有可能被別人搶先,所以需要cas操作。
程式碼結構部分:
Lock提供了newCondition介面給外部鎖調用
而newCondition()返回的Condition是一個介面
這個介面的實現類是ConditionObject,放在AQS抽象類的內部類中
原理實現部分
等待隊列
- 每個condition都有一個屬於自己的等待隊列
- 每次調用condition.await, 就插入到等待隊列尾部
- 等待隊列插入封裝執行緒的節點時不需要在尾部CAS, 因為必須先獲取鎖,才能調用await,因此不用CAS競爭
- 每個Lock只有一個同步隊列(用於lock()時阻塞和競爭用), 但是可能會有多個等待隊列(用於condition的await)
等待過程
- 添加執行緒到condition的等待隊列尾部
- 釋放佔用的鎖,並喚醒同步隊列的後繼節點
- 此時肯定不在aqs的同步隊列中了, 用park方法進入阻塞狀態
- 被喚醒,喚醒時可能是通過sign()被人放入了同步隊列, 也可能是被中斷喚醒,因此要做checkInterruptWhileWaiting檢查看是否繼續, 如果同意繼續,就繼續睡眠,直到進入同步隊列
- 嘗試acquireQueued競爭和搶佔state同步狀態
- 退出前,順帶用unlinkCancelledWaiters清理已經不是CONDITION狀態的等待隊列節點
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 添加本執行緒到等待隊列尾部 Node node = addConditionWaiter(); // 釋放鎖,喚醒同步隊列中的後繼節點 int savedState = fullyRelease(node); int interruptMode = 0; // 如果已經在同步隊列中了,說明被成功sign喚醒 while (!isOnSyncQueue(node)) { // 阻塞掛起 LockSupport.park(this); // 確認是否需要中斷時就退出 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 在同步隊列中,那就按同步隊列的規則在隊列中用CAS競爭同步狀態 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 清理已經不是CONDITION狀態的等待隊列節點 if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
喚醒過程signal()
1.檢查調用signal時,是否當前執行緒獲取了鎖,不是則拋異常
if (!isHeldExclusively()) throw new IllegalMonitorStateException();
2.獲取condition隊列中的第一個等待節點
Node first = firstWaiter; if (first != null) doSignal(first);
3.用CAS清除CONDITION狀態
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false;
4.調用AQS的enq(firstWaitNode),將這個節點放入到同步隊列的隊尾(需要CAS支撐?因為可能是共享的,即使獲取了鎖也需要競爭)
Node p = enq(node);
5.移動入同步隊列成功後(可能經歷了幾次CAS),再用unpark方法喚醒,那個執行緒就進入了上面程式碼中Park之後的部分了
int ws = p.waitStatus; if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) LockSupport.unpark(node.thread);
6.如果是signalAll方法,則等待隊列中每個節點都執行一次signal方法,全部移入同步隊列中並喚醒(喚醒後他們很可能還會因為搶不到資源而阻塞,但隊列位置不同了,也無法再通過sign喚醒了)
do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null);