java高並發編程基礎之AQS
引言
曾經有一道比較比較經典的面試題「你能夠說說java
的並發包下面有哪些常見的類?」大多數人應該都可以說出
CountDownLatch、CyclicBarrier、Sempahore多執行緒並發三大利器。這三大利器都是通過AbstractQueuedSynchronizer
抽象類(下面簡寫AQS)來實現的,所以學習三大利器之前我們有必要先來學習下AQS
。
AQS是一種提供了原子式管理同步狀態、阻塞和喚醒執行緒功能以及隊列模型的簡單框架
AQS結構
說到同步我們如何來保證同步?大家第一印象肯定是加鎖了,說到鎖的話大家肯定首先會想到的是Synchronized。
Synchronized大家應該基本上都會使用,加鎖和釋放鎖都是jvm 來幫我們實現的,我們只需要簡單的加個 Synchronized關鍵字就可以了。
用起來超級方便。但是有沒有一種情況我們設置一個鎖的超時時間Synchronized就有點實現不了,這時候我們就可以用ReentrantLock來實現,ReentrantLock是通過aqs來實現的,今天我們就通過ReentrantLock來學習一下aqs。
CAS && 公平鎖和非公平鎖
AQS裡面用到了大量的CAS學習AQS之前我們還是有必要簡單的先了解下CAS
、公平鎖和非公平鎖。
CAS
- CAS 全稱是 compare and swap,是一種用於在多執行緒環境下實現同步功能的機制。
CAS
操作包含三個操作數 — 記憶體位置、預期數值和新值。CAS
的實現邏輯是將記憶體位置處的數值與預期數值想比較,若相等,則將記憶體位置處的值替換為新值。若不相等,則不做任何操作,這個操作是個原子性操作,java裡面的AtomicInteger
等類都是通過cas來實現的。
公平鎖和非公平鎖
- 公平鎖:多個執行緒按照申請鎖的順序去獲得鎖,執行緒會直接進入隊列去排隊,隊列中第一個才能獲得到鎖。
優點:等待鎖的執行緒不會餓死,每個執行緒都可以獲取到鎖。
缺點:整體吞吐效率相對非公平鎖要低,等待隊列中除第一個執行緒以外的所有執行緒都會阻塞,CPU喚醒阻塞執行緒的開銷比非公平鎖大。 - 非公平鎖:多個執行緒去獲取鎖的時候,會直接去嘗試獲取,獲取不到,再去進入等待隊列,如果能獲取到,就直接獲取到鎖。
優點:可以減少CPU喚醒執行緒的開銷,整體的吞吐效率會高點,CPU也不必取喚醒所有執行緒,會減少喚起執行緒的數量。
缺點:處於等待隊列中的執行緒可能會餓死,或者等很久才會獲得鎖。
文字有點拗口,我們來個實際的例子說明下。比如我們去食堂就餐的時候都要排隊,大家都按照先來後到的順序排隊打飯,這就是公平鎖。如果等到你準備拿盤子打飯的時候
直接蹦出了一個五大三粗的胖子插隊到你前面,你看打不贏他只能忍氣吞聲讓他插隊,等胖子打完飯了又來個小個子也來插你隊,這時候你沒法忍了,直接大吼一聲讓他滾,這個
小個子只能屁顛屁顛到隊尾去排隊了這就是非公平鎖。
我們先來看看AQS有哪些屬性
// 頭結點
private transient volatile Node head;
// 阻塞的尾節點,每個新的節點進來,都插入到最後,也就形成了一個鏈表
private transient volatile Node tail;
// 這個是最重要的,代表當前鎖的狀態,0代表沒有被佔用,大於 0 代表有執行緒持有當前鎖
// 這個值可以大於 1,是因為鎖可以重入,每次重入都加上 1
private volatile int state;
// 代表當前持有獨佔鎖的執行緒,舉個最重要的使用例子,因為鎖可以重入
// reentrantLock.lock()可以嵌套調用多次,所以每次用這個來判斷當前執行緒是否已經擁有了鎖
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer
下面我們來寫一個demo分析下lock 加鎖和釋放鎖的過程
final void lock() {
// 上來先試試直接把狀態置位1,如果此時沒人獲取鎖就直接
if (compareAndSetState(0, 1))
// 爭搶成功則修改獲得鎖狀態的執行緒
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
cas
嘗試失敗,說明已經有人再持有鎖,所以進入acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire
方法,看名字大概能猜出什麼意思,就是試一試。
tryAcquire實際上是調用了父類Sync的nonfairTryAcquire
方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 獲取下當前鎖的狀態
int c = getState();
// 這個if 邏輯跟前面一進來就獲取鎖的邏輯一樣都是通過cas嘗試獲取下鎖
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 進入這個判斷說明 鎖重入了 狀態需要進行+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
// 如果鎖的重入次數大於int的最大值,直接就拋出異常了,正常情況應該不存在這種情況,不過jdk還是嚴謹的
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 返回false 說明嘗試獲取鎖失敗了,失敗了就要進行acquireQueued方法了
return false;
}
tryAcquire
方法如果獲取鎖失敗了,那麼肯定就要排隊等待獲取鎖。排隊的執行緒需要待在哪裡等待獲取鎖?這個就跟我們執行緒池執行任務一樣,執行緒池把任務都封裝成一個work,然後當執行緒處理任務不過來的時候,就把任務放到隊列裡面。AQS同樣也是類似的,把排隊等待獲取鎖的執行緒封裝成一個NODE。然後再把NODE放入到一個隊列裡面。隊列如下所示,不過需要注意一點head是不存NODE的。
接下來我們繼續分析源碼,看下獲取鎖失敗是如何被加入隊列的。
就要執行acquireQueued方法,執行acquireQueued方法之前需要先執行addWaiter方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
// cas 加入隊列隊尾
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尾結點不為空 || cas 加入尾結點失敗
enq(node);
return node;
}
enq
接下來再看看enq方法
// 通過自旋和CAS一定要當前node加入隊尾
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 尾結點為空說明隊列還是空的,還沒有被初始化,所以初始化頭結點,可以看到頭結點的node 是沒有綁定執行緒的也就是不存數據的
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
通過addWaiter方法已經把獲取鎖的執行緒通過封裝成一個NODE加入對列。上述方法的一個執行流程圖如下:
,接下來就是繼續執行acquireQueued方法
acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 通過自旋去獲取鎖 前驅節點==head的時候去嘗試獲取鎖,這個方法在前面已經分析過了。
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 進入這個if說明node的前驅節點不等於head 或者嘗試獲取鎖失敗了
// 判斷是否需要掛起當前執行緒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 異常情況進入cancelAcquire,在jdk11的時候這個源碼直接是catch (Throwable e){ cancelAcquire(node);} 簡單明了
if (failed)
cancelAcquire(node);
}
}
setHead
這個方法每當有一個node獲取到鎖了,就把當前node
節點設置為頭節點,可以簡單的看做當前節點獲取到鎖了就把當前節點」移除「(變為頭結點)隊列。
shouldParkAfterFailedAcquire
說到這個方法我們就要先看下NODE可能會有哪些狀態在源碼裡面我們可以看到總共會有四種狀態
- CANCELLED:值為1,在同步隊列中等待的執行緒等待超時或被中斷,需要從同步隊列中取消該Node的結點,其結點的waitStatus為CANCELLED,即結束狀態,進入該狀態後的結點將不會再變化。
- SIGNAL:值為-1,被標識為該等待喚醒狀態的後繼結點,當其前繼結點的執行緒釋放了同步鎖或被取消,將會通知該後繼結點的執行緒執行。說白了,就是處於喚醒狀態,只要前繼結點釋放鎖,就會通知標識為SIGNAL狀態的後繼結點的執行緒執行。
- CONDITION:值為-2,與Condition相關,該標識的結點處於等待隊列中,結點的執行緒等待在Condition上,當其他執行緒調用了Condition的signal()方法後,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。
- PROPAGATE:值為-3,與共享模式相關,在共享模式中,該狀態標識結點的執行緒處於可運行狀態。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驅節點狀態 如果這個狀態為-1 則返回true,把當前執行緒掛起
if (ws == Node.SIGNAL)
return true;
// 大於0,說明狀態為CANCELLED
if (ws > 0) {
do {
// 刪除被取消的node(讓被取消的node成為一個沒有引用的node等著下次GC被回收)
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 進入這裡只能是 0,-2,-3。NODE節點初始化的時候waitStatus默認值是0,所以只有這裡才有修改waitStatus的地方
// 通過cas 把前驅節點的狀態設置為-1,然後返回false ,外面調用這個方法的是個循環,又會調用一次這個方法
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt
掛起當前執行緒,並且阻塞
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 掛起當前執行緒,阻塞
return Thread.interrupted();
}
解鎖
加鎖成功了,那鎖用完了就應該釋放鎖了,釋放鎖重點看下unparkSuccessor這個方法就好了
private void unparkSuccessor(Node node) {
// 頭結點狀態
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// s==null head的successor節點獲取鎖成功後,執行了head.next=null的操作後,解鎖執行緒讀取了head.next,因此s==null
// head的successor節點被取消(cancelAcquire)時,執行了如下操作:successor.waitStatus=1 ; successor.next = successor;
if (s == null || s.waitStatus > 0) {
s = null;
// 從尾節點開始往前找,找到最前面的非取消的節點 這裡沒有break 哦
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 喚醒執行緒 ,喚醒的執行緒會從acquireQueued去獲取鎖
LockSupport.unpark(s.thread);
}
釋放鎖程式碼比較簡單,基本都寫在程式碼注釋裡面了,流程如下:
這段程式碼裡面有一個比較經典的面試題:
如果頭結點的下一個節點為空或者頭結點的下一個節點的狀態為取消的時候為什麼要從後往前找,找到最前面非取消的節點?
- node.prev = pred; compareAndSetTail(pred, node) 這兩個地方可以看作Tail入隊的原子操作,但是此時pred.next = node;還沒執行,如果這個時候執行了unparkSuccessor方法,就沒辦法從前往後找了,所以需要從後往前找。
- 在產生CANCELLED狀態節點的時候,先斷開的是Next指針,Prev指針並未斷開,因此也是必須要從後往前遍歷才能夠遍歷完全部的Node
總結
- reentrantLock的獲取鎖和釋放鎖基本就講完了,裡面還涉及多比較多的細節,感興趣的同學可以對著源碼一行一行去debug試試。
- 適當的了解aqs才能更好的學習CountDownLatch、CyclicBarrier、Sempahore,因為這三個利器都是基於aqs來實現的。
結束
- 由於自己才疏學淺,難免會有紕漏,假如你發現了錯誤的地方,還望留言給我指出來,我會對其加以修正。
- 如果你覺得文章還不錯,你的轉發、分享、讚賞、點贊、留言就是對我最大的鼓勵。
- 感謝您的閱讀,十分歡迎並感謝您的關注。
站在巨人的肩膀上摘蘋果:
//tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
//javadoop.com/post/AbstractQueuedSynchronizer
//www.cnblogs.com/yanlong300/p/10953185.html