一文帶你學會AQS和並發工具類的關係
1. 存在的意義
AQS(AbstractQueuedSynchronizer)是JAVA中眾多鎖以及並發工具的基礎,其底層採用樂觀鎖,大量使用了CAS操作, 並且在衝突時,採用自旋方式重試,以實現輕量級和高效地獲取鎖。
提供一個框架,用於實現依賴於先進先出(FIFO)等待隊列的阻塞鎖和相關的同步器(semaphore等)。 此類旨在為大多數依賴單個原子int值表示狀態的同步器提供有用的基礎。 子類必須定義更改此狀態的受保護方法,並定義該狀態對於獲取或釋放此對象而言意味着什麼。 鑒於這些,此類中的其他方法將執行所有排隊和阻塞機制。 子類可以維護其他狀態字段,但是相對於同步,僅跟蹤使用方法getState,setState和compareAndSetState操作的原子更新的int值。
此類支持默認獨佔模式和共享模式之一或兩者。 當以獨佔方式進行獲取時,其他線程嘗試進行的獲取將無法成功。 由多個線程獲取的共享模式可能(但不一定)成功。 該類不「理解」這些差異,當共享模式獲取成功時,下一個等待線程(如果存在)還必須確定它是否也可以獲取。 在不同模式下等待的線程共享相同的FIFO隊列。 通常,實現子類僅支持這些模式之一,但例如可以在ReadWriteLock發揮作用。 僅支持獨佔模式或僅支持共享模式的子類無需定義支持未使用模式的方法。
2. 核心知識點
2.1 state
private volatile int state; // 同步狀態
state是整個工具的核心,通常整個工具都是在設置和修改狀態,很多方法的操作都依賴於當前狀態是什麼。由於狀態是全局共享的,一般會被設置成volatile類型,為了保證其修改的可見性;
2.2 CLH隊列
AQS中的隊列是CLH變體的虛擬雙向隊列(FIFO),AQS是通過將每條請求共享資源的線程封裝成一個節點來實現鎖的分配。隊列採用的是悲觀鎖的思想,表示當前所等待的資源,狀態或者條件短時間內可能無法滿足。因此,它會將當前線程包裝成某種類型的數據結構,扔到一個等待隊列中,當一定條件滿足後,再從等待隊列中取出。
2.3 CAS操作
CAS操作是最輕量的並發處理,通常我們對於狀態的修改都會用到CAS操作,因為狀態可能被多個線程同時修改,CAS操作保證了同一個時刻,只有一個線程能修改成功,從而保證了線程安全。CAS採用的是樂觀鎖的思想,因此常常伴隨着自旋,如果發現當前無法成功地執行CAS,則不斷重試,直到成功為止。
3. 核心實現原理
3.1 作為同步器的基礎
要將此類用作同步器的基礎,請使用getState,setState或compareAndSetState檢查或修改同步狀態,以重新定義以下方法(如適用):
- tryAcquire
獨佔方式,arg為獲取鎖的次數,嘗試獲取資源,成功則返回True,失敗則返回False。
- tryRelease
獨佔方式,arg為釋放鎖的次數,嘗試釋放資源,成功則返回True,失敗則返回False。
- tryAcquireShared
共享方式,arg為獲取鎖的次數,嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
- tryReleaseShared
共享方式,arg為釋放鎖的次數,嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回True,否則返回False。
- isHeldExclusively
該線程是否正在獨佔資源。只有用到Condition才需要去實現它。
默認情況下,這些方法中的每一個都會引發UnsupportedOperationException 。 這些方法的實現必須在內部是線程安全的,並且通常應簡短且不阻塞。 定義這些方法是使用此類的唯一受支持的方法。 所有其他方法都被聲明為final方法,因為它們不能獨立變化。
3.2 同步狀態state
AQS中維護了一個名為state的字段,意為同步狀態,是由volatile修飾的,用於展示當前臨界資源的獲鎖情況。
private volatile int state;
下面提供了幾個訪問這個state字段的方法:
返回同步狀態的當前值。 此操作具有volatile讀取的內存語義
protected final int getState() {
return state;
}
設置同步狀態的值。 此操作具有volatile寫操作的內存語義。
protected final void setState(int newState) {
state = newState;
}
如果當前狀態值等於期望值,則以原子方式將同步狀態設置為給定的更新值。 此操作具有volatile讀寫的內存語義
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
這幾個方法都是Final修飾的,說明子類中無法重寫它們。我們可以通過修改State字段表示的同步狀態來實現多線程的獨佔模式和共享模式
state的值即表示了鎖的狀態,state為0表示鎖沒有被佔用,state大於0表示當前已經有線程持有該鎖,這裡之所以說大於0而不說等於1是因為可能存在可重入的情況。你可以把state變量當做是當前持有該鎖的線程數量。
public abstract class AbstractOwnableSynchronizer
protected AbstractOwnableSynchronizer() {
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
exclusiveOwnerThread 屬性的值即為當前持有鎖的線程獨佔模式獲取鎖流程:
共享模式獲取鎖流程:
3.3 數據結構
- AQS中最基本的數據結構是Node,Node即為CLH變體隊列中的節點。
static final class Node {
// 表示線程以共享的模式等待鎖
static final Node SHARED = new Node();
// 表示線程正在以獨佔的方式等待鎖
static final Node EXCLUSIVE = null;
// 為1,表示線程獲取鎖的請求已經取消了
static final int CANCELLED = 1;
// 為-1,表示線程已經準備好了,就等資源釋放了
static final int SIGNAL = -1;
// 為-2,表示節點在等待隊列中,節點線程等待喚醒
static final int CONDITION = -2;
// 為-3,當前線程處在SHARED情況下,該字段才會使用
static final int PROPAGATE = -3;
// 當前節點在隊列中的狀態
volatile int waitStatus;
// 前驅節點
volatile Node prev;
// 後續節點
volatile Node next;
// 當前節點的線程
volatile Thread thread;
// 指向下一個處於CONDITION狀態的節點
Node nextWaiter;
...
}
- AQS中CLH變體的虛擬雙向隊列(FIFO),AQS是通過將每條請求共享資源的線程封裝成一個節點來實現鎖的分配。
// 隊列頭節點
private transient volatile Node head;
// 隊列尾節點
private transient volatile Node tail;
在AQS中的隊列是一個FIFO隊列,它的head節點永遠是一個虛擬結點(dummy node), 它不代表任何線程,因此head所指向的Node的thread屬性永遠是null。但是我們不會在構建過程中創建它們,因為如果沒有爭用,這將是浪費時間。 而是構造節點,並在第一次爭用時設置頭和尾指針。只有從次頭節點往後的所有節點才代表了所有等待鎖的線程。也就是說,在當前線程沒有搶到鎖被包裝成Node扔到隊列中時,即使隊列是空的,它也會排在第二個,我們會在它的前面新建一個虛擬節點。
4. 獲取鎖實現
4.1 ReentrantLock 獨佔鎖內部結構
構造函數源代碼
// 默認創建非公平鎖
public ReentrantLock() {
sync = new NonfairSync();
}
// 通過傳值為true來進行創建公平鎖
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock 裏面有三個內部類:
- 一個是抽象的 Sync 實現了 AbstractQueuedSynchronizer
- NonfairSync 繼承了 Sync
- FairSync 繼承了 Sync
4.2 非公平鎖的實現
ReentrantLock 種獲取鎖的方法
public void lock() {
sync.lock();
}
ReentrantLock 的非公平鎖實現
static final class NonfairSync extends Sync {
final void lock() {
// 如果設置state的值從0變為1成功
if (compareAndSetState(0, 1))
// 則將當前線程設置為獨佔線程
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
compareAndSetState(0,1)
protected final boolean compareAndSetState(int expect, int update) {
// 通過unsafe.compareAndSwapInt方法來進行設置值
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
stateOffset 為AQS種維護的state屬性的偏移量
setExclusiveOwnerThread(Thread.currentThread());
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
acquire(1); 調用的是AQS 中的acquire(int arg) 方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire(arg) 該方法是protected的由子類去具體實現的
我們需要看的是NonfairSync中實現的tryAcquire方法,裏面又調用了nonfairTryAcquire方法,再進去看看
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
nonfairTryAcquire(int acquires) 方法實現
final boolean nonfairTryAcquire(int acquires) {
// 獲取當前線程
final Thread current = Thread.currentThread();
// 獲取當前state的值
int c = getState();
if (c == 0) {
// 看看設置值是否能成功
if (compareAndSetState(0, acquires)) {
// 則將當前線程設置為獨佔線程
setExclusiveOwnerThread(current);
return true;
}
}
// 返回由setExclusiveOwnerThread設置的最後一個線程;如果從不設置,則返回null
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 設置state的值
setState(nextc);
return true;
}
return false;
}
acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法實現,先看里addWaiter(Node.EXCLUSIVE)方法注意:Node.EXCLUSIVE 此時是空值,所以mode 就是空的,所以此時創建的Node節點中的nextWaiter是空值。
static final class Node {
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
}
private Node addWaiter(Node mode) {
// 創建一個新的節點
Node node = new Node(Thread.currentThread(), mode);
// 將當前CLH隊列的尾部節點賦予給 pred
Node pred = tail;
if (pred != null) { // 如果尾節點不為空
node.prev = pred; // 將當前node節點的前驅節點指向CLH隊列的尾部節點
if (compareAndSetTail(pred, node)) { // CAS設置值
pred.next = node; // CLH隊列的尾部節點的後繼節點指向新的node節點
return node;
}
}
enq(node);
return node;
}
如果CLH隊列的尾部節點為空值的話,執行enq(node)方法
// 通過CAS方式設置隊列的頭節點
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
// 通過CAS方式設置隊列的尾部節點
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
// 節點入隊操作
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果尾部節點為空值
if (t == null) {
// 則進行初始化一個節點,將其設置為頭節點
if (compareAndSetHead(new Node()))
tail = head; //頭尾指向同一個空節點
} else {
// 如果尾部節點不為空,那麼將傳進來的入隊節點的前驅節點指向當前隊列的尾部節點
node.prev = t;
// 將當前傳進來的入隊節點設置為當前隊列的尾部節點
if (compareAndSetTail(t, node)) {
// 將當前隊列的尾部節點的後繼節點指向傳進來的新節點
t.next = node;
return t;
}
}
}
}
查看 acquireQueued 方法實現
// 獲取當前節點的前驅節點
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 檢查並更新無法獲取的節點的狀態。 如果線程應阻塞,則返回true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//SIGNAL這個狀態就有點意思了,它不是表徵當前節點的狀態,而是當前節點的下一個節點 //的狀態。當一個節點的waitStatus被置為SIGNAL,就說明它的下一個節點(即它的後繼 // 節點)已經被掛起了(或者馬上就要被掛起了),因此在當前節點釋放了鎖或者放棄獲取 // 鎖時,如果它的waitStatus屬性為SIGNAL,它還要完成一個額外的操作——喚醒它的後繼節點。
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 當前節點的 ws > 0, 則為 Node.CANCELLED 說明前驅節點 // 已經取消了等待鎖(由於超時或者中斷等原因)
// 既然前驅節點不等了, 那就繼續往前找, 直到找到一個還在等待鎖的節點
// 然後我們跨過這些不等待鎖的節點, 直接排在等待鎖的節點的後面
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 前驅節點的狀態既不是SIGNAL,也不是CANCELLED
// 用CAS設置前驅節點的ws為 Node.SIGNAL,給自己定一個鬧鐘
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 停止的便捷方法,然後檢查是否中斷
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
// 取消正在進行的獲取嘗試
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node;
}
}
// 能執行到該方法, 說明addWaiter 方法已經成功將包裝了當前Thread的節點添加到了等待隊列的隊尾
// 該方法中將再次嘗試去獲取鎖
// 在再次嘗試獲取鎖失敗後, 判斷是否需要把當前線程掛起
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 獲取當前節點的前驅節點
final Node p = node.predecessor();
// 在前驅節點就是head節點的時候,繼續嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 將當前線程掛起,使CPU不再調度它
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
為什麼前面獲取鎖失敗了, 這裡還要再次嘗試獲取鎖呢?
首先, 這裡再次嘗試獲取鎖是基於一定的條件的,即:當前節點的前驅節點就是HEAD節點,因為我們知道,head節點就是個虛擬節點,它不代表任何線程,或者代表了持有鎖的線程,如果當前節點的前驅節點就是head節點,那就說明當前節點已經是排在整個等待隊列最前面的了。
setHead(node); 方法
// 這個方法將head指向傳進來的node,並且將node的thread和prev屬性置為null
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
可以看出,這個方法的本質是丟棄原來的head,將head指向已經獲得了鎖的node。但是接着又將該node的thread屬性置為null了,這某種意義上導致了這個新的head節點又成為了一個虛擬節點,它不代表任何線程。為什麼要這樣做呢,因為在tryAcquire調用成功後,exclusiveOwnerThread屬性就已經記錄了當前獲取鎖的線程了,此處沒有必要再記錄。這某種程度上就是將當前線程從等待隊列裏面拿出來了,是一個變相的出隊操作。
shouldParkAfterFailedAcquire(Node pred, Node node)方法
- 如果為前驅節點的waitStatus值為 Node.SIGNAL 則直接返回 true
- 如果為前驅節點的waitStatus值為 Node.CANCELLED (ws > 0), 則跳過那些節點, 重新尋找正常等待中的前驅節點,然後排在它後面,返回false
- 其他情況, 將前驅節點的狀態改為 Node.SIGNAL, 返回false
acquireQueued方法中的Finally代碼
private void cancelAcquire(Node node) {
// 將無效節點過濾
if (node == null)
return;
// 設置該節點不關聯任何線程,也就是虛節點
node.thread = null;
Node pred = node.prev;
// 通過前驅節點,跳過取消狀態的node
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 獲取過濾後的前驅節點的後繼節點
Node predNext = pred.next;
// 把當前node的狀態設置為CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果當前節點是尾節點,將從後往前的第一個非取消狀態的節點設置為尾節點
// 更新失敗的話,則進入else,如果更新成功,將tail的後繼節點設置為null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果當前節點不是head的後繼節點,
// 1.判斷當前節點前驅節點的是否為SIGNAL
// 2.如果不是,則把前驅節點設置為SINGAL看是否成功
// 如果1和2中有一個為true,再判斷當前節點的線程是否為null
// 如果上述條件都滿足,把當前節點的前驅節點的後繼指針指向當前節點的後繼節點
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果當前節點是head的後繼節點,或者上述條件不滿足,那就喚醒當前節點的後繼節點
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
4.3 非公平鎖獲取流程圖
非公平鎖獲取鎖成功的流程圖
非公平鎖獲取鎖失敗的流程圖
5.釋放鎖實現
5.1釋放鎖代碼分析
嘗試釋放此鎖。如果當前線程是此鎖的持有者,則保留計數將減少。 如果保持計數現在為零,則釋放鎖定。 如果當前線程不是此鎖的持有者,則拋出IllegalMonitorStateException。
## ReentrantLock
public void unlock() {
sync.release(1);
}
sync.release(1) 調用的是AbstractQueuedSynchronizer中的release方法
## AbstractQueuedSynchronizer
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
分析tryRelease(arg)方法
tryRelease(arg)該方法調用的是ReentrantLock中
protected final boolean tryRelease(int releases) {
// 獲取當前鎖持有的線程數量和需要釋放的值進行相減
int c = getState() - releases;
// 如果當前線程不是鎖佔有的線程拋出異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果此時c = 0就意味着state = 0,當前鎖沒有被任意線程佔有
// 將當前所的佔有線程設置為空
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 設置state的值為 0
setState(c);
return free;
}
如果頭節點不為空,並且waitStatus != 0,喚醒後續節點如果存在的話。
這裡的判斷條件為什麼是h != null && h.waitStatus != 0?
因為h == null的話,Head還沒初始化。初始情況下,head == null,第一個節點入隊,Head會被初始化一個虛擬節點。所以說,這裡如果還沒來得及入隊,就會出現head == null 的情況。
- h != null && waitStatus == 0 表明後繼節點對應的線程仍在運行中,不需要喚醒
- h != null && waitStatus < 0 表明後繼節點可能被阻塞了,需要喚醒
private void unparkSuccessor(Node node) {
// 獲取頭結點waitStatus
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 獲取當前節點的下一個節點
Node s = node.next;
//如果下個節點是null或者下個節點被cancelled,就找到隊列最開始的非cancelled的節點
if (s == null || s.waitStatus > 0) {
s = null;
// 就從尾部節點開始找往前遍歷,找到隊列中第一個waitStatus<0的節點。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果當前節點的下個節點不為空,而且狀態<=0,就把當前節點喚醒
if (s != null)
LockSupport.unpark(s.thread);
}
為什麼要從後往前找第一個非Cancelled的節點呢?
看一下addWaiter方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
我們從這裡可以看到,節點入隊並不是原子操作,也就是說,node.prev = pred, compareAndSetTail(pred, node) 這兩個地方可以看作Tail入隊的原子操作,但是此時pred.next = node;還沒執行,如果這個時候執行了unparkSuccessor方法,就沒辦法從前往後找了,所以需要從後往前找。還有一點原因,在產生CANCELLED狀態節點的時候,先斷開的是Next指針,Prev指針並未斷開,因此也是必須要從後往前遍歷才能夠遍歷完全部的Node。
所以,如果是從前往後找,由於極端情況下入隊的非原子操作和CANCELLED節點產生過程中斷開Next指針的操作,可能會導致無法遍歷所有的節點。所以,喚醒對應的線程後,對應的線程就會繼續往下執行。
5.2 釋放鎖流程圖
6.注意
由於篇幅較長公平鎖的實現在下一篇的博客中講述,謝謝大家的關注和支持!有問題希望大家指出,共同進步!!!