AQS (AbstractQueuedSynchronizer)源碼導讀:鎖的獲得與釋放
- 2020 年 2 月 18 日
- 筆記
AQS是什麼?
AbstractQueuedSynchronizer簡稱AQS是一個抽象同步框架,可以用來實現一個依賴狀態的同步器。
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. 提供一個框架,用於實現依賴先進先出(FIFO)等待隊列的阻塞鎖和相關同步器(訊號量,事件等)。該類被設計為大多數類型的同步器的有用依據,這些同步器依賴於單個原子int值來表示狀態。 This class supports either or both a default exclusive mode and a shared mode. 此類支援默認獨佔模式和共享模式。 Even though this class is based on an internal FIFO queue, it does not automatically enforce FIFO acquisition policies. The core of exclusive synchronization takes the form:
即使這個類基於內部FIFO隊列,它也不會自動執行FIFO採集策略。 排他同步的核心形式如下:
Acquire: while (!tryAcquire(arg)) { enqueue thread if it is not already queued; possibly block current thread; } Release: if (tryRelease(arg)) unblock the first queued thread; (共享模式類似,但可能包含級聯訊號。)
AQS 結構
/** * 當前持有獨佔鎖的執行緒 */ private transient Thread exclusiveOwnerThread; /** * 等待隊列的頭結點,一般為當前持有鎖的執行緒 (volatile) */ private transient volatile Node head; /** * 等待隊列的尾結點,每次新結點進來,都是加到尾部,形成了鏈表 (volatile) */ private transient volatile Node tail; /** * 鎖的狀態,0 表示沒有被佔用,具體由子類實現 (volatile) */ private volatile int state; // 等待隊列的結點 static final class Node { // 標識節點當前在共享模式下 static final Node SHARED = new Node(); // 標識節點當前在獨佔模式下 static final Node EXCLUSIVE = null; // 大於等於0 表明這個結點的執行緒取消了爭搶這個鎖,不需要去喚醒 /** * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus; // 前一個結點 volatile Node prev; // 下一個結點 volatile Node next; // 等待的執行緒 volatile Thread thread; ... }
CLH隊列 — Craig, Landin, and Hagersten lock queue
CLH是一個非阻塞的 FIFO 隊列。也就是說往裡面插入或移除一個節點的時候,在並發條件下不會阻塞,而是通過自旋鎖和 CAS 保證節點插入和移除的原子性。
源碼導讀:ReentrantLock 公平鎖
public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(true); lock.lock(); lock.unlock(); }
我們先看一下lock, 然後是 unlock
獲得鎖的流程和源碼解讀
// ReentrantLock.FairSync final void lock() { acquire(1); } // AQS public final void acquire(int arg) { // 1) 當前執行緒嘗試獲得鎖,如果成功,結束 // 2)如果獲得鎖失敗,當前執行緒加入等待隊列,加入後不斷循環監視上一個結點狀態 // 如果上一個結點是頭結點head ,-嘗試獲得鎖,獲得成功,跳出循環 // 否則,根據上一個結點的waitStatus,進行調整,包括掛起當前執行緒,或調整上一個結點為非取消狀態的結點 // 3)最後當前執行緒發起中斷 Thread.currentThread().interrupt() if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // ReentrantLock.FairSync // 當前執行緒嘗試獲得鎖 // 返回true:1.沒有執行緒在等待鎖;2.重入鎖,執行緒本來就持有鎖,也就可以理所當然可以直接獲取 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 1)查看是否有等待已經在head後面等待鎖了,如果有當前執行緒放棄搶佔鎖 // 2)如果沒有執行緒等待,CAS 嘗試修改 state, 如果成功,獲得鎖 // 3)設置的當前執行緒為 當前持有獨佔鎖的執行緒 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 4)如果state不為0, 說明為可重入,只需要判斷 當前執行緒 是否為 當前持有獨佔鎖的執行緒 // 5)如果是,可重入,state增加 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //6) 其他情況,獲得鎖失敗 return false; } // AQS // 構造結點,採用CAS 加入等待隊列的尾部 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; //1) 如果等待隊列不為空,則CAS 將當前執行緒加入等待隊列的尾部 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 2)如果等待隊列為空,則CAS 新建初始化CLH隊列,併當前執行緒加入等待隊列的尾部 enq(node); return node; } // AQS // 如果獲得鎖失敗,當前執行緒加入等待隊列 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 注意,這是一個循環,只有獲得鎖,或拋異常才會退出 // 如果上一個節點是頭結點 head,則嘗試獲得鎖 // 否則,如果當前執行緒需要掛起,則掛起等待鎖的釋放 for (;;) { // 1)查看當前結點的上一個結點,如果是頭結點head,嘗試獲得鎖 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 2)如果當前結點的上一個結點不是頭結點head,或獲得鎖失敗,則判斷一下是否需求掛起當前執行緒? // 3)如果需要掛起,則 park 掛起當前執行緒 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 4)如果獲得鎖失敗,並且拋異常,則當前執行緒取消搶佔 if (failed) cancelAcquire(node); } } // AQS private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 1)如果上一個結點的狀態是 -1, 則當前結點 需要掛起 if (ws == Node.SIGNAL) return true; // 2)如果上一個結點的狀態 大於0,表示取消搶佔了, 那麼循環往前找到一個結點的狀態 <=0,然後將當前結點的上一個結點 設置為 找到的結點 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } // 3)如果上一個結點<=0且不等於-1, 那麼結點狀態為 0(加入等待隊列的初始狀態), -2,-3, // 此時需要 用CAS將上一個節點的waitStatus設置為Node.SIGNAL(也就是-1) else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } // AQS // 掛起當前執行緒,並測試返回當前執行緒是否中斷狀態 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
釋放鎖的流程和源碼解讀
// ReentrantLock final void unlock() { sync.release(1); } // AQS public final boolean release(int arg) { // 1)釋放鎖,嘗試修改state的值 if (tryRelease(arg)) { Node h = head; // 2)如果等待隊列有執行緒,則將頭部的結點狀態修正,並喚醒頭結點的下一個不是取消狀態的執行緒 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // ReentrantLock // 判斷當前執行緒 是否為 當前持有獨佔鎖的執行緒 // 如果是,修改state的值,包括可重入鎖減一 // 如果不是,拋出異常 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } // AQS private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; // 如果head節點當前waitStatus<0, CAS 將其修改為0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 喚醒後繼節點,但是有可能後繼節點取消了等待(waitStatus==1) // 從隊尾往前找,找到waitStatus<=0的所有節點中排在最前面的 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 喚醒執行緒 LockSupport.unpark(s.thread); }
by 斯武丶風晴 https://my.oschina.net/langxSpirit