AQS 自定義同步鎖,挺難的!
AQS
是AbstractQueuedSynchronizer
的簡稱。
AbstractQueuedSynchronizer 同步狀態
AbstractQueuedSynchronizer
內部有一個state
屬性,用於指示同步的狀態:
private volatile int state;
state
的字段是個int
型的,它的值在AbstractQueuedSynchronizer
中是沒有具體的定義的,只有子類繼承AbstractQueuedSynchronizer
那麼state
才有意義,如在ReentrantLock
中,state=0
表示資源未被鎖住,而state>=1
的時候,表示此資源已經被另外一個線程鎖住。
AbstractQueuedSynchronizer
中雖然沒有具體獲取、修改state
的值,但是它為子類提供一些操作state
的模板方法:
獲取狀態
protected final int getState() {
return state;
}
更新狀態
protected final void setState(int newState) {
state = newState;
}
CAS更新狀態
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS 等待隊列
AQS 等待列隊是一個雙向隊列,隊列中的成員都有一個prev
和next
成員,分別指向它前面的節點和後面的節點。
隊列節點
在AbstractQueuedSynchronizer
內部,等待隊列節點由內部靜態類Node
表示:
static final class Node {
...
}
節點模式
隊列中的節點有兩種模式:
- 獨佔節點:同一時刻只能有一個線程訪問資源,如
ReentrantLock
- 共享節點:同一時刻允許多個線程訪問資源,如
Semaphore
節點的狀態
等待隊列中的節點有五種狀態:
- CANCELLED:此節點對應的線程,已經被取消
- SIGNAL:此節點的下一個節點需要一個喚醒信號
- CONDITION:當前節點正在條件等待
- PROPAGATE:共享模式下會傳播喚醒信號,就是說當一個線程使用共享模式訪問資源時,如果成功訪問到資源,就會繼續喚醒等待隊列中的線程。
自定義同步鎖
為了便於理解,使用AQS自己實現一個簡單的同步鎖,感受一下使用AQS實現同步鎖是多麼的輕鬆。
下面的代碼自定了一個CustomLock
類,繼承了AbstractQueuedSynchronizer
,並且還實現了Lock
接口。
CustomLock
類是一個簡單的可重入鎖,類中只需要重寫AbstractQueuedSynchronizer
中的tryAcquire
與tryRelease
方法,然後在修改少量的調用就可以實現一個最基本的同步鎖。
public class CustomLock extends AbstractQueuedSynchronizer implements Lock {
@Override
protected boolean tryAcquire(int arg) {
int state = getState();
if(state == 0){
if( compareAndSetState(state, arg)){
setExclusiveOwnerThread(Thread.currentThread());
System.out.println("Thread: " + Thread.currentThread().getName() + "拿到了鎖");
return true;
}
}else if(getExclusiveOwnerThread() == Thread.currentThread()){
int nextState = state + arg;
setState(nextState);
System.out.println("Thread: " + Thread.currentThread().getName() + "重入");
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
int state = getState() - arg;
if(getExclusiveOwnerThread() != Thread.currentThread()){
throw new IllegalMonitorStateException();
}
boolean free = false;
if(state == 0){
free = true;
setExclusiveOwnerThread(null);
System.out.println("Thread: " + Thread.currentThread().getName() + "釋放了鎖");
}
setState(state);
return free;
}
@Override
public void lock() {
acquire(1);
}
@Override
public void unlock() {
release(1);
}
...
}
CustomLock
是實現了Lock
接口,所以要重寫lock
和unlock
方法,不過方法的代碼很少只需要調用AQS中的acquire
和release
。
然後為了演示AQS的功能寫了一個小演示程序,啟動兩根線程,分別命名為線程A
和線程B
,然後同時啟動,調用runInLock
方法,模擬兩條線程同時訪問資源的場景:
public class CustomLockSample {
public static void main(String[] args) throws InterruptedException {
Lock lock = new CustomLock();
new Thread(()->runInLock(lock), "線程A").start();
new Thread(()->runInLock(lock), "線程B").start();
}
private static void runInLock(Lock lock){
try {
lock.lock();
System.out.println("Hello: " + Thread.currentThread().getName());
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
訪問資源(acquire)
在CustomLock的lock方法中,調用了 acquire(1)
,acquire
的代碼如下 :
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- CustomLock.tryAcquire(…):
CustomLock.tryAcquire
判斷當前線程是否能夠訪問同步資源 - addWaiter(…):將當前線程添加到等待隊列的隊尾,當前節點為獨佔模型(Node.EXCLUSIVE)
- acquireQueued(…):如果當前線程能夠訪問資源,那麼就會放行,如果不能那當前線程就需要阻塞。
- selfInterrupt:設置線程的中斷標記
注意: 在acquire方法中,如果tryAcquire(arg)返回true, 就直接執行完了,線程被放行了。所以的後面的方法調用acquireQueued、addWaiter都是tryAcquire(arg)返回false時才會被調用。
tryAcquire 的作用
tryAcquire
在AQS類中是一個直接拋出異常的實現:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
而在我們自定義的 CustomLock 中,重寫了此方法:
@Override
protected boolean tryAcquire(int arg) {
int state = getState();
if(state == 0){
if( compareAndSetState(state, arg)){
setExclusiveOwnerThread(Thread.currentThread());
System.out.println("Thread: " + Thread.currentThread().getName() + "拿到了鎖");
return true;
}
}else if(getExclusiveOwnerThread() == Thread.currentThread()){
int nextState = state + arg;
setState(nextState);
System.out.println("Thread: " + Thread.currentThread().getName() + "重入");
return true;
}
return false;
}
tryAcquire
方法返回一個布而值,true
表示當前線程能夠訪問資源,false
當前線程不能訪問資源,所以tryAcquire
的作用:決定線程是否能夠訪問受保護的資源。tryAcquire
裏面的邏輯在子類可以自由發揮,AQS不關心這些,只需要知道能不能訪問受保護的資源,然後來決定線程是放行還是進行等待隊列(阻塞)。
因為是在多線程環境下執行,所以不同的線程執行tryAcquire
時會返回不同的值,假設線程A比線程B要快一步,先到達compareAndSetState
設置state的值成員並成功,那線程A就會返回true,而 B 由於state的值不為0或者compareAndSetState
執行失敗,而返回false。
線程B 搶佔鎖流程
上面訪問到線程A成功獲得了鎖,那線程B就會搶佔失敗,接着執行後面的方法。
線程的入隊
線程的入隊是邏輯是在addWaiter
方法中,addWaiter方法的具體邏輯也不需要說太多,如果你知道鏈表
的話,就非常容易理解了,最終的結果就是將新線程添加到隊尾。AQS的中有兩個屬性head
、tail
分別指定等待隊列的隊首和隊尾。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
需要注意的是在enq
方法中,初始化隊列的時候,會新建一個Node
做為head
和tail
,然後在之後的循環中將參數node
添加到隊尾,隊列初始化完後,裏面會有兩個節點,一個是空的結點new Node()
另外一個就是對應當前線程的結點。
由於線程A在tryAcquire
時返回了true
,所以它會被直接放行,那麼只有B線程會進入addWaiter
方法,此時的等待隊列如下:
注意: 等待隊列內的節點都是正在等待資源的線程,如果一個線程直接能夠訪問資源,那它壓根就不需要進入等待隊列,會被放行。
線程B 的阻塞
線程B被添加到等待隊列的尾部後,會繼續執行acquireQueued
方法,這個方法就是AQS阻塞線程的地方,acquireQueued
方法代碼的一些解釋:
- 外面是一個
for (;;)
無限循環,這個很重要 - 會重新調用一次
tryAcquire(arg)
判斷線程是否能夠訪問資源了 node.predecessor()
獲取參數node
的前一個節點shouldParkAfterFailedAcquire
判斷當前線程獲取鎖失敗後,需不需要阻塞parkAndCheckInterrupt()
使用LockSupport
阻塞當前線程,
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire 判斷是否要阻塞
shouldParkAfterFailedAcquire
接收兩個參數:前一個節點、當前節點,它會判斷前一個節點的waitStatus
屬性,如果前一個節點的waitStatus=Node.SIGNAL
就會返回true:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
acquireQueued
方法在循環中會多次調用shouldParkAfterFailedAcquire
,在等待隊列中節點的waitStatus
的屬性默認為0,所以第一次執行shouldParkAfterFailedAcquire
會執行:
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
更新完pred.waitStatus
後,節點的狀態如下:
然後shouldParkAfterFailedAcquire
返回false,回到acquireQueued
的循環體中,又去搶鎖還是失敗了,又會執行shouldParkAfterFailedAcquire
,第二次循環時此時的pred.waitStatus
等於Node.SIGNAL
那麼就會返回true。
parkAndCheckInterrupt 阻塞線程
這個方法就比較直觀了, 就是將線程的阻塞住:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
為什麼是一個for (;;)
無限循環呢
先看一個for (;;)
的退出條件,只有node
的前一個節點是head
並且tryAcquire返回true時才會退出循環,否則的話線程就會被parkAndCheckInterrupt
阻塞。
線程被parkAndCheckInterrupt
阻塞後就不會向下面執行了,但是等到它被喚醒後,它還在for (;;)
體中,然後又會繼續先去搶佔鎖,然後如果還是失敗,那又會處於等待狀態,所以一直循環下去,就只有兩個結果:
- 搶到鎖退出循環
- 搶佔鎖失敗,等待下一次喚醒再次搶佔鎖
線程 A 釋放鎖
線程A的業務代碼執行完成後,會調用CustomLock.unlock
方法,釋放鎖。unlock方法內部調用的release(1)
:
public void unlock() {
release(1);
}
release
是AQS類的方法,它跟acquire
相反是釋放的意思:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
方法體中的tryRelease
是不是有點眼熟,沒錯,它也是在實現CustomLock
類時重寫的方法,首先在tryRelease
中會判斷當前線程是不是已經獲得了鎖,如果沒有就直接拋出異常,否則的話計算state的值,如果state為0的話就可以釋放鎖了。
protected boolean tryRelease(int arg) {
int state = getState() - arg;
if(getExclusiveOwnerThread() != Thread.currentThread()){
throw new IllegalMonitorStateException();
}
boolean free = false;
if(state == 0){
free = true;
setExclusiveOwnerThread(null);
System.out.println("Thread: " + Thread.currentThread().getName() + "釋放了鎖");
}
setState(state);
return free;
}
release
方法只做了兩件事:
- 調用
tryRelease
判斷當前線程釋放鎖是否成功 - 如果當前線程鎖釋放鎖成功,喚醒其他線程(也就是正在等待中的B線程)
tryRelease
返回true後,會執行if裏面的代碼塊:
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
先回顧一下現在的等待隊列的樣子:
根據上面的圖,來走下流程:
- 首先拿到
head
屬性的對象,也就是隊列的第一個對象 - 判斷
head
不等於空,並且waitStatus!=0,很明顯現在的waitStatus是等於Node. SIGNAL
的,它的值是-1
所以if (h != null && h.waitStatus != 0)
這個if肯定是滿足條件的,接着執行unparkSuccessor(h)
:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
...
if (s != null)
LockSupport.unpark(s.thread);
}
unparkSuccessor
首先將node.waitStatus
設置為0,然後獲取node的下一個節點,最後調用LockSupport.unpark(s.thread)
喚醒線程,至此我們的B線程就被喚醒了。
此時的隊列又回到了,線程B剛剛入隊的樣子:
線程B 喚醒之後
線程A釋放鎖後,會喚醒線程B,回到線程B的阻塞點,acquireQueued
的for循環中:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
線程喚醒後的第一件事就是,拿到它的上一個節點(當前是head結點),然後使用if判斷
if (p == head && tryAcquire(arg))
根據現在等待隊列中的節點狀態,p == head
是返回true的,然後就是tryAcquire(arg)
了,由於線程A已經釋放了鎖,那現在的線程B自然就能獲取到鎖了,所以tryAcquire(arg)也會返回true。
設置隊列頭
線路B拿到鎖後,會調用setHead(node)
自己設置為隊列的頭:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
調用setHead(node)
後隊列會發生些變化 :
移除上一個節點
setHead(node)
執行完後,接着按上一個節點完全移除:
p.next = null;
此時的隊列:
線程B 釋放鎖
線程B 釋放鎖的流程與線程A基本一致,只是當前隊列中已經沒有需要喚醒的線程,所以不需要執行代碼去喚醒其他線程:
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
h != null && h.waitStatus != 0
這裡的h.waitStatus
已經是0了,不滿足條件,不會去喚醒其他線程。
總結
文中通過自定義一個CustomLock
類,然後通過查看AQS源碼來學習AQS的部分原理。通過完整的走完鎖的獲取、釋放兩個流程,加深對AQS的理解,希望對大家有所幫助。
歡迎關注我的公眾號:架構文摘,獲得獨家整理120G的免費學習資源助力你的架構師學習之路!
公眾號後台回復
arch028
獲取資料: