ReentrantLock源碼探究
- 2020 年 3 月 17 日
- 筆記
ReentrantLock
是一種可重入鎖,可重入是說同一個線程可以多次獲取同一個鎖,內部會有相應的字段記錄重入次數,它同時也是一把互斥鎖,意味着同時只有一個線程能獲取到可重入鎖。
1.構造函數
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
ReentrantLock
提供了兩個構造函數,構造函數只是用來初始化sync
字段,可以看到,默認情況下ReentrantLock
使用的是非公平鎖,當然,也可以使用帶有布爾參數的構造函數來選擇使用公平鎖。公平鎖和非公平鎖的實現依賴於兩個內部類:FairSync
和NonfairSync
,接下來認識一下這兩個類:
//非公平鎖 static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
這兩個內部類的代碼都很短,並且都繼承了另一個內部類Sync
。這裡先不急着介紹Sync
類,因為這個類本身也並不複雜,後續在需要用到其中的方法時順帶講解,目前只需要知道這個類繼承了AbstractQueuedSynchronizer(AQS)
即可。
2.常用方法
lock()
public void lock() { sync.lock(); }
lock
方法提供了加鎖的功能,公平鎖和非公平鎖的加鎖操作是不一樣的,先來看看非公平鎖的細節,接着再講解公平鎖。
- 非公平鎖加鎖邏輯
final void lock() { //使用CAS操作,嘗試將state字段從0修改為1,如果成功修改該字段,則表示獲取了互斥鎖 //如果獲取互斥鎖失敗,轉入acquier()方法邏輯 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } //設置獲得了互斥鎖的線程 protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
對於非公平鎖來講,使用lock()
方法一上來就嘗試獲取互斥鎖,獲取成功就將exclusiveOwnerThread
指向自己,代表當前是自己持有鎖,否則就執行acquire()
方法的邏輯,下面對acquire()
方法的邏輯進行逐個分析。
首先是tryAcquire()
方法,非公平鎖重寫了該方法,並在內部調用Sync
類的nonfairTryAcquire()
。
//從上面的邏輯來看,這裡的acquires=1 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //c==0,說明當前處於未加鎖狀態,鎖沒有被其他線程獲取 if (c == 0) { //在鎖沒有被其他線程佔有的情況下,非公平鎖再次嘗試獲取鎖,獲取成功則將exclusiveOwnerThread指向自己 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //執行到這裡說明鎖已經被佔有,如果是被自己佔有,將state字段加1,記錄重入次數 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; //當nextc達到int類型最大值時會溢出,因此可重入次數的最大值就是int類型的最大值 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //執行到這裡說明:1)鎖未被佔有的情況下,搶鎖失敗,說明當前有其他線程搶到了鎖;2)鎖已經被其他線程佔有 //即只要當前線程沒有獲取到鎖,就返回false return false; } //獲取state字段,該字段定義在AQS中 protected final int getState() { return state; } //設置state字段 protected final void setState(int newState) { state = newState; }
當前線程沒有在tryAcquire()
方法中獲取到鎖時,會先執行addWaiter(Node.EXCLUSIVE)
方法,其中參數Node.EXCLUSIVE
是一個常量,其定義是static final Node EXCLUSIVE = null
,作用是標記鎖的屬性是互斥鎖。addWaiter()
方法的作用是將當前線程包裝成一個Node
節點,放入等待隊列的隊尾,該方法在介紹CountDownLatch
類時詳細講解過,有興趣的朋友可以參考ConcurrentHashMap源碼探究(JDK 1.8),本文不再贅述。
將當前線程加入等待隊列之後,會接着執行acquireQueued()
方法,其源碼如下:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //自旋 for (;;) { //獲取當前節點的前一個節點 final Node p = node.predecessor(); //如果前一個節點是頭節點,說明當前節點排在隊首,非公平鎖會則再次通過tryAcquire方法獲取鎖 if (p == head && tryAcquire(arg)) { //將自己設置為頭節點 setHead(node); //前一個頭結點沒用了,會被垃圾回收掉 p.next = null; // help GC failed = false; //正常結束,返回false,注意該字段可能會在下面的條件語句中被改變 return interrupted; } //如果前一個節點不是頭節點,或者當前線程獲取鎖失敗,會執行到這裡 //shouldParkAfterFailedAcquire()方法只有在p的狀態是SIGNAL時才返回false,此時parkAndCheckInterrupt()方法才有機會執行 //注意外層的自旋,for循環體會一直重試,因此只要執行到這裡,總會有機會將p設置成SIGNAL狀態從而將當前線程掛起 //另外,如果parkAndCheckInterrupt()返回true,說明當前線程設置了中斷狀態,會將interrupted設置為true,代碼接着自旋,會在上一個條件語句中返回true if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //如果在自旋中線程被中斷或者發送異常,failed字段的值將會為true,這裡會處理這種情況,放棄讓當前線程獲取鎖,並拋出中斷異常 if (failed) cancelAcquire(node); } } //方法邏輯是:只有在前置節點的狀態是SIGNAL時才返回true,其他情況都返回false private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; //刪除當前節點之前連續狀態是CANCELLED的節點 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //線程在這裡阻塞,並在被喚醒後檢查中斷狀態 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } // private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { //喚醒後一個節點 unparkSuccessor(node); } node.next = node; // help GC } }
注意acquireQueued()
要麼會拋出中斷異常,要麼正常結束返回false
,只有在線程被喚醒後設置了中斷狀態才會返回true
。對比可以發現,acquireQueued()
方法的邏輯與CountDownLatch
中的doAcquireSharedInterruptibly()
十分類似,許多方法在CountDownLatch
這篇博客中講到過,本文不再對這些方法進行贅述。
介紹完了acquire()
方法,回過頭來看看方法邏輯:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
如果在tryAcquire()
方法中沒有獲取鎖,那麼將當前線程加入到等待隊列隊尾,查看節點的前一個節點是否是頭結點,是的話當前線程可以繼續向下執行,否則就會阻塞掛起。當acquireQueued
返回true時,說明線程設置了中斷狀態,就調用selfInterrupt()
中斷該線程,其他情況selfInterrupt()
方法沒機會執行。
到這裡非公平鎖的加鎖流程已經介紹完了,由於代碼邏輯比較長,且看源碼的過程中會在好幾個類中來回切換,思路很容易斷,閱讀代碼的時候要注意。(有必要補個流程圖)。
- 公平鎖加鎖邏輯
接下來看看公平鎖的加鎖邏輯:
final void lock() { acquire(1); }
與非公平鎖相比,公平鎖沒有一上來就搶鎖的邏輯,這也是公平性的體現。兩種鎖的acquire()
方法的框架相同,但是實現細節不同,來看看公平鎖的tryAcquire()
方法:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //c=0表示當前沒有其他線程持有鎖 if (c == 0) { //下面的代碼與非公平鎖相比,多了hasQueuedPredecessors()方法的處理邏輯,公平鎖只有在前面沒有其他線程排隊的情況下才會嘗試獲取鎖 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果當前線程已經佔有公平鎖,則記錄重入次數 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //只要當前線程沒有獲取到鎖,就返回false return false; } public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; //h != t表示等待隊列中有其他節點 //h.next == null可能會有點費解,按理說h!=t之後,h後面肯定會有節點才對,這種情況其實已經見過,在上文介紹acquireQueued()方法時說過, //被喚醒的第一個等待節點會將自己設置為頭結點,如果這個節點是隊列中的唯一節點的話,它的下一個節點就是null //至於s.thread != Thread.currentThread()這個條件暫時可以忽略,因為公平鎖執行到hasQueuedPredecessors方法時根本還沒有入隊, //這也意味着,只要隊列中有其他節點在等候,公平鎖就要求其他線程排隊等待 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
lockInterruptibly
從名字可以看出,lockInterruptibly
可以響應中斷,來看看該方法的實現:
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //先嘗試獲取鎖,獲取失敗才執行後面的邏輯 if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
lockInterruptibly()
方法幾乎與acquire()
方法完全一樣,唯一的區別是acquire()
方法中,parkAndCheckInterrupt
因為線程設置了中斷狀態而返回true
時,只是簡單設置了一下interrupted
字段的值,而lockInterruptibly()
則是直接拋出異常。
unlock
方法
介紹完加鎖的邏輯,接下來看看解鎖的邏輯:
public void unlock() { sync.release(1); } public final boolean release(int arg) { //如果成功釋放了鎖,則執行下面的代碼塊 if (tryRelease(arg)) { Node h = head; //如果頭節點不為null,請求節點狀態不是初始狀態,就釋放頭結點後第一個有效節點 //問題:這裡為什麼需要判斷頭結點的狀態呢??? if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // protected final boolean tryRelease(int releases) { int c = getState() - releases; //線程沒有持有鎖的情況下,不允許釋放鎖,否則會拋異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //可重入性的判斷,如果釋放了一次鎖,使得c=0,就指針釋放鎖,做法是將記錄鎖的字段exclusiveOwnerThread重新指向null //注意,只有最後一次釋放可重入鎖,才會返回true if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } //喚醒node節點的下一個有效節點,這裡的有效指的是狀態不是CANCELLED狀態的節點 private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
newCondition()
ReentrantLock
可以實現綁定多個等待條件,這個功能是在newCondition()
方法中實現的,每次調用newCondition()
方法時,都會產生一個新的ConditionObject
對象,這是AQS
中的一個內部類,代碼很長,這裡就不詳細討論了。來簡單看看該方法的源碼:
public Condition newCondition() { return sync.newCondition(); } final ConditionObject newCondition() { return new ConditionObject(); }
3.總結
在多線程環境中,ReentrantLock
的非公平鎖要比公平鎖擁有更高的性能,因為非公平鎖避免了線程掛起產生的上下文切換的開銷,但是公平鎖能夠避免線程飢餓問題,因此各有各的使用場景。從源碼來看,J.U.C
包下的很多類都依賴AQS
類,因此非常有必要搞懂AQS
。提到ReentrantLock
,總免不了與synchronized
進行對比。synchronized
也是可重入的,並且在JDK 1.6
以後,synchronized
的性能已經得到了很大的提升,因此選擇使用ReentrantLock
一般是考慮使用它的三個優勢:可中斷、可實現公平鎖、可綁定多個條件,這些優勢是synchronized
不具備的。