AQS中的公平鎖與非公平鎖,Condtion
- 2019 年 10 月 7 日
- 筆記
AQS中的公平鎖與非公平鎖,Condtion
一行一行源碼分析清楚 AbstractQueuedSynchronizer (二)
轉自https://www.javadoop.com/post/AbstractQueuedSynchronizer-2
文章比較長,資訊量比較大,建議在 pc 上閱讀。文章標題是為了呼應前文,其實可以單獨成文的,主要是希望讀者看文章能系統看。
本文關注以下幾點內容:
- 深入理解 ReentrantLock 公平鎖和非公平鎖的區別
- 深入分析 AbstractQueuedSynchronizer 中的 ConditionObject
- 深入理解 java 執行緒中斷和 InterruptedException 異常
基本上本文把以上幾點都說清楚了,我假設讀者看過上一篇文章中對 AbstractQueuedSynchronizer 的介紹 ,當然如果你已經熟悉 AQS 中的獨佔鎖了,那也可以直接看這篇。各小節之間基本上沒什麼關係,大家可以只關注自己感興趣的部分。
- 公平鎖和非公平鎖
- Condition
- 1. 將節點加入到條件隊列
- 2. 完全釋放獨佔鎖
- 3. 等待進入阻塞隊列
- 4. signal 喚醒執行緒,轉移到阻塞隊列
- 5. 喚醒後檢查中斷狀態
- 6. 獲取獨佔鎖
- 7. 處理中斷狀態
- * 帶超時機制的 await
- * 不拋出 InterruptedException 的 await
- AbstractQueuedSynchronizer 獨佔鎖的取消排隊
- 再說 java 執行緒中斷和 InterruptedException 異常
- 執行緒中斷
- InterruptedException 概述
- 處理中斷
- 總結
公平鎖和非公平鎖
ReentrantLock 默認採用非公平鎖,除非你在構造方法中傳入參數 true 。
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
公平鎖的 lock 方法:
static final class FairSync extends Sync { final void lock() { acquire(1); } // AbstractQueuedSynchronizer.acquire(int arg) public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 1. 和非公平鎖相比,這裡多了一個判斷:是否有執行緒在等待 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
非公平鎖的 lock 方法:
static final class NonfairSync extends Sync { final void lock() { // 2. 和公平鎖相比,這裡會直接先進行一次CAS,成功就返回了 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } // AbstractQueuedSynchronizer.acquire(int arg) public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
總結:公平鎖和非公平鎖只有兩處不同:
- 非公平鎖在調用 lock 後,首先就會調用 CAS 進行一次搶鎖,如果這個時候恰巧鎖沒有被佔用,那麼直接就獲取到鎖返回了。
- 非公平鎖在 CAS 失敗後,和公平鎖一樣都會進入到 tryAcquire 方法,在 tryAcquire 方法中,如果發現鎖這個時候被釋放了(state == 0),非公平鎖會直接 CAS 搶鎖,但是公平鎖會判斷等待隊列是否有執行緒處於等待狀態,如果有則不去搶鎖,乖乖排到後面。
公平鎖和非公平鎖就這兩點區別,如果這兩次 CAS 都不成功,那麼後面非公平鎖和公平鎖是一樣的,都要進入到阻塞隊列等待喚醒。
相對來說,非公平鎖會有更好的性能,因為它的吞吐量比較大。當然,非公平鎖讓獲取鎖的時間變得更加不確定,可能會導致在阻塞隊列中的執行緒長期處於飢餓狀態。
Condition
Tips: 這裡重申一下,要看懂這個,必須要先看懂上一篇關於 AbstractQueuedSynchronizer 的介紹,或者你已經有相關的知識了,否則這節肯定是看不懂的。
我們先來看看 Condition 的使用場景,Condition 經常可以用在生產者-消費者的場景中,請看 Doug Lea 給出的這個例子:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class BoundedBuffer { final Lock lock = new ReentrantLock(); // condition 依賴於 lock 來產生 final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; // 生產 public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); // 隊列已滿,等待,直到 not full 才能繼續生產 items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); // 生產成功,隊列已經 not empty 了,發個通知出去 } finally { lock.unlock(); } } // 消費 public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); // 隊列為空,等待,直到隊列 not empty,才能繼續消費 Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); // 被我消費掉一個,隊列 not full 了,發個通知出去 return x; } finally { lock.unlock(); } } }
(ArrayBlockingQueue 採用這種方式實現了生產者-消費者,所以請只把這個例子當做學習例子,實際生產中可以直接使用 ArrayBlockingQueue)
我們常用 obj.wait(),obj.notify() 或 obj.notifyAll() 來實現相似的功能,但是,它們是基於對象的監視器鎖的。需要深入了解這幾個方法的讀者,可以參考我的另一篇文章《深入分析 java 8 程式語言規範:Threads and Locks》。而這裡說的 Condition 是基於 ReentrantLock 實現的,而 ReentrantLock 是依賴於 AbstractQueuedSynchronizer 實現的。
在往下看之前,讀者心裡要有一個整體的概念。condition 是依賴於 ReentrantLock 的,不管是調用 await 進入等待還是 signal 喚醒,都必須獲取到鎖才能進行操作。
每個 ReentrantLock 實例可以通過調用多次 newCondition 產生多個 ConditionObject 的實例:
final ConditionObject newCondition() { return new ConditionObject(); }
我們首先來看下我們關注的 Condition 的實現類 AbstractQueuedSynchronizer
類中的 ConditionObject
。
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; // 條件隊列的第一個節點 // 不要管這裡的關鍵字 transient,是不參與序列化的意思 private transient Node firstWaiter; // 條件隊列的最後一個節點 private transient Node lastWaiter; ......
在上一篇介紹 AQS 的時候,我們有一個阻塞隊列,用於保存等待獲取鎖的執行緒的隊列。這裡我們引入另一個概念,叫條件隊列(condition queue),我畫了一張簡單的圖用來說明這個。
這裡的阻塞隊列如果叫做同步隊列(sync queue)其實比較貼切,不過為了和前篇呼應,我就繼續使用阻塞隊列了。記住這裡的兩個概念,阻塞隊列和條件隊列。

這裡,我們簡單回顧下 Node 的屬性:
volatile int waitStatus; // 可取值 0、CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)volatile Node prev;volatile Node next;volatile Thread thread;Node nextWaiter;
prev 和 next 用於實現阻塞隊列的雙向鏈表,nextWaiter 用於實現條件隊列的單向鏈表
基本上,把這張圖看懂,你也就知道 condition 的處理流程了。所以,我先簡單解釋下這圖,然後再具體地解釋程式碼實現。
- 我們知道一個 ReentrantLock 實例可以通過多次調用 newCondition() 來產生多個 Condition 實例,這裡對應 condition1 和 condition2。注意,ConditionObject 只有兩個屬性 firstWaiter 和 lastWaiter;
- 每個 condition 有一個關聯的條件隊列,如執行緒 1 調用 condition1.await() 方法即可將當前執行緒 1 包裝成 Node 後加入到條件隊列中,然後阻塞在這裡,不繼續往下執行,條件隊列是一個單向鏈表;
- 調用 condition1.signal() 會將condition1 對應的條件隊列的 firstWaiter 移到阻塞隊列的隊尾,等待獲取鎖,獲取鎖後 await 方法返回,繼續往下執行。
我這裡說的 1、2、3 是最簡單的流程,沒有考慮中斷、signalAll、還有帶有超時參數的 await 方法等,不過把這裡弄懂是這節的主要目的。
同時,從圖中也可以很直觀地看出,哪些操作是執行緒安全的,哪些操作是執行緒不安全的。
這個圖看懂後,下面的程式碼分析就簡單了。
接下來,我們一步步按照流程來走程式碼分析,我們先來看看 wait 方法:
// 首先,這個方法是可被中斷的,不可被中斷的是另一個方法 awaitUninterruptibly() // 這個方法會阻塞,直到調用 signal 方法(指 signal() 和 signalAll(),下同),或被中斷 public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 添加到 condition 的條件隊列中 Node node = addConditionWaiter(); // 釋放鎖,返回值是釋放鎖之前的 state 值 int savedState = fullyRelease(node); int interruptMode = 0; // 這裡退出循環有兩種情況,之後再仔細分析 // 1. isOnSyncQueue(node) 返回 true,即當前 node 已經轉移到阻塞隊列了 // 2. checkInterruptWhileWaiting(node) != 0 會到 break,然後退出循環,代表的是執行緒中斷 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 被喚醒後,將進入阻塞隊列,等待獲取鎖 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
其實,我大體上也把整個 await 過程說得十之八九了,下面我們分步把上面的幾個點用源碼說清楚。
1. 將節點加入到條件隊列
addConditionWaiter() 是將當前節點加入到條件隊列,看圖我們知道,這種條件隊列內的操作是執行緒安全的。
// 將當前執行緒對應的節點入隊,插入隊尾 private Node addConditionWaiter() { Node t = lastWaiter; // 如果條件隊列的最後一個節點取消了,將其清除出去 if (t != null && t.waitStatus != Node.CONDITION) { // 這個方法會遍歷整個條件隊列,然後會將已取消的所有節點清除出隊列 unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); // 如果隊列為空 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
在addWaiter 方法中,有一個 unlinkCancelledWaiters() 方法,該方法用於清除隊列中已經取消等待的節點。
當 await 的時候如果發生了取消操作(這點之後會說),或者是在節點入隊的時候,發現最後一個節點是被取消的,會調用一次這個方法。
// 等待隊列是一個單向鏈表,遍歷鏈表將已經取消等待的節點清除出去 // 純屬鏈表操作,很好理解,看不懂多看幾遍就可以了 private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; // 如果節點的狀態不是 Node.CONDITION 的話,這個節點就是被取消的 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
2. 完全釋放獨佔鎖
回到 wait 方法,節點入隊了以後,會調用 intsavedState=fullyRelease(node);
方法釋放鎖,注意,這裡是完全釋放獨佔鎖,因為 ReentrantLock 是可以重入的。
// 首先,我們要先觀察到返回值 savedState 代表 release 之前的 state 值 // 對於最簡單的操作:先 lock.lock(),然後 condition1.await()。 // 那麼 state 經過這個方法由 1 變為 0,鎖釋放,此方法返回 1 // 相應的,如果 lock 重入了 n 次,savedState == n // 如果這個方法失敗,會將節點設置為"取消"狀態,並拋出異常 IllegalMonitorStateException final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); // 這裡使用了當前的 state 作為 release 的參數,也就是完全釋放掉鎖,將 state 置為 0 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
3. 等待進入阻塞隊列
釋放掉鎖以後,接下來是這段,這邊會自旋,如果發現自己還沒到阻塞隊列,那麼掛起,等待被轉移到阻塞隊列。
int interruptMode = 0; while (!isOnSyncQueue(node)) { // 執行緒掛起 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
isOnSyncQueue(Node node) 用於判斷節點是否已經轉移到阻塞隊列了:
// 在節點入條件隊列的時候,初始化時設置了 waitStatus = Node.CONDITION // 前面我提到,signal 的時候需要將節點從條件隊列移到阻塞隊列, // 這個方法就是判斷 node 是否已經移動到阻塞隊列了 final boolean isOnSyncQueue(Node node) { // 移動過去的時候,node 的 waitStatus 會置為 0,這個之後在說 signal 方法的時候會說到 // 如果 waitStatus 還是 Node.CONDITION,也就是 -2,那肯定就是還在條件隊列中 // 如果 node 的前驅 prev 指向還是 null,說明肯定沒有在 阻塞隊列 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 如果 node 已經有後繼節點 next 的時候,那肯定是在阻塞隊列了 if (node.next != null) return true; // 這個方法從阻塞隊列的隊尾開始從後往前遍歷找,如果找到相等的,說明在阻塞隊列,否則就是不在阻塞隊列 // 可以通過判斷 node.prev() != null 來推斷出 node 在阻塞隊列嗎?答案是:不能。 // 這個可以看上篇 AQS 的入隊方法,首先設置的是 node.prev 指向 tail, // 然後是 CAS 操作將自己設置為新的 tail,可是這次的 CAS 是可能失敗的。 // 調用這個方法的時候,往往我們需要的就在隊尾的部分,所以一般都不需要完全遍歷整個隊列的 return findNodeFromTail(node); } // 從同步隊列的隊尾往前遍歷,如果找到,返回 true private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
回到前面的循環,isOnSyncQueue(node) 返回 false 的話,那麼進到 LockSupport.park(this);
這裡執行緒掛起。
4. signal 喚醒執行緒,轉移到阻塞隊列
為了大家理解,這裡我們先看喚醒操作,因為剛剛到 LockSupport.park(this); 把執行緒掛起了,等待喚醒。
喚醒操作通常由另一個執行緒來操作,就像生產者-消費者模式中,如果執行緒因為等待消費而掛起,那麼當生產者生產了一個東西後,會調用 signal 喚醒正在等待的執行緒來消費。
// 喚醒等待了最久的執行緒 // 其實就是,將這個執行緒對應的 node 從條件隊列轉移到阻塞隊列 public final void signal() { // 調用 signal 方法的執行緒必須持有當前的獨佔鎖 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } // 從條件隊列隊頭往後遍歷,找出第一個需要轉移的 node // 因為前面我們說過,有些執行緒會取消排隊,但是還在隊列中 private void doSignal(Node first) { do { // 將 firstWaiter 指向 first 節點後面的第一個 // 如果將隊頭移除後,後面沒有節點在等待了,那麼需要將 lastWaiter 置為 null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 因為 first 馬上要被移到阻塞隊列了,和條件隊列的鏈接關係在這裡斷掉 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); // 這裡 while 循環,如果 first 轉移不成功,那麼選擇 first 後面的第一個節點進行轉移,依此類推 } // 將節點從條件隊列轉移到阻塞隊列 // true 代表成功轉移 // false 代表在 signal 之前,節點已經取消了 final boolean transferForSignal(Node node) { // CAS 如果失敗,說明此 node 的 waitStatus 已不是 Node.CONDITION,說明節點已經取消, // 既然已經取消,也就不需要轉移了,方法返回,轉移後面一個節點 // 否則,將 waitStatus 置為 0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // enq(node): 自旋進入阻塞隊列的隊尾 // 注意,這裡的返回值 p 是 node 在阻塞隊列的前驅節點 Node p = enq(node); int ws = p.waitStatus; // ws > 0 說明 node 在阻塞隊列中的前驅節點取消了等待鎖,直接喚醒 node 對應的執行緒。喚醒之後會怎麼樣,後面再解釋 // 如果 ws <= 0, 那麼 compareAndSetWaitStatus 將會被調用,上篇介紹的時候說過,節點入隊後,需要把前驅節點的狀態設為 Node.SIGNAL(-1) if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果前驅節點取消或者 CAS 失敗,會進到這裡喚醒執行緒,之後的操作看下一節 LockSupport.unpark(node.thread); return true; }
正常情況下, ws>0||!compareAndSetWaitStatus(p,ws,Node.SIGNAL)
這句中,ws <= 0,而且 compareAndSetWaitStatus(p, ws, Node.SIGNAL) 會返回 true,所以一般也不會進去 if 語句塊中喚醒 node 對應的執行緒。然後這個方法返回 true,也就意味著 signal 方法結束了,節點進入了阻塞隊列。
假設發生了阻塞隊列中的前驅節點取消等待,或者 CAS 失敗,只要喚醒執行緒,讓其進到下一步即可。
5. 喚醒後檢查中斷狀態
上一步 signal 之後,我們的執行緒由條件隊列轉移到了阻塞隊列,之後就準備獲取鎖了。只要重新獲取到鎖了以後,繼續往下執行。
等執行緒從掛起中恢復過來,繼續往下看
int interruptMode = 0; while (!isOnSyncQueue(node)) { // 執行緒掛起 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
先解釋下 interruptMode。interruptMode 可以取值為 REINTERRUPT(1),THROW_IE(-1),0
- REINTERRUPT: 代表 await 返回的時候,需要重新設置中斷狀態
- THROW_IE: 代表 await 返回的時候,需要拋出 InterruptedException 異常
- 0 :說明在 await 期間,沒有發生中斷
有以下三種情況會讓 LockSupport.park(this); 這句返回繼續往下執行:
- 常規路勁。signal -> 轉移節點到阻塞隊列 -> 獲取了鎖(unpark)
- 執行緒中斷。在 park 的時候,另外一個執行緒對這個執行緒進行了中斷
- signal 的時候我們說過,轉移以後的前驅節點取消了,或者對前驅節點的CAS操作失敗了
- 假喚醒。這個也是存在的,和 Object.wait() 類似,都有這個問題
執行緒喚醒後第一步是調用 checkInterruptWhileWaiting(node) 這個方法,此方法用於判斷是否在執行緒掛起期間發生了中斷,如果發生了中斷,是 signal 調用之前中斷的,還是 signal 之後發生的中斷。
// 1. 如果在 signal 之前已經中斷,返回 THROW_IE // 2. 如果是 signal 之後中斷,返回 REINTERRUPT // 3. 沒有發生中斷,返回 0 private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
Thread.interrupted():如果當前執行緒已經處於中斷狀態,那麼該方法返回 true,同時將中斷狀態重置為 false,所以,才有後續的
重新中斷(REINTERRUPT)
的使用。
看看怎麼判斷是 signal 之前還是之後發生的中斷:
// 只有執行緒處於中斷狀態,才會調用此方法 // 如果需要的話,將這個已經取消等待的節點轉移到阻塞隊列 // 返回 true:如果此執行緒在 signal 之前被取消, final boolean transferAfterCancelledWait(Node node) { // 用 CAS 將節點狀態設置為 0 // 如果這步 CAS 成功,說明是 signal 方法之前發生的中斷,因為如果 signal 先發生的話,signal 中會將 waitStatus 設置為 0 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { // 將節點放入阻塞隊列 // 這裡我們看到,即使中斷了,依然會轉移到阻塞隊列 enq(node); return true; } // 到這裡是因為 CAS 失敗,肯定是因為 signal 方法已經將 waitStatus 設置為了 0 // signal 方法會將節點轉移到阻塞隊列,但是可能還沒完成,這邊自旋等待其完成 // 當然,這種事情還是比較少的吧:signal 調用之後,沒完成轉移之前,發生了中斷 while (!isOnSyncQueue(node)) Thread.yield(); return false; }
這裡再說一遍,即使發生了中斷,節點依然會轉移到阻塞隊列。
到這裡,大家應該都知道這個 while 循環怎麼退出了吧。要麼中斷,要麼轉移成功。
6. 獲取獨佔鎖
while 循環出來以後,下面是這段程式碼:
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;
由於 while 出來後,我們確定節點已經進入了阻塞隊列,準備獲取鎖。
這裡的 acquireQueued(node, savedState) 的第一個參數 node 之前已經經過 enq(node) 進入了隊列,參數 savedState 是之前釋放鎖前的 state,這個方法返回的時候,代表當前執行緒獲取了鎖,而且 state == savedState了。
注意,前面我們說過,不管有沒有發生中斷,都會進入到阻塞隊列,而 acquireQueued(node, savedState) 的返回值就是代表執行緒是否被中斷。如果返回 true,說明被中斷了,而且 interruptMode != THROW_IE,說明在 signal 之前就發生中斷了,這裡將 interruptMode 設置為 REINTERRUPT,用於待會重新中斷。
繼續往下:
if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode);
本著一絲不苟的精神,這邊說說 node.nextWaiter!=null
怎麼滿足。我前面也說了 signal 的時候會將節點轉移到阻塞隊列,有一步是 node.nextWaiter = null,將斷開節點和條件隊列的聯繫。
可是, 在判斷發生中斷的情況下,是signal之前還是之後發生的?
這部分的時候,我也介紹了,如果 signal 之前就中斷了,也需要將節點進行轉移到阻塞隊列,這部分轉移的時候,是沒有設置 node.nextWaiter = null 的。
之前我們說過,如果有節點取消,也會調用 unlinkCancelledWaiters 這個方法,就是這裡了。
7. 處理中斷狀態
到這裡,我們終於可以好好說下這個 interruptMode 幹嘛用了。
- 0:什麼都不做。
- THROW_IE:await 方法拋出 InterruptedException 異常
- REINTERRUPT:重新中斷當前執行緒
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
為什麼這麼處理?這部分的知識在本文的最後一節
* 帶超時機制的 await
經過前面的 7 步,整個 ConditionObject 類基本上都分析完了,接下來簡單分析下帶超時機制的 await 方法。
public final long awaitNanos(long nanosTimeout) throws InterruptedException public final boolean awaitUntil(Date deadline) throws InterruptedException public final boolean await(long time, TimeUnit unit) throws InterruptedException
這三個方法都差不多,我們就挑一個出來看看吧:
public final boolean await(long time, TimeUnit unit) throws InterruptedException { // 等待這麼多納秒 long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); // 當前時間 + 等待時長 = 過期時間 final long deadline = System.nanoTime() + nanosTimeout; // 用於返回 await 是否超時 boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { // 時間到啦 if (nanosTimeout <= 0L) { // 這裡因為要 break 取消等待了。取消等待的話一定要調用 transferAfterCancelledWait(node) 這個方法 // 如果這個方法返回 true,在這個方法內,將節點轉移到阻塞隊列成功 // 返回 false 的話,說明 signal 已經發生,signal 方法將節點轉移了。也就是說沒有超時嘛 timedout = transferAfterCancelledWait(node); break; } // spinForTimeoutThreshold 的值是 1000 納秒,也就是 1 毫秒 // 也就是說,如果不到 1 毫秒了,那就不要選擇 parkNanos 了,自旋的性能反而更好 if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; // 得到剩餘時間 nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
超時的思路還是很簡單的,不帶超時參數的 await 是 park,然後等待別人喚醒。而現在就是調用 parkNanos 方法來休眠指定的時間,醒來後判斷是否 signal 調用了,調用了就是沒有超時,否則就是超時了。超時的話,自己來進行轉移到阻塞隊列,然後搶鎖。
* 不拋出 InterruptedException 的 await
關於 Condition 最後一小節了。
public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
很簡單,我就不廢話了。
AbstractQueuedSynchronizer 獨佔鎖的取消排隊
這篇文章說的是 AbstractQueuedSynchronizer,只不過好像 Condition 說太多了,趕緊把思路拉回來。
接下來,我想說說怎麼取消對鎖的競爭?
上篇文章提到過,最重要的方法是這個,我們要在這裡面找答案:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
首先,到這個方法的時候,節點一定是入隊成功的。
我把 parkAndCheckInterrupt() 程式碼貼過來:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
這兩段程式碼聯繫起來看,是不是就清楚了。
如果我們要取消一個執行緒的排隊,我們需要在另外一個執行緒中對其進行中斷。比如某執行緒調用 lock() 老久不返回,我想中斷它。一旦對其進行中斷,此執行緒會從 LockSupport.park(this);
中喚醒,然後 Thread.interrupted();
返回 true。
我們發現一個問題,即使是中斷喚醒了這個執行緒,也就只是設置了 interrupted=true
然後繼續下一次循環。而且,由於 Thread.interrupted();
會清除中斷狀態,第二次進 parkAndCheckInterrupt 的時候,返回會是 false。
所以,我們要看到,在這個方法中,interrupted 只是用來記錄是否發生了中斷,然後用於方法返回值,其他沒有做任何相關事情。
所以,我們看外層方法怎麼處理 acquireQueued 返回 false 的情況。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } static void selfInterrupt() { Thread.currentThread().interrupt(); }
所以說,lock() 方法處理中斷的方法就是,你中斷歸中斷,我搶鎖還是照樣搶鎖,幾乎沒關係,只是我搶到鎖了以後,設置執行緒的中斷狀態而已,也不拋出任何異常出來。調用者獲取鎖後,可以去檢查是否發生過中斷,也可以不理會。
來條分割線。有沒有被騙的感覺,我說了一大堆,可是和取消沒有任何關係啊。
我們來看 ReentrantLock 的另一個 lock 方法:
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
方法上多了個 throwsInterruptedException
,經過前面那麼多知識的鋪墊,這裡我就不再啰里啰嗦了。
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
繼續往裡:
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 就是這裡了,一旦異常,馬上結束這個方法,拋出異常。 // 這裡不再只是標記這個方法的返回值代表中斷狀態 // 而是直接拋出異常,而且外層也不捕獲,一直往外拋到 lockInterruptibly throw new InterruptedException(); } } finally { // 如果通過 InterruptedException 異常出去,那麼 failed 就是 true 了 if (failed) cancelAcquire(node); } }
既然到這裡了,順便說說 cancelAcquire 這個方法吧:
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }
到這裡,我想我應該把取消排隊這件事說清楚了吧。
再說 java 執行緒中斷和 InterruptedException 異常
在之前的文章中,我們接觸了大量的中斷,這邊算是個總結吧。如果你完全熟悉中斷了,沒有必要再看這節,本節為新手而寫。
執行緒中斷
首先,我們要明白,中斷不是類似 linux 裡面的命令 kill -9 pid,不是說我們中斷某個執行緒,這個執行緒就停止運行了。中斷代表執行緒狀態,每個執行緒都關聯了一個中斷狀態,是一個 true 或 false 的 boolean 值,初始值為 false。
關於中斷狀態,我們需要重點關注以下幾個方法:
// Thread 類中的實例方法,持有執行緒實例引用即可檢測執行緒中斷狀態 public boolean isInterrupted() {} // Thread 中的靜態方法,檢測調用這個方法的執行緒是否已經中斷 // 注意:這個方法返回中斷狀態的同時,會將此執行緒的中斷狀態重置為 false // 所以,如果我們連續調用兩次這個方法的話,第二次的返回值肯定就是 false 了 public static boolean interrupted() {} // Thread 類中的實例方法,用於設置一個執行緒的中斷狀態為 true public void interrupt() {}
我們說中斷一個執行緒,其實就是設置了執行緒的 interrupted status 為 true,至於說被中斷的執行緒怎麼處理這個狀態,那是那個執行緒自己的事。如以下程式碼:
while (!Thread.interrupted()) { doWork(); System.out.println("我做完一件事了,準備做下一件,如果沒有其他執行緒中斷我的話"); }
當然,中斷除了是執行緒狀態外,還有其他含義,否則也不需要專門搞一個這個概念出來了。
如果執行緒處於以下三種情況,那麼當執行緒被中斷的時候,能自動感知到:
- 來自 Object 類的 wait()、wait(long)、wait(long, int), 來自 Thread 類的 join()、join(long)、join(long, int)、sleep(long)、sleep(long, int) 這幾個方法的相同之處是,方法上都有: throws InterruptedException 如果執行緒阻塞在這些方法上(我們知道,這些方法會讓當前執行緒阻塞),這個時候如果其他執行緒對這個執行緒進行了中斷,那麼這個執行緒會從這些方法中立即返回,拋出 InterruptedException 異常,同時重置中斷狀態為 false。
- 實現了 InterruptibleChannel 介面的類中的一些 I/O 阻塞操作,如 DatagramChannel 中的 connect 方法和 receive 方法等 如果執行緒阻塞在這裡,中斷執行緒會導致這些方法拋出 ClosedByInterruptException 並重置中斷狀態。
- Selector 中的 select 方法,這個有機會我們在講 NIO 的時候說 一旦中斷,方法立即返回
對於以上 3 種情況是最特殊的,因為他們能自動感知到中斷(這裡說自動,當然也是基於底層實現),並且在做出相應的操作後都會重置中斷狀態為 false。
那是不是只有以上 3 種方法能自動感知到中斷呢?不是的,如果執行緒阻塞在 LockSupport.park(Object obj) 方法,也叫掛起,這個時候的中斷也會導致執行緒喚醒,但是喚醒後不會重置中斷狀態,所以喚醒後去檢測中斷狀態將是 true。
InterruptedException 概述
它是一個特殊的異常,不是說 JVM 對其有特殊的處理,而是它的使用場景比較特殊。通常,我們可以看到,像 Object 中的 wait() 方法,ReentrantLock 中的 lockInterruptibly() 方法,Thread 中的 sleep() 方法等等,這些方法都帶有 throwsInterruptedException
,我們通常稱這些方法為阻塞方法(blocking method)。
阻塞方法一個很明顯的特徵是,它們需要花費比較長的時間(不是絕對的,只是說明時間不可控),還有它們的方法結束返回往往依賴於外部條件,如 wait 方法依賴於其他執行緒的 notify,lock 方法依賴於其他執行緒的 unlock等等。
當我們看到方法上帶有 throwsInterruptedException
時,我們就要知道,這個方法應該是阻塞方法,我們如果希望它能早點返回的話,我們往往可以通過中斷來實現。
除了幾個特殊類(如 Object,Thread等)外,感知中斷並提前返回是通過輪詢中斷狀態來實現的。我們自己需要寫可中斷的方法的時候,就是通過在合適的時機(通常在循環的開始處)去判斷執行緒的中斷狀態,然後做相應的操作(通常是方法直接返回或者拋出異常)。當然,我們也要看到,如果我們一次循環花的時間比較長的話,那麼就需要比較長的時間才能注意到執行緒中斷了。
處理中斷
一旦中斷髮生,我們接收到了這個資訊,然後怎麼去處理中斷呢?本小節將簡單分析這個問題。
我們經常會這麼寫程式碼:
try { Thread.sleep(10000); } catch (InterruptedException e) { // ignore } // go on
當 sleep 結束繼續往下執行的時候,我們往往都不知道這塊程式碼是真的 sleep 了 10 秒,還是只休眠了 1 秒就被中斷了。這個程式碼的問題在於,我們將這個異常資訊吞掉了。(對於 sleep 方法,我相信大部分情況下,我們都不在意是否是中斷了,這裡是舉例)
AQS 的做法很值得我們借鑒,我們知道 ReentrantLock 有兩種 lock 方法:
public void lock() { sync.lock(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
前面我們提到過,lock() 方法不響應中斷。如果 thread1 調用了 lock() 方法,過了很久還沒搶到鎖,這個時候 thread2 對其進行了中斷,thread1 是不響應這個請求的,它會繼續搶鎖,當然它不會把「被中斷」這個資訊扔掉。我們可以看以下程式碼:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 我們看到,這裡也沒做任何特殊處理,就是記錄下來中斷狀態。 // 這樣,如果外層方法需要去檢測的時候,至少我們沒有把這個資訊丟了 selfInterrupt();// Thread.currentThread().interrupt(); }
而對於 lockInterruptibly() 方法,因為其方法上面有 throwsInterruptedException
,這個訊號告訴我們,如果我們要取消執行緒搶鎖,直接中斷這個執行緒即可,它會立即返回,拋出 InterruptedException 異常。
在並發包中,有非常多的這種處理中斷的例子,提供兩個方法,分別為響應中斷和不響應中斷,對於不響應中斷的方法,記錄中斷而不是丟失這個資訊。如 Condition 中的兩個方法就是這樣的:
void await() throws InterruptedException; void awaitUninterruptibly();
通常,如果方法會拋出 InterruptedException 異常,往往方法體的第一句就是:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); ...... }
熟練使用中斷,對於我們寫出優雅的程式碼是有幫助的,也有助於我們分析別人的源碼。
參考:https://www.ibm.com/developerworks/library/j-jtp05236/index.html 翻譯:https://www.ibm.com/developerworks/cn/java/j-jtp05236.html