從ReentrantLock詳解AQS原理源碼解析
Java中的大部分同步類(Lock、Semaphore、ReentrantLock等)都是基於AbstractQueuedSynchronizer(簡稱為AQS)實現的。AQS是一種提供了原子式管理同步狀態、阻塞和喚醒線程功能以及隊列模型的簡單框架。本文旨在從ReentrantLock詳解AQS原理源碼解析。
數據結構
java.util.concurrent.locks.AbstractQueuedSynchronizer類中存在如下數據結構。
// 鏈表結點
static final class Node {}
// head指向的是一個虛擬結點,刷多了算法就知道這樣做的目的是方便對鏈表操作,真正的頭為head.next
private transient volatile Node head;
// 尾結點
private transient volatile Node tail;
// 同步狀態,用於展示當前臨界資源的獲鎖情況。
private volatile int state;
// 繼承至AbstractOwnableSynchronizer類
// 獨佔模式下當前鎖的擁有者
private transient Thread exclusiveOwnerThread;
// 自旋鎖的自旋納秒數,用於提高應用的響應能力
static final long spinForTimeoutThreshold = 1000L;
// unsafe類
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 以下字段對應上面字段的在對象中的偏移值,在靜態代碼塊中初始化,其值是相對於在這個類對象中的偏移量
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
在AQS類中內部類Node包含如下數據結構
static final class Node {
// 共享鎖
static final Node SHARED = new Node();
// 獨佔鎖
static final Node EXCLUSIVE = null;
// 0 當一個Node被初始化的時候的默認值
// CANCELLED 為 1,表示線程獲取鎖的請求已經取消了
// CONDITION 為 -2,表示節點在等待隊列中,節點線程等待喚醒
// PROPAGATE 為 -3,當前線程處在SHARED情況下,該字段才會使用
// SIGNAL 為 -1,表示線程已經準備好了,就等資源釋放了
volatile int waitStatus;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 前驅指針
volatile Node prev;
// 後繼指針
volatile Node next;
// 該節點代表的線程對象
volatile Thread thread;
Node nextWaiter;
}
從其數據結構可以猜測出
- AQS類中主要的存儲結構應該是一個雙向鏈表。
- state字段對應了這個鎖對象的狀態。
- 線程申請鎖時會將其包裝成一個節點。Node保存了獲取鎖的線程信息。
- Node.waitStatus字段保存這個線程申請鎖的狀態。
- head指向的是一個虛擬結點,真正有效的頭為head.next
源碼分析
我們從AQS的實現類ReentrantLock#lock開始分析其具體的流程。
ReentrantLock#lock
public void lock() {
sync.lock();
}
直接調用了Sync類的lock()方法,Sync類在ReentrantLock中有兩個實現類分別是FairSync和NonfairSync,分別對應了公平鎖和非公平鎖。
- 公平鎖:線程獲取鎖的順序和調用lock的順序一樣,FIFO;
- 非公平鎖:線程獲取鎖的順序和調用lock的順序無關,全憑運氣。
由於ReentrantLock默認是非公平鎖,我們從NonfairSync類分析。
ReentrantLock.NonfairSync#lock
final void lock() {
// cas操作嘗試將state字段值修改為1
if (compareAndSetState(0, 1))
// 成功的話就代表已經獲取到鎖,修改獨佔模式下當前鎖的擁有者為當前線程
setExclusiveOwnerThread(Thread.currentThread());
else
// 獲取鎖失敗之後的操作
acquire(1);
}
從這可以確定我們之前的猜測
- state字段對應了這個鎖對象的狀態,值為0的時候代表鎖沒有被線程佔用,修改為1之後代表鎖被佔用。
現在分析未獲取到鎖之後的流程
AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
if (
// 當前線程嘗試獲取鎖
!tryAcquire(arg) &&
// acquireQueued會把傳入的結點在隊列中不斷去獲取鎖,直到獲取成功或者不再需要獲取(中斷)。
acquireQueued(
// 在雙向鏈表的尾部創建一個結點,值為當前線程和傳入的模式
addWaiter(Node.EXCLUSIVE),
arg
)
)
// TODO
selfInterrupt();
}
看不懂,先查找資料了解這幾個方法的作用,注釋在代碼中。
ReentrantLock.NonfairSync#tryAcquire
// 當前線程嘗試獲取鎖
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
ReentrantLock.Sync#nonfairTryAcquire
// 當前線程嘗試獲取鎖-非公平
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 獲得當前鎖對象的狀態
int c = getState();
// state為0代表當前沒有被線程佔用
if (c == 0) {
// cas操作嘗試將state字段值修改為請求的數量
if (compareAndSetState(0, acquires)) {
// 直接修改當前獨佔模式下鎖的擁有者為為當前線程
setExclusiveOwnerThread(current);
return true;
}
}
// 如果鎖的佔有者就是當前線程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// state值增加相應的請求數。
setState(nextc);
return true;
}
return false;
}
ReentrantLock字面意思是可重入鎖
- 可重入鎖:一個線程在獲取一個鎖之後,在沒有釋放之前仍然可以繼續申請鎖而不會造成阻塞,但是解鎖的時候也需要相應次數的解鎖操作。
結合nonfairTryAcquire方法邏輯,可以推斷出state字段在獨佔鎖模式下還代表了鎖的重入次數。
AbstractQueuedSynchronizer#addWaiter
// 在鏈表尾部創建一個結點,值為當前線程和傳入的模式
private Node addWaiter(Node mode) {
// 創建一個結點,值為當前線程和傳入的模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 快速路徑,是為了方便JIT優化。jvm檢測到熱點代碼,會將其編譯成本地機器碼並以各種手段進行代碼優化。
Node pred = tail;
if (pred != null) {
// 將新創建的node的前驅指針指向tail。
node.prev = pred;
// 將結點修改為隊列的tail時可能會發生數據衝突,用cas操作保證線程安全。
if (compareAndSetTail(pred, node)) {
// compareAndSetTail比較的地址,如果相等則將新的地址賦給該字段(而不是在源地址上替換,為什麼我會這麼想???)
// 所以此處pred引用指向的仍然是源tail的內存地址。將其後繼指針指向新的tail
pred.next = node;
return node;
}
}
// 隊列為空或者cas失敗(說明被別的線程已經修改)
enq(node);
return node;
}
這個方法主要作用是在鏈表尾部創建一個結點,返回新創建的結點,其主要流程為
- 通過當前的線程和鎖模式創建一個節點。
- 節點入尾操作
- 新節點的前驅指針指向tail
- 使用cas操作修改新節點為tail
- 原tail的後繼指針指向新節點
當隊列為空或者cas失敗(說明被別的線程已經修改)會執行enq方法兜底。
AbstractQueuedSynchronizer#enq
// 在隊列尾部創建一個結點,值為當前線程和傳入的模式,當隊列為空的時候初始化。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 創建一個空結點設置為頭,真正的頭為hdead.next
if (compareAndSetHead(new Node()))
// 尾等於頭
tail = head;
} else {
// 這段邏輯跟addWaiter()中快速路徑的邏輯一樣。
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter是對enq方法的一層封裝,addWaiter首先嘗試一個快速路徑的在鏈表尾部創建一個結點,失敗的時候迴轉入enq方法兜底,循環在鏈表尾部創建一個節點,直到成功為止。
這裡有個疑問,為什麼要在addWaiter方法中嘗試一次在enq方法中能完成的在鏈表尾部創建一個節點的操作呢?其實是為了方便JIT優化。jvm檢測到熱點代碼,會將其編譯成本地機器碼並以各種手段進行代碼優化。了解更多1、了解更多2。
在鏈表尾插入需要
AbstractQueuedSynchronizer#acquireQueued
// acquireQueued會把傳入的結點在隊列中不斷去獲取鎖,直到獲取成功或者不再需要獲取(中斷)。
final boolean acquireQueued(final Node node, int arg) {
// 標記是否成功拿到鎖
boolean failed = true;
try {
// 標記獲取鎖的過程中是否中斷過
boolean interrupted = false;
// 開始自旋,要麼獲取鎖,要麼中斷
for (;;) {
// 獲得其前驅節點
final Node p = node.predecessor();
// 如果前驅節點為head代表現在節點node在隊列有效數據的第一位,就嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
// 獲取鎖成功,把當前節點置為虛節點
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果存在以下情況就要判斷當前node是否要被阻塞
// 1. p為頭節點且獲取鎖失敗 2. p不為頭結點
if (shouldParkAfterFailedAcquire(p, node) &&
// 阻塞進程
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 取消申請鎖
cancelAcquire(node);
}
}
AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
// 依賴前驅節點判斷當前線程是否應該被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 入參請求鎖的node的前驅節點的狀態
int ws = pred.waitStatus;
// 如果前驅節點的狀態為"表示線程已經準備好了,就等資源釋放了"
// 說明前驅節點處於激活狀態,入參node節點需要被阻塞
if (ws == Node.SIGNAL)
return true;
// 只有CANCELLED狀態對應大於0
if (ws > 0) {
do {
// 循環向前查找取消狀態節點,把取消節點從隊列中剔除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 設置狀態非取消的前驅節點等待狀態為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
ReentrantLock#lock總結
到現在我們可以總結一下ReentrantLock#lock非公平鎖方法的流程
未獲取到鎖的情況下函數調用流程
- ReentrantLock#lock
- ReentrantLock.Sync#lock
- ReentrantLock.NonfairSync#lock
- AbstractQueuedSynchronizer#acquire
- ReentrantLock.NonfairSync#tryAcquire
- ReentrantLock.Sync#nonfairTryAcquire
- AbstractQueuedSynchronizer#addWaiter
- AbstractQueuedSynchronizer#acquireQueued
描述
- 執行ReentrantLock的Lock方法。
- 會調用到內部類Sync的Lock方法,由於Sync#lock是抽象方法,根據ReentrantLock初始化選擇的公平鎖和非公平鎖,執行相關內部類的Lock方法,cas修改state值獲取鎖,失敗執行父類的Acquire方法。
- 父類的Acquire方法會執行子類實現的tryAcquire方法,因為tryAcquire需要自定義同步器實現,因此執行了ReentrantLock中的tryAcquire方法,由於ReentrantLock是通過公平鎖和非公平鎖內部類實現的tryAcquire方法,因此會根據鎖類型不同,執行不同的tryAcquire。
- tryAcquire是獲取鎖邏輯,獲取失敗後,會執行框架AQS的後續邏輯,跟ReentrantLock自定義同步器無關。
// 公平鎖加鎖時判斷等待隊列中是否存在有效節點的方法。
// 返回False,當前線程可以爭取共享資源;
// 返回True,隊列中存在有效節點,當前線程必須加入到等待隊列中。
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 頭不等於尾代表隊列中存在結點返回true
// 但是還有一種特例,就是如果現在正在執行enq方法進行隊列初始化,tail = head;語句運行之後
// 此時h == t,返回false,但是隊列中
return h != t &&
// 從這可以看出真正的頭結點是head.next,即說明head是一個無實際數據的結點,為了方便鏈表操作
((s = h.next) == null
// 有效頭結點與當前線程不同,返回true必須加入到等待隊列
|| s.thread != Thread.currentThread());
}
即時編譯器
Java程序最初都是通過解釋器進行解釋執行的,當虛擬機發現某個方法或代碼塊的運行特別頻繁,就會把這些代碼認定為「熱點代碼」(Hot Spot Code),為了提高熱點代碼的執行效率,在運行時,虛擬機將會把這些代碼編譯成本地機器碼,並以各種手段儘可能地進行代碼優化,運行時完成這個任務的後端編譯器被稱為即時編譯器。
這裡所說的熱點代碼主要包括兩類
- 被多次調用的方法
- 被多次執行的循環體
對於這兩種情況,編譯的目標對象都是整個方法體,而不會是單獨的循環體