JAVA並發(3)-ReentrantReadWriteLock的探索
1. 介紹
本文我們繼續探究使用AQS的子類ReentrantReadWriteLock(讀寫鎖)。老規矩,先貼一下類圖
ReentrantReadWriteLock這個類包含讀鎖和寫鎖,這兩種鎖都存在是否公平的概念,這個後面會細講。
此類跟ReentrantLock類似,有以下幾種性質:
- 可選的公平性政策
- 重入,讀鎖和寫鎖同一個線程可以重複獲取。寫鎖可以獲取讀鎖,反之不能
- 鎖的降級,重入還可以通過獲取寫鎖,然後獲取到讀鎖,通過釋放寫鎖的方式,從而寫鎖降級為讀鎖。 然而,從讀鎖升級到寫鎖是不可能的。
- 獲取讀寫鎖期間,支持不可中斷
2. 源碼剖析
先講幾個必要的知識點,然後我們再對寫鎖的獲取與釋放,讀鎖的獲取與釋放進行講解,中間穿插着講公平與非公平的實現。
知識點一:
內部類Sync中,將AQS中的state(private volatile int state;長度是32位)邏輯分成了兩份,高16位代表讀鎖持有的count,低16位代表寫鎖持有的count
Sync
...
/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
...
This lock supports a maximum of 65535 recursive write locks and 65535 read locks. Attempts to exceed these limits result in Error throws from locking methods.(讀鎖與寫鎖都最大支持65535個)
知識點二:
HoldCounter的作用,一個計數器記錄每個線程持有的讀鎖count。使用ThreadLocal維護。緩存在cachedHoldCounter
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
使用ThreadLocal維護 👇
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
維護最後一個使用HoldCounter的線程。簡言之就是,假如A線程持有讀鎖,A線程重入獲取讀鎖,在它之後沒有其他線程獲取讀鎖,那麼當獲取HoldCounter時,可以直接將cachedHoldCounter賦值給該線程,就不用從ThreadLocal中去查詢了(ThreadLocal內部維持一個 Map ,想獲取當前線程的值就需要去遍歷查詢),這樣做可以節約時間。
private transient HoldCounter cachedHoldCounter;
當前線程持有的重入讀鎖count,當某個線程持有的count降至0,將被刪除。
private transient ThreadLocalHoldCounter readHolds;
初始化在構造函數或readObject中
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
readHolds = new ThreadLocalHoldCounter();
setState(0); // reset to unlocked state
}
知識點三:
是否互斥 :
讀操作 | 寫操作 | |
---|---|---|
讀操作 | 否 | 是 |
寫操作 | 是 | 是 |
只有讀讀不互斥,其餘都互斥
獲取到了寫鎖,也有資格獲取讀鎖,反之不行.
知識點四
ReentranReadWriteLock,實現了ReadWriteLock接口。
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}
ReadWriteLock維護着寫鎖和讀鎖。寫鎖是排他的,而讀鎖可以同時由多個線程持有。與互斥鎖相比,讀寫鎖的粒度更細
有了上面的知識,等會理解下面的源碼就更容易了,故事從下面幾個變量開始~
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock readLock = rwl.readLock();
private final Lock writeLock = rwl.writeLock();
ReentrantReadWriteLock的構造器,默認是非公平模式
/**
* Creates a new {@code ReentrantReadWriteLock} with
* default (nonfair) ordering properties.
*/
public ReentrantReadWriteLock() {
this(false);
}
/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
2.1 寫鎖
寫鎖,排他鎖;一個線程獲取了寫鎖,其他線程只能等待
2.1.1 寫鎖的獲取
writeLock.lock();
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#lock
public void lock() {
sync.acquire(1);
}
又來到了AQS的acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我們看ReentrantReadWriteLock是如何實現tryAcquire的
protected final boolean tryAcquire(int acquires) {
/*
* 對下面的條件做一個總述:
* 1. 當讀鎖或者寫鎖的count不為零時,同時擁有者不是當前線程,返回false
* 2. 當count達到飽和(超過最大的限制65535),返回false
* 3. 如果上面的情況都不是,該線程有資格去獲取鎖,如果它是可重入獲取鎖或隊列策略允許它。那麼就更新state並且設置鎖的擁有者
*/
// 結合總述看下面的代碼,很容易就看懂了
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
// 存在讀鎖或寫鎖
if (c != 0) {
// 情況分析: 1. w = 0, 表示已經獲取了讀鎖(不管是自己還是其他線程),直接返回false
// 2. 寫鎖不為零,且當前鎖的擁有者不是當前線程,返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
// 不存在讀鎖或寫鎖被佔用的情況
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
上面代碼中的writerShouldBlock方法就是tryAcquire控制公平與否的關鍵,我們分別看看公平與非公平是如何實現的
2.1.1.1 寫鎖的獲取(非公平)
默認情況下是非公平的
static final class NonfairSync extends Sync {
...
// 直接返回false
final boolean writerShouldBlock() {
return false;
}
...
2.1.1.2 寫鎖的獲取(公平)
static final class FairSync extends Sync {
...
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
...
判斷是否該線程前面還有其他線程的結點,上一節有講到過。
這裡還貼一下,整個acquire的流程圖
2.1.2 寫鎖的釋放
下面的這段代碼,記得一定放在finally中
writeLock.unlock();
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
又看到了熟悉的面孔,但我們主要看的還是tryRelease, 👇
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
很簡單,就貼一下release的流程圖
2.2 讀鎖
讀鎖與讀鎖並不互斥,可以存在多個持有讀鎖的線程📕
2.2.1 讀鎖的獲取
readLock.lock();
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
之前的文章還沒有講解過tryAcquireShared在子類如何實現的。看看如何實現的 👇
protected final int tryAcquireShared(int unused) {
/*
* 對下面的條件做一個總述:
* 1. 如果寫鎖被其他線程持有,失敗
* 2. 否則,如果此線程有資格去獲取寫鎖,首先查詢是否需要阻塞(readerShouldBlock).如果不需要,就通過CAS更新state。
*
* 3. 如果上面兩步都失敗了,可能是當前線程沒有資格(需要被阻塞),或CAS失敗,或者count數量飽和了,然後執行fullTryAcquireShared進行重試
*/
Thread current = Thread.currentThread();
int c = getState();
// 這裡可以看出,若該線程持有寫鎖,同樣也可以去獲取讀鎖
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// readerShouldBlock表示此線程是否被阻塞(後面細談)
// 多個線程執行compareAndSetState(c, c + SHARED_UNIT), 只有一個線程可以執行成功,其餘線程去執行fullTryAcquireShared
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
// firstReader是第一個獲取到讀鎖的線程
// firstReaderHoldCount是firstReader持有的count
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 重入
firstReaderHoldCount++;
} else {
// sync queue還存在其他線程持有讀鎖,且該線程不是第一個持有讀鎖的線程
// 結合上面的知識點二,理解下面的邏輯
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 到達這裡的線程,1. 沒有資格獲取(需要阻塞) 2. CAS失敗 3. 讀鎖count飽和
return fullTryAcquireShared(current);
}
Full version of acquire for reads, that handles CAS misses and reentrant reads not dealt with in tryAcquireShared.
final int fullTryAcquireShared(Thread current) {
// 主要邏輯跟上面的tryAcquireShared類似。tryAcquireShared的邏輯只有一個線程會成功CAS,
// 其餘的線程都進入fullTryAcquireShared,進行重試(代碼不那麼複雜)
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) { // 需要被阻塞
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
// 此時, 如果該線程前面已經有線程獲取了讀鎖,且當前線程持有的讀鎖count為0,從readHolds除去。
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
// 這裡表示該線程還是一個new reader, 還沒有持有讀鎖
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 這裡只有一個線程會成功,若CAS失敗,一直重試直到成功,或者讀鎖count飽和,或者需要被阻塞為止
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
我們上面看到了readerShouldBlock,這個方法是控制讀鎖獲取公平與否,下面我們分別看看在非公平與公平模式下的實現 👇
2.2.1.1 讀鎖的獲取(非公平)
static final class NonfairSync extends Sync {
...
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
...
}
判斷sync queue的head的後繼結點是否是寫鎖(獨佔模式)
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
上面的方法是,獲取讀鎖時,避免導致寫鎖飢餓(indefinite writer starvation)的一個措施,下面我們對它進行詳細的解釋
結合上面的圖片,我們設想有一個這樣的情況,寫鎖沒有被獲取,線程A獲取到了讀鎖,此時另一個線程X想要獲取寫鎖,但是寫鎖與讀鎖互斥,所以此時將線程X代表的node添加到sync queue中,等待讀鎖被釋放,才有資格去獲取寫鎖。
上面的情況 + 不存在判斷sync queue的head的後繼結點是否是寫鎖(apparentlyFirstQueuedIsExclusive)的方法時,我們看看會出什麼問題
時刻一: 線程B、線程C,線程D是新建線程想要去獲取讀鎖(new reader),此時因為不存在寫鎖被獲取,所以線程B、線程C,線程D都會在fullTryAcquireShared中不斷重試,最終都獲得讀鎖
時刻二: 線程A釋放,會執行unparkSuccessor,此時線程X被喚醒,但是執行到tryAcquire,又檢測到讀鎖被持有(不管是自己還是是其他線程),線程X又被阻塞。線程B釋放,還是會出現這種情況,只有等到最後一個讀鎖被釋放,線程X才能獲取到寫鎖。但是想想如果後面一連串的讀鎖,線程X不是早就被‘餓死了’
apparentlyFirstQueuedIsExclusive,就可以防止這種‘寫鎖飢餓’的情況發生。線程B、線程C,線程D只有被阻塞,等待線程X獲取到寫鎖,才有機會獲取讀鎖。
2.2.1.2 讀鎖的獲取(公平)
/**
* Fair version of Sync
*/
static final class FairSync extends Sync {
...
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
...
}
這裡關於讀鎖的獲取(公平與非公平)分析完了,貼一張整個acquireShared的流程圖
2.2.2 讀鎖的釋放
readLock.unlock();
public void unlock() {
sync.releaseShared(1);
}
AQS
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 喚醒head後驅結點
doReleaseShared();
return true;
}
return false;
}
// unused = 1
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
// 獲取讀鎖時,會執行 compareAndSetState(c, c + SHARED_UNIT),這裡就是減SHARED_UNIT
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// 釋放讀鎖,對需要讀鎖的線程沒有影響(讀讀不互斥)
// 但是,這裡如果返回true,可以喚醒被阻塞的想要持有寫鎖的線程
return nextc == 0;
}
}
3. 總結
- ReentrantReadWriteLock中的防止’寫鎖飢餓’的操作,值得一看
- 將AQS中的state(private volatile int state;),邏輯分為高16位(代表讀鎖的state),低16位(代表寫鎖的state),是一個值得學習的辦法
- 使用ThreadLocal維護每一個現成的讀鎖的重入數,使用cachedHoldCounter維護最後一個使用HoldCounter的讀鎖線程,節省在ThreadLocal中的查詢
使用讀寫鎖的情況,應該取決於與修改相比,讀取數據的頻率,讀取和寫入操作持續的時間。
例如:
- 某個集合不經常修改,但是對其元素搜索很頻繁,使用讀寫鎖就是最佳選擇。(簡言之,就是讀多寫少)
- 現在也是讀多寫少,但是讀操作時間很短,只有一小段代碼,而讀寫鎖比互斥鎖更加複雜,開銷可能大於互斥鎖,這種情況使用讀寫鎖可能不合適。此時就要通過性能分析,判斷使用讀寫鎖在系統中是否合適。