Java並發包源碼學習系列:ReentrantLock可重入獨佔鎖詳解
系列傳送門:
- Java並發包源碼學習系列:AbstractQueuedSynchronizer
- Java並發包源碼學習系列:CLH同步隊列及同步資源獲取與釋放
- Java並發包源碼學習系列:AQS共享式與獨佔式獲取與釋放資源的區別
基本用法介紹
ReentrantLock
位於java.util.concurrent(J.U.C)
包下,是Lock接口的實現類。基本用法與synchronized
相似,都具備可重入互斥的特性,但擁有更強大的且靈活的鎖機制。本篇主要從源碼角度解析ReentrantLock,一些基本的概念以及Lock接口可以戳這篇:Java並發讀書筆記:Lock與ReentrantLock
ReentrantLock推薦用法如下:
class X {
//定義鎖對象
private final ReentrantLock lock = new ReentrantLock();
// ...
//定義需要保證線程安全的方法
public void m() {
//加鎖
lock.lock();
try{
// 保證線程安全的代碼
}
// 使用finally塊保證釋放鎖
finally {
lock.unlock()
}
}
}
繼承體系
- 實現Lock接口,提供了鎖的關鍵方法,如lock、unlock、tryLock等等,以及newCondition給lock關聯條件對象的方法。
- 內部維護了一個Sync,它繼承AQS,實現AQS提供的獨佔式的獲取與釋放同步資源的方法,提供了可重入的具體實現。
- Sync有兩個實現類,是公平鎖和非公平鎖的兩種實現,FairSync與NonfairSync。
獨佔鎖表示:同時只能有一個線程可以獲取該鎖,其他獲取該鎖的線程會被阻塞而被放入該所的AQS阻塞隊列裏面。這部分可以查看:Java並發包源碼學習系列:AQS共享式與獨佔式獲取與釋放資源的區別
構造方法
Sync直接繼承自AQS,NonfairSync和FairSync繼承了Sync,實現了獲取鎖的公平與非公平策略。
ReentrantLock中的操作都是委託給Sync對象來實際操作的。
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
默認是使用非公平鎖:NonfairSync
,可以傳入參數來指定是否使用公平鎖。
// 默認使用的是 非公平的策略
public ReentrantLock() {
sync = new NonfairSync();
}
// 通過fair參數指定 策略
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
state狀態表示
在ReentrantLock中,AQS的state狀態值表示線程獲取該鎖的可重入次數,在默認情況下:
- state值為0時表示當前鎖沒有被任何線程持有。
- 當第一個線程第一次獲取該鎖時會嘗試使用CAS設置state的值為1,如果CAS成功則當前線程獲取了該鎖,然後記錄該鎖的持有者為當前線程。
- 在該線程沒有釋放鎖的情況下第二次獲取該鎖後,狀態值設置為2,為可重入次數。
- 在該線程釋放鎖時,會嘗試使用CAS讓state值減1,如果減1後狀態值為0,則當前線程釋放該鎖。
獲取鎖
void lock()方法
ReentrantLock的lock()方法委託給了sync類,根據創建sync的具體實現決定具體的邏輯:
NonfairSync
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// CAS 設置獲取state值
if (compareAndSetState(0, 1))
// 將當前線程設置為鎖的持有者
setExclusiveOwnerThread(Thread.currentThread());
else
// 設置失敗, 調用AQS的acquire方法
acquire(1);
}
state值的初始狀態為0,也就是說,第一個線程的CAS操作會成功將0設置為1,表示當前線程獲取到了鎖,然後通過setExclusiveOwnerThread
方法將當前線程設置為鎖的持有者。
如果這時,其他線程也試圖獲取該鎖,則CAS失敗,走到acquire的邏輯。
// AQS#acquire
public final void acquire(int arg) {
// 調用ReentrantLock重寫的tryAcquire方法
if (!tryAcquire(arg) &&
// tryAcquire方法返回false,則把當前線程放入AQS阻塞隊列中
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
欸,這個時候我們應該就有感覺了,我們之前在分析AQS的核心方法的時候說到過,AQS是基於模板模式設計的,下面的tryAcquire方法就是留給子類實現的,而NonfairSync中是這樣實現的:
//NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
// 調用
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 獲取當前狀態值
int c = getState();
// 如果當前狀態值為0,如果為0表示當前鎖空閑
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 看看當前的線程是不是鎖的持有者
else if (current == getExclusiveOwnerThread()) {
// 如果是的話 將狀態設置為 c + acquires
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
還是很好理解的哈,先看看鎖的狀態值是啥?
- 如果是0,就CAS嘗試獲取鎖,將狀態從0變到1,並且設置鎖的持有者為當前線程,和之前的邏輯一樣啦。
- 如果不是0,表示已經被某個線程持有啦,看看持有鎖的人是誰呢?如果是自己,那麼好辦,重入唄,將state變為nextc【原先state + 傳入的acquires】,返回true。這裡要注意:nextc<0表示可重入次數溢出。
- 鎖已經被別人霸佔了,那就返回false咯,等待後續
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
方法,被置入AQS阻塞隊列中。
這裡非公平體現在獲取鎖的時候,沒有查看當前AQS隊列中是否有比自己更早請求該鎖的線程存在,而是採取了搶奪策略。
FairSync
公平鎖的tryAcquire實現如下:
//FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 狀態值為0的時候,我去看看隊列裏面在我之前有沒有線程在等
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 當前鎖已經被當前線程持有
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
對比一下兩種策略,不必說,hasQueuedPredecessors
方法一定是實現公平性的核心,我們來瞅瞅:
// 如果當前線程有前驅節點就返回true。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
該方法:如果當前線程有前驅節點就返回true,那麼我們想,不是前驅節點的情況有哪些呢?
- 隊列為空
- 隊列不為空,但當前線程節點是AQS的第一個節點。
知道這些之後,我們就明白最後那串表達式是什麼意思了:隊列裏面的第一個元素不是當前線程,返回true,說明在你之前還有人排着隊呢,你先別搶,先到先得。
公平與非公平策略的差異
我們稍微總結一下:
Reentrant類的構造函數接受一個可選的公平性參數fair。這時候就出現兩種選擇:
- 公平的(fair == true):保證等待時間最長的線程優先獲取鎖,其實就是先入隊的先得鎖,即FIFO。
- 非公平的(fair == false):此鎖不保證任何特定的訪問順序。
公平鎖往往體現出的總體吞吐量比非公平鎖要低,也就是更慢,因為每次都需要看看隊列裏面有沒有在排隊的嘛。鎖的公平性並不保證線程調度的公平性,但公平鎖能夠減少”飢餓”發生的概率。
需要注意的是:不定時的tryLock()方法不支持公平性設置。如果鎖可用,即使其他線程等待時間比它長,它也會成功獲得鎖。
void lockInterruptibly()
該方法與lock方法類似,不同點在於,它能對中斷進行相應:當前線程在調用該方法時,如果其他線程調用了當前線程的interrupt()方法,當前線程會拋出InterruptedException異常,然後返回。
// ReentrantLock#lockInterruptibly
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
// AQS#acquireInterruptibly
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 如果當前線程被中斷,則直接拋出異常
if (Thread.interrupted())
throw new InterruptedException();
// 嘗試獲取資源
if (!tryAcquire(arg))
// 調用AQS可被中斷的方法
doAcquireInterruptibly(arg);
}
boolean tryLock()方法
嘗試獲取鎖,如果當前該鎖沒有被其他線程持有,則當前線程獲取該鎖並返回true,否則返回false。
大致邏輯和非公平鎖lock方法類似,但該方法會直接返回獲取鎖的結果,無論true或者false,它不會阻塞。
// ReentrantLock# tryLock
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
abstract static class Sync extends AbstractQueuedSynchronizer {
// Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
tryLock()
實現方法,在實現時,希望能快速的獲得是否能夠獲得到鎖,因此即使在設置為fair = true
( 使用公平鎖 ),依然調用Sync#nonfairTryAcquire(int acquires)
方法。- 如果真的希望
tryLock()
還是按照是否公平鎖的方式來,可以調用#tryLock(0, TimeUnit)
方法來實現。
boolean tryLock(long timeout, TimeUnit unit)
// ReentrantLock# tryLock
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
// AQS#tryAcquireNanos
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
嘗試獲取鎖,如果獲取失敗會將當前線程掛起指定時間,時間到了之後當前線程被激活,如果還是沒有獲取到鎖,就返回false。
另外,該方法會對中斷進行的響應,如果其他線程調用了當前線程的interrupt()方法,響應中斷,拋出異常。
釋放鎖
void unlock()方法
// ReentrantLock#unlock
public void unlock() {
sync.release(1);
}
//AQS# release
public final boolean release(int arg) {
// 子類實現tryRelease
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
abstract static class Sync extends AbstractQueuedSynchronizer {
// Sync#tryRelease
protected final boolean tryRelease(int releases) {
// 計算解鎖後的次數,默認減1
int c = getState() - releases;
// 如果想要解鎖的人不是當前的鎖持有者,直接拋異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 可重入次數為0,清空鎖持有線程
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 可重入次數還沒到0,只需要改變一下下state就可
setState(c);
return free;
}
}
嘗試釋放鎖,如果當前線程持有該鎖,調用該方法默認會讓AQS的state減1。
如果減1之後,state為0,當前線程會釋放鎖。
如果當前線程不是鎖持有者而企圖調用該方法,則拋出IllegalMonitorStateException異常。
Condition實現生產者消費者
Condition是用來代替傳統Object中的wait()和notify()實現線程間的協作,Condition的await()和signal()用於處理線程間協作更加安全與高效。
Condition的使用必須在lock()與unlock()之間使用,且只能通過lock.newCondition()獲取,實現原理我們之後會專門進行學習。
public class BlockingQueue {
final Object[] items; // 緩衝數組
final ReentrantLock lock = new ReentrantLock(); // 非公平獨佔鎖
final Condition notFull = lock.newCondition(); // 未滿條件
final Condition notEmpty = lock.newCondition(); // 未空條件
private int putIdx; // 添加操作的指針
private int takeIdx; // 獲取操作的指針
private int count; // 隊列中元素個數
public BlockingQueue(int capacity) {
if(capacity < 0) throw new IllegalArgumentException();
items = new Object[capacity];
}
// 插入
public void put(Object item) throws InterruptedException {
try {
lock.lock(); // 上鎖
while (items.length == count) { // 滿了
notFull.await(); // 其他插入線程阻塞起來
}
enqueue(item); // 沒滿就可以入隊
} finally {
lock.unlock(); // 不要忘記解鎖
}
}
private void enqueue(Object item) {
items[putIdx] = item;
if (++putIdx == items.length) putIdx = 0;
count++;
notEmpty.signal(); // 叫醒獲取的線程
}
// 獲取
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();// 阻塞其他獲取線程
}
return dequeue();
} finally {
lock.unlock();
}
}
private Object dequeue() {
Object x = items[takeIdx];
items[takeIdx] = null;
if (++takeIdx == items.length) takeIdx = 0;
count--;
notFull.signal(); // 叫醒其他的插入線程
return x;
}
}
其實上面就是ArrayBlockingQueue刪減版的部分實現,感興趣的小夥伴可以看看源碼的實現,源碼上面針對並發還做了更細節的處理。
總結
API層面的獨佔鎖:ReentrantLock是底層使用AQS實現的可重入的獨佔鎖,區別於synchronized原生語法層面實現鎖語義,ReetrantLock通過lock()
和unlock()
兩個方法顯式地實現互斥鎖。
state與可重入:AQS的state為0表示當前鎖空閑,大於0表示該鎖已經被佔用,某一時刻只有一個線程可以獲取該鎖。可重入性是通過判斷持鎖線程是不是當前線程,如果是,state+1,釋放鎖時,state-1,為0時表示徹底釋放。
公平與非公平策略:ReentrantLock擁有公平和非公平兩種策略,區別在於獲取鎖的時候是否會去檢查阻塞隊列中,是否存在當前線程的前驅節點,默認是非公平鎖策略。
豐富的鎖擴展:提供了響應中斷的獲取鎖方式lockInterruptibly,以及提供了快速響應的tryLock方法,及超時獲取等等方法。
condition:TODO一個ReentrantLock對象可以通過newCondition()同時綁定多個Condition對象,對線程的等待、喚醒操作更加詳細和靈活,這一點我們之後說到Condition的時候會再回過頭說的。
參考閱讀
- 《Java並發編程之美》
- 《Java並發編程的藝術》方騰飛
- Java並發讀書筆記:Lock與ReentrantLock
- 【鎖】Condition接口分析