Java並發之ReentrantLock源碼解析(一)
ReentrantLock
ReentrantLock是一種可重入的互斥鎖,它的行為和作用與關鍵字synchronized有些類似,在並發場景下可以讓多個執行緒按照一定的順序訪問同一資源。相比synchronized,ReentrantLock多了可擴展的能力,比如我們可以創建一個名為MyReentrantLock的類繼承ReentrantLock,並重寫部分方法使其更加高效。
當一個執行緒調用ReentrantLock.lock()方法時,如果ReentrantLock沒有被其他執行緒持有,且不存在額外的執行緒與當前執行緒競爭ReentrantLock,調用ReentrantLock.lock()方法後當前執行緒會佔有此鎖並立即返回,ReentrantLock內部會維護當前執行緒對鎖的引用計數,當執行緒獲取鎖時會增加其執行緒對鎖的引用計數,當執行緒釋放鎖時會減少執行緒對鎖的引用計數,當前執行緒如果在佔有鎖之後,又重複獲取鎖,則會增加鎖的引用計數,當鎖的引用計數為0的時候,代表當前執行緒完全釋放鎖。需要注意的是,只有佔有鎖的執行緒才會增加鎖的引用計數,當鎖被佔據時,如果有其他執行緒要競爭鎖,ReentrantLock會把其他執行緒加入一個競爭鎖的隊列,並讓執行緒陷入阻塞,直到佔據鎖的執行緒釋放了鎖,ReentrantLock才會喚醒隊列中的執行緒重新競爭鎖。
我們用下面的例子來加深對於鎖的理解,假設我們的進程內目前沒有任何執行緒競爭lock,此時鎖的引用計數為0,有一個執行緒Thread-1調用完下面<1>處的lock()方法成功佔有鎖,此時鎖的引用計數由0變為1。之後Thread-1調用了<2>處的methodB()方法,methodB()的<4>處又獲取了一次鎖,由於lock已經被Thread-1佔據,所以這裡簡單的對鎖的引用計數+1即可,此時鎖的引用計數為2,Thread-1執行完methodB()的方法體後,執行<5>處的unlock()方法釋放鎖,這裡對鎖的引用計數-1,由2變為1。在調用完methodB後,執行methodA的方法體,最後執行<3>處的unlock()方法,將鎖的引用計數由1變為0,Thread-1完全釋放鎖。此時,鎖變為無主狀態。
private final ReentrantLock lock = new ReentrantLock(); public void methodA() { try { lock.lock();//<1> methodB();//<2> //methodA body... } finally { lock.unlock();//<3> } } public void methodB() { try { lock.lock();//<4> //methodB body... } finally { lock.unlock();//<5> } }
ReentrantLock提供了isHeldByCurrentThread()和getHoldCount()兩個方法,前者用於判斷鎖是否被當先調用執行緒持有,如果被當前調用執行緒持有則返回true;後者不僅會判斷鎖是否被當前執行緒持有,還會返回鎖相對於當前執行緒的引用計數,畢竟鎖是可重入的,如果鎖沒有被任何執行緒持有,或者被不是持有鎖的執行緒調用getHoldCount()方法,就會返回0。
這兩個方法的實現原理也很簡單,我們知道在Java中可以調用Thread.currentThread()來獲取當前執行緒對象。當我們調用ReentrantLock.lock()方法成功獲取鎖之後,ReentrantLock內部會用一個獨佔執行緒(exclusiveOwnerThread)欄位來標識當前佔用鎖的Thread執行緒對象,如果執行緒釋放了鎖且鎖的引用計數為0,則將獨佔執行緒欄位標記為null。當要判斷鎖是否被當前執行緒持有,或者鎖相對於當前執行緒的引用計數,則獲取調用方執行緒的Thread對象,和內部的獨佔執行緒欄位做下對比,如果兩者的引用相等,代表當前執行緒佔用了鎖,如果引用不相等,則表示當前所可能處於無主狀態,或者鎖被其他執行緒持有。
如下面的程式碼,我們希望只有持有lock的執行緒才可以執行methodB()和methodC()方法,就可以用isHeldByCurrentThread()和getHoldCount()進行判斷。
private final ReentrantLock lock = new ReentrantLock(); public void methodA() { try { lock.lock(); methodB(); methodC(); //methodA body... } finally { lock.unlock(); } } public void methodB() { if (lock.getHoldCount() != 0) { //methodB body... } } public void methodC() { if (lock.isHeldByCurrentThread()) { //methodC body... } }
需要注意的一點是,官方有給出isHeldByCurrentThread()和getHoldCount()兩個方法的使用範圍,僅針對於debug和測試。真正的生產環境如果有重入鎖的需要,官方還是推薦用try{}finally{}這一套,在try程式碼塊里獲取鎖,在finally塊中釋放鎖。
創建ReentrantLock對象時,如果使用的是無參構造方法,則默認創建非公平鎖(NonfairSync),如果調用的是ReentrantLock(boolean fair)有參構造方法,fair為true則創建公平鎖(FairSync)。
public class ReentrantLock implements Lock, java.io.Serializable { //... //默認創建非公平鎖 public ReentrantLock() { sync = new NonfairSync(); } //根據參數指定創建公平鎖或非公平鎖,true為公平鎖。 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } //... }
之前說過,當有多個執行緒競爭鎖時,獲取鎖失敗的執行緒,會形成一個隊列。如果有多個執行緒競爭公平鎖時,會優先把鎖分配給等待鎖時間最長的執行緒,即隊頭的執行緒,隊列中越往後的執行緒等待鎖的時間越短,排在隊尾的執行緒等待時間最短。如果使用的是非公平鎖,則不保證會按照等待時長順序將鎖分配。在多執行緒的場景下,公平鎖在吞吐量方面的表現不如非公平鎖,但兩者在獲得鎖和保證不飢餓的差異並不大。
需要注意的是,公平鎖不能保證執行緒調度的公平性,競爭公平鎖的多個執行緒中,可能會出現一個執行緒連續多次獲得鎖的情況。比如:Thread-1、Thread-2都要競爭同一個鎖(lock),但此時鎖已經被其他執行緒佔據,Thread-1、Thread-2競爭失敗,預備進入等待隊列,這時Thread-1、Thread-2的CPU時間片消耗完畢被掛起,而其他執行緒剛好釋放鎖將鎖變為無主狀態,此時Thread-3搶鎖成功,並調用下面的doThread3()方法,連續10次獲取鎖並釋放鎖將鎖變為無主狀態。這種情況,就是上面說的公平鎖無法保證執行緒調度的公平性,按照順序,Thread-3在Thread-1、Thread-2競爭失敗後才開始競爭,按理鎖的分配順序應該是Thread-1->Thread-2->Thread-3,但由於執行緒的調度問題,Thread-1、Thread-2尚未入隊,而鎖被釋放後剛好被Thread-3「撿漏」
public void methodA() { try { lock.lock(); //methodA body... } finally { lock.unlock(); } } public void doThread3() { for (int i = 0; i < 10; i++) { methodA(); } }
除了調用ReentrantLock.lock()以阻塞的方式直到獲取鎖,ReentrantLock還提供了tryLock()和tryLock(long timeout, TimeUnit unit)兩個方法來搶鎖。我們看下面的程式碼,相信很多同學看到這兩個方法後也能知道這兩個方法和lock()方法的區別,tryLock()會嘗試競爭鎖,如果鎖已被其他執行緒佔用,則競爭失敗,返回false,如果競爭成功,則返回true。tryLock(long timeout, TimeUnit unit)如果競爭鎖失敗後,會先進入等待隊列,如果在過期前能競爭到鎖,則返回true,如果在過期時間內都無法搶到鎖,則返回false。
public void methodD() { boolean hasLock = false; try { hasLock = lock.tryLock();//<1>非計時 if (!hasLock) {//沒有搶到鎖則退出 return; } //methodD body... } finally { if (hasLock) { lock.unlock(); } } } public void methodE() { boolean hasLock = false; try { hasLock = lock.tryLock(5, TimeUnit.SECONDS);//<2>計時 if (!hasLock) {//沒有搶到鎖則退出 return; } //methodE body... } catch (InterruptedException e) { e.printStackTrace(); } finally { if (hasLock) { lock.unlock(); } } }
需要注意的是:不管是公平鎖還是非公平鎖,不計時tryLock()都不能保證公平性,如果鎖可用,即時其他執行緒正在等待鎖,也會搶鎖成功。
ReentrantLock內部會用一個int欄位來標識鎖的引用次數,因此,ReentrantLock雖然作為可重入鎖,但它的最大可重入次數為2147483647(即:MaxInt32,2^31-1),不管我們是以遞歸或者是循環亦或者其他方式,一旦我們重複獲取鎖的次數超過這個次數,ReentrantLock就會拋出異常。
至此,我們了解了ReentrantLock的簡單應用。下面,就請大家一起跟隨筆者了解ReentrantLock的實現原理。下面的程式碼是筆者從ReentrantLock節選的部分程式碼,可以看到先前我們調用加鎖(lock、lockInterruptibly、tryLock)、解鎖(unlock)的程式碼,最後都會調用sync對象的方法,sync對象的類型是一個抽象類,在我們創建ReentrantLock對象時,會根據構造函數決定sync是公平鎖(FairSync),還是非公平鎖(NonfairSync),FairSync和NonfairSync都繼承自Sync,所以ReentrantLock在創建好具體的Sync對象後,便不再管關心公平鎖的邏輯或者是非公平鎖的邏輯,ReentrantLock只知道抽象類Sync實現了它所需要的功能,這個功能是公平亦或是非公平,由具體的實現子類來關心。
public class ReentrantLock implements Lock, java.io.Serializable { //... private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer {//...} static final class NonfairSync extends Sync {//...} static final class FairSync extends Sync {//...} public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } public void lock() { sync.acquire(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock() { return sync.nonfairTryAcquire(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.release(1); } //... }
鑒於ReentrantLock的無參構造函數是創建一個非公平鎖,可見官方更傾向於我們使用非公平鎖,這裡,我們就先從非公平鎖開始介紹。
當ReentrantLock為非公平鎖時,調用lock()方法會直接調用sync.acquire(1),NonfairSync和Sync兩個類都沒有實現acquire(int arg),這個方法是由AbstractQueuedSynchronizer(抽象隊列同步器,下面簡稱:AQS)實現的,也就是Sync的父類。
當執行緒競爭鎖時,會先調用tryAcquire(arg)方法試圖佔有鎖,AQS將tryAcquire(int arg)的實現交由子類,由子類決定是以公平還是非公平的方式佔有鎖,如果競爭成功tryAcquire(arg)則返回true,!tryAcquire(arg)的結果為false,於是就不會再調用<1>處後續的判斷,直接返回。如果佔有鎖失敗,這裡會先調用addWaiter(Node mode)方法,將當前調用執行緒封裝成一個Node對象,再調用acquireQueued(final Node node, int arg)將Node對象加入到等待隊列中,並使執行緒陷入阻塞。
//java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//<1> selfInterrupt(); } //AbstractQueuedSynchronizer將tryAcquire(int arg)的實現交由子類 //java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquire protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
我們先來看NonfairSync實現的tryAcquire(int acquires)方法,這裡NonfairSync也是調用其父類Sync的nonfairTryAcquire(int acquires)方法。在AQS內部會維護一個volatile int state,可重入互斥鎖會用這個欄位存儲佔有鎖的執行緒對鎖的引用計數,即重複獲取鎖的次數。如果state為0,代表鎖目前沒有被任何執行緒佔有,這裡會用CAS的方式設置鎖的引用計數,如果設置成功,則執行<2>處的程式碼將獨佔執行緒(exclusiveOwnerThread)的引用指向當前調用執行緒,然後返回true表示加鎖成功。
如果當前state不為0,代表有執行緒正獨佔此鎖,會在<3>處判斷當前執行緒是否是獨佔執行緒,如果是的話則在<4>處增加鎖的引用計數,這裡同樣是修改state的值,但不需要像<1>處那樣用CAS的方式,因為<4>處的程式碼只有獨佔執行緒才可以執行,其他執行緒都無法執行。需要注意的一點是,state為int類型,最大值為:2^31-1,如果超過這個值state就會變為負數,就會報錯。如果一個執行緒在競爭鎖的時候,發現state不為0,且當前執行緒不是獨佔執行緒,則會返回false,表示搶鎖失敗。
//當調用AQS的acquire(int arg)時,會先調用由子類實現的tryAcquire(int acquires)方法 //java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire protected final boolean tryAcquire(int acquires) { //這裡會調用父類Sync的nonfairTryAcquire(int acquires)方法 return nonfairTryAcquire(acquires); } //java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire final boolean nonfairTryAcquire(int acquires) { //獲取當前執行緒對象 final Thread current = Thread.currentThread(); //這裡會獲取父類AQS的state欄位,在可重入互斥鎖里,state表示佔有鎖的執行緒的引用計數 int c = getState(); //如果state為0,表示目前鎖是無主狀態 if (c == 0) { //如果鎖處於無主狀態,則用CAS修改state,如果修改成功,表示佔有鎖成功 if (compareAndSetState(0, acquires)) {//<1> //佔有鎖成功後,這裡會設置鎖的獨佔執行緒 setExclusiveOwnerThread(current);//<2> return true; } } else if (current == getExclusiveOwnerThread()) {//<3>如果state不為0,代表現在有執行緒佔據鎖,如果請求鎖的執行緒和獨佔執行緒是同一個執行緒,則增加當前執行緒對鎖的引用計數 //鎖的最大可重入次數為(2^31-1),超過這個最大範圍,int就會變為負數,判斷nextc為負數時報錯。 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //重新設置state的值 setState(nextc);//<4> return true; } return false; } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //... //在可重入互斥鎖中,state代表獨佔執行緒當前的重入次數 private volatile int state; protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } //... } public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { //... //獨佔執行緒,當有執行緒佔據可重入互斥鎖時,會用此欄位存儲佔有鎖的執行緒 private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
按照AbstractQueuedSynchronizer.acquire(int arg)的邏輯,如果搶鎖失敗,會繼而執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)這段程式碼。這裡我們需要先來了解下Node的數據結構,Node類是AQS的一個靜態內部類。如果眼尖的同學看到下面的prev和next,一定能很快猜出這就是我們先前所說的等待隊列,等待隊列實質上是一個雙端鏈表,即每個節點都可以知道自己的前驅,也可以知道自己的後繼。
//java.util.concurrent.locks.AbstractQueuedSynchronizer.Node static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; //... volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; //... //返回當前節點的前驅節點 final Node predecessor() { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() {} //... //創建Node節點 Node(Node nextWaiter) {//<1> this.nextWaiter = nextWaiter; THREAD.set(this, Thread.currentThread()); } }
這裡簡單介紹下Node的欄位:
- prev指向當前節點的前驅節點,next指向當前節點的後繼節點。
- thread欄位在調用<1>處的構造方法時,會將thread指向當前調用執行緒的Thread對象。
- waitStatus(等待狀態)初始值為0,當waitStatus為SIGNAL(-1)時,表示當前節點的後繼節點所指向的執行緒(node.next.thread)陷入阻塞,當前節點如果被移除(CANCELLED)或在佔有鎖後要釋放鎖的時候,需要喚醒後繼節點的執行緒。這裡有多種可能導致當前節點的等待狀態變為移除,比如調用tryLock(long timeout, TimeUnit unit) 超時會獲取到鎖,或者調用lockInterruptibly()後執行緒被中斷。
- nextWaiter可以用來表示一個節點的執行緒到底是獨佔執行緒(EXCLUSIVE)還是共享執行緒(SHARED),獨佔執行緒一般用於可重入互斥鎖(ReentrantLock)或者可重入讀寫鎖(ReentrantReadWriteLock )的寫鎖,而共享執行緒則表示當前執行緒是可以和其他共享執行緒一起共享資源的,一般用於可重入讀寫鎖的讀鎖。
如果對上面Node欄位還有不理解的地方不用心急,筆者在後面還會和大家一起深入了解這幾個欄位。
在簡單了解了Node的數據結構後,我們來看看AQS是如何將一個執行緒封裝成一個Node對象,並將其加入到等待隊列。addWaiter(Node mode)會根據傳入的參數node,決定創建的節點是獨佔節點還是共享節點,先前ReentrantLock傳入的是Node.EXCLUSIVE,所以這裡是獨佔節點,在執行完<1>處的程式碼後,節點創建完畢,節點的thread欄位也保存了當前執行緒對象的引用。之後會進入<2>處的循環,這裡是通過CAS自旋的方式將節點加入到等待隊列,之所以用這種方式是因為可能存在多個執行緒同時要入隊的情況,用CAS自旋保證每個節點的前驅和後繼的有序性。當節點要入隊時,會先獲取尾節點,如果在<3>處判斷尾節點不為null,則將當前節點的前驅指向尾節點,並用CAS的方式設置當前節點為設置為尾節點,如果原先的尾節點(oldTail)的指向沒有被任何執行緒修改,這裡用CAS將當前節點設置成尾節點就會成功,於是原先尾節點的後繼指向當前節點,當前節點入隊成功。但我們也要考慮尾節點為null的情況,即第一個進入等待隊列的節點,此時頭節點(header)和尾節點(tail)都為null,這裡就會執行<4>處的分支,進行隊列初始化。初始化隊列的時候,同樣存在並發問題,所以這裡依舊用CAS初始化頭節點成功,再將頭節點指向的Node對象賦值給尾節點。初始化隊列完畢後,會再開始新的一輪循環,用CAS的方式嘗試將節點入隊,入隊成功後,則返回當前節點。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //... private transient volatile Node head;//等待隊列的頭節點 private transient volatile Node tail;//等待隊列的尾節點 //... private Node addWaiter(Node mode) { //為競爭鎖的執行緒創建一個Node對象,並用Node.thread欄位存儲調用執行緒Thread對象 Node node = new Node(mode);//<1> for (;;) {//<2> Node oldTail = tail; if (oldTail != null) {//<3> node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else {//<4> initializeSyncQueue(); } } } private final void initializeSyncQueue() { Node h; if (HEAD.compareAndSet(this, null, (h = new Node()))) tail = h; } private final boolean compareAndSetTail(Node expect, Node update) { return TAIL.compareAndSet(this, expect, update); } //... }
在執行完addWaiter(Node.EXCLUSIVE)確定節點入隊後,就要將返回節點傳入到方法:acquireQueued(final Node node, int arg)。之前我們說過,搶鎖失敗的節點會進入一個等待隊列,等待鎖的分配,我們已經在addWaiter(Node mode)看到執行緒是如何入隊的,那接下來就要看看執行緒是如何等待鎖的分配。在看acquireQueued(final Node node, int arg)之前,我們先來思考下如果是我們自己會如何設計將鎖分配給執行緒?最簡單的做法是每個執行緒都在一個死循環中去輪詢鎖的狀態,如果發現鎖處於無主狀態並搶鎖成功,執行緒則跳出循環訪問資源。但這個做法有個缺點就是會消耗CPU時間片,尤其對於一些優先順序不高的執行緒,相比於優先順序高的執行緒它們可能永遠無法競爭到鎖,永遠訪問不到資源處於飢餓狀態。那麼有沒有相比死循環更好的做法呢?我們是否可以先把一個入隊的執行緒阻塞起來,先讓它不要消耗寶貴的CPU時間片,當佔據鎖的執行緒完全釋放鎖(state變為0)時,則去喚醒隊列中等待時長最長的執行緒,這樣也不用擔心優先順序低的執行緒無法與優先順序高的執行緒競爭鎖,導致處於飢餓狀態,一舉兩得。
這裡我們還要再加深下對等待隊列Node的理解才能往下看acquireQueued(final Node node, int arg),大家思考下,Node中的thread欄位是用來指向競爭鎖的執行緒對象,通過這個對象,我們可以用釋放鎖的執行緒喚醒等待鎖的執行緒,佔用鎖的執行緒在完全釋放鎖將鎖變為無主狀態後,喚醒等待鎖的執行緒,這個等待鎖的執行緒如果成功佔據了鎖,是否可以將本身執行緒中Node.thread置為null?此刻執行緒已經佔據了鎖,它不會再陷入阻塞,也不需要有其他的執行緒來喚醒自身。所以等待隊列的頭節點的thread(header.thread)欄位永遠為null,因為鎖被頭節點的執行緒所佔用。
當然,也可能出現鎖被佔用但頭節點(header)本身就為null,這種情況一般出現在我們初始化好一個ReentrantLock後,只有一個執行緒佔有了鎖,此時調用tryAcquire(int acquires)會調用ReentrantLock.Sync.nonfairTryAcquire(int acquires)方法,這個方法只會簡單修改state狀態,並不會新增一個頭節點。除非鎖已有執行緒佔據,且出現新的執行緒競爭鎖,這時候新的執行緒在進入等待隊列的時候,會初始化隊列,為本身佔據鎖的執行緒補上一個頭節點,初始化隊列的時候調用的是Node的無參構造方法,所以頭節點的thread欄位為null,表示鎖被當前頭節點原先指向的執行緒所佔據。
在了解這些基本知識後,下面我們終於可以來看看大家迫不及待的acquireQueued(final Node node, int arg)了。當把封裝了當前執行緒的Node對象傳入到acquireQueued(final Node node, int arg)方法時,並不會立即阻塞當前執行緒等待其他執行緒喚醒。這裡會先在<1>處獲取當前節點的前驅節點p,判斷p是不是頭節點,如果p是頭節點,則當前執行緒即有佔有鎖的可能。因為佔據鎖的執行緒會先釋放鎖,再通知隊列中的執行緒搶鎖。所以會存在當前節點入隊前鎖已被釋放的情況,於是判斷前驅節點p是頭節點,會再調用tryAcquire(int acquires)方法搶鎖,如果搶鎖成功,就可以按照我們上面所說的套路,調用setHead(Node node)將當前節點設置為頭節點,設置當前節點的執行緒引用為null,然後返回。
如果當前節點的前驅節點不是頭節點,這裡就要調用shouldParkAfterFailedAcquire(Node pred, Node node)設置前驅節點的等待狀態(waitStatus),先前說過,這個等待狀態可以用來表示下個節點的阻塞狀態。假設有一個鎖已經被其他執行緒佔有,Thread-1、Thread-2要來搶鎖,此時必然是搶鎖失敗的,這裡會把Thread-1、Thread-2分別封裝成Node1和Node2並進行入隊,Node1和Node2初始的等待狀態都為0,假定Node1先Node2入隊,Node1為Node2的前驅節點(即:Node2.prev=Node1),Node1不是頭節點,所以不會去搶鎖,這裡直接進入<2>處分支的shouldParkAfterFailedAcquire(Node pred, Node node)方法,Node1的初始等待狀態為0,所以<3>處和<5>處的分支是進不去的,只能進入<4>處的分支,將Node1的等待狀態設置為SIGNAL,表示Node1的後繼節點處於等待喚醒狀態,然後返回false,於是<2>處的判斷不成立,又開始新的一輪循環,假定頭節點的執行緒依舊沒釋放鎖,Node1依舊不是頭節點,還是直接執行shouldParkAfterFailedAcquire(Node pred, Node node)方法,此時判斷Node2的前驅節點Node1的等待狀態為-1,表示可以阻塞Node1後繼節點Node2所指向的執行緒,所以這裡會返回true,進入<2>處的分支,調用parkAndCheckInterrupt()方法,在這個方法中會調用LockSupport.park(Object blocker)阻塞當前的調用執行緒,直到有其他執行緒調用LockSupport.unpark(Node2.thread)喚醒Node2被阻塞的執行緒,或Node2.thread被中斷才會退出parkAndCheckInterrupt()。我們注意到在<5>處有一個判斷,前驅節點的等待狀態>0,一般狀態為CANCELLED(1),表示前驅節點被移除。之所以會存在被移除的節點,是因為我們可能以tryLock(long timeout, TimeUnit unit)的方式往等待隊列中添加節點,如果超時還未獲得鎖,這個節點就要被移除;我們還可能用lockInterruptibly()的方式往等待隊列中添加節點,如果節點所對應的執行緒被中斷,這個節點也處於被移除狀態。所以<5>處如果發現前驅節點的等待狀態大於0,會一直往前驅節點遍歷直到找到等待狀態<=0的節點將其作為前驅節點,並將前驅節點的後繼指向當前節點。要注意的是,等待狀態為-1時,代表當前節點的後繼節點等待喚醒,>0的時候,代表當前節點被移除,前者的狀態與後繼節點有關,後者的狀態僅與自身有關。如果在自旋期間執行緒出現其他異常,則會調用<6>處的程式碼將節點從等待隊列移除,並拋出異常。cancelAcquire(Node node)會在後面介紹,這裡我們只要先知道這是一個將節點從隊列中移除的方法。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //... private transient volatile Node head; //... final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { final Node p = node.predecessor();//<1> if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node))//<2> interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node);//<6> if (interrupted) selfInterrupt(); throw t; } } //... //設置當前節點為頭節點,此時可以清空頭節點指向的執行緒引用 private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } //... private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL)//<3> /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) {//<5> /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else {//<4> /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } return false; } //... private final boolean parkAndCheckInterrupt() { //阻塞調用執行緒,可調用LockSupport.unpark(Thread thread)喚醒或由執行緒中斷喚醒。 LockSupport.park(this); //返回執行緒是否由中斷喚醒,返回true為被中斷喚醒,但此方法會清除執行緒的中斷標記 return Thread.interrupted(); } //... }
能從boolean acquireQueued(final Node node, int arg)方法中返回的執行緒,都是成功佔有鎖的執行緒,但返回結果分當前執行緒是否被中斷,true為被中斷。可能存在這樣一種情況,前一個執行緒釋放鎖完畢後,即將喚醒後一個執行緒,此時後一個執行緒被中斷喚醒,後一個執行緒發現其Node節點的前驅節點為頭節點,且鎖為無主狀態,於是搶鎖成功直接返回。這裡要標記執行緒的中斷狀態interrupted,因為執行緒會從parkAndCheckInterrupt()中被喚醒,最後會執行Thread.interrupted()返回當前執行緒是否由中斷喚醒,但Thread.interrupted()會清除中斷標記,所以在佔據鎖之後會根據返回的interrupted狀態,決定是否設置執行緒的中斷狀態。如果一個執行緒在調用acquireQueued(final Node node, int arg)方法的後都未被中斷,直到前一個執行緒調用LockSupport.unpark(Thread thread)喚醒該執行緒,那麼這個執行緒就不是用中斷的形式喚醒,也就不用設置執行緒的中斷狀態。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //... public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //根據acquireQueued()的返回,決定是否設置執行緒的中斷標記 selfInterrupt(); } //... static void selfInterrupt() { Thread.currentThread().interrupt(); } //... }