AQS解讀及其實踐
- 2019 年 10 月 7 日
- 筆記
AQS概述
AQS全稱AbstractQueuedSynchronizer,即抽象隊列同步器。AQS是用來構建鎖或者其他同步組件的基礎框架,它使用一個整型的volatile變數state來維護同步狀態,通過內置的FIFO隊列來完成資源獲取執行緒的排隊工作。AQS為一系列同步器依賴於一個單獨的原子變數state的同步器提供了一個非常有用的基礎。
AQS的設計是基於模板方法模式設計的,子類通過繼承AQS並實現它的抽象模板方法來管理同步狀態,而這些模板方法內部就是真正管理同步狀態的地方(主要有tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared等)。
AQS既可以支援獨佔鎖地,也支援共享鎖,這樣就可以方便實現不同類型的同步組件如ReentrantLock、ReentrantReadWriteLock和CountDownLatch等。
AQS類使用單個int(32位)來保存同步狀態,並暴露出getState、setState以及compareAndSet操作來讀取和更新這個同步狀態。其中屬性state被聲明為volatile,並且通過使用CAS指令來實現compareAndSetState,使得當且僅當同步狀態擁有一個一致的期望值的時候,才會被原子地設置成新值,這樣就達到了同步狀態的原子性管理,確保了同步狀態的原子性、可見性和有序性。
補充:ReentrantReadWriteLock利用一個32位的int值保存了兩個count,前16位存readCount,後16位存writeCount。

AQS核心源碼解讀
AQS源碼中的主要欄位
// 同步隊列的head節點, 延遲初始化,除了初始化,只能通過setHead方法修改 // 如果head存在,waitStatus一定是CANCELLED private transient volatile Node head; // 同步隊列的tail節點,延遲初始化,只能通過enq方法修改 private transient volatile Node tail; // 同步狀態 private volatile int state; // 支援CAS private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset;
AQS源碼中的主要方法
protected final int getState() { return state; }protected final void setState(int newState) { state = newState; }protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }// 鉤子方法,獨佔式獲取同步狀態, 需要子類實現,實現此方法需要查詢當前同步狀態並 // 判斷同步狀態是否符合預期,然後再CAS設置同步狀態 // 返回值true代表獲取成功,false代表獲取失敗 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }// 鉤子方法,獨佔式釋放同步狀態,需要子類實現, // 等待獲取同步狀態的執行緒將有機會獲取同步狀態 // 返回值true代表獲取成功,false代表獲取失敗 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }// 鉤子方法,共享式獲取同步狀態,需要子類實現, // 返回值負數代表獲取失敗、0代表獲取成功但沒有剩餘資源、 // 正數代表獲取成功,還有剩餘資源 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }// 鉤子方法,共享式釋放同步狀態,需要子類實現 // 返回值負數代表獲取失敗、0代表獲取成功但沒有剩餘資源、 // 正數代表獲取成功,還有剩餘資源 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }// 模板方法,獨佔式獲取同步狀態,如果當前執行緒獲取同步狀態成功,則由該方法返回, // 否則會進入同步隊列等待,此方法會調用子類重寫的tryAcquire方法 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }// 模板方法,獨佔式的釋放同步狀態,該方法會在釋放同步狀態後, // 將同步隊列中的第一個節點包含的執行緒喚醒 // 此方法會調用子類重寫的tryRelease方法 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }// 模板方法,共享式的獲取同步狀態,如果當前系統未獲取到同步狀態, // 將會進入同步隊列等待,同一時刻可以有多個執行緒獲取到同步狀態 // 此方法會調用子類重寫的tryAcquireShared方法 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }// 模板方法,共享式的釋放同步狀態 // 此方法會調用子類重寫的tryReleaseShared方法 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }// 用於將當前執行緒加入到等待隊列的隊尾,並返回當前執行緒所在的結點 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 嘗試將Node放到隊尾 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }//初始化或自旋CAS直到入隊成功 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
AQS實現CountDownLatch
CountDownLatch是一個同步工具類,用來協調多個執行緒之間的同步,CountDownLatch能夠使一個執行緒在等待另外一些執行緒完成各自工作之後,再繼續執行。使用一個計數器進行實現。計數器初始值為執行緒的數量。當每一個執行緒完成自己任務後,計數器的值就會減一。當計數器的值為0時,表示所有的執行緒都已經完成一些任務,然後在CountDownLatch上等待的執行緒就可以恢復執行接下來的任務。主要常用的方法countDown()方法以及await())方法。
基於AQS實現CountDownLatch
public class MyCountDownLatch {private Sync sync;public MyCountDownLatch(int count) { sync = new Sync(count); }public void countDown() { sync.releaseShared(1); }public void await() { sync.acquireShared(1); }class Sync extends AbstractQueuedSynchronizer { public Sync(int count) { setState(count); }@Override protected int tryAcquireShared(int arg) { // 只有當state變為0時,加鎖成功 return getState() == 0 ? 1 : -1; }@Override protected boolean tryReleaseShared(int arg) { for (; ; ) { int c = getState(); if (c == 0) return false; int nextc = c - 1; // 用CAS操作,講count減一 if (compareAndSetState(c, nextc)) { // 當state=0時,釋放鎖成功,返回true return nextc == 0; } } } } }// 測試 public class MyCountDownLatchTest { /* 每隔1s開啟一個執行緒,共開啟6個執行緒 若希望6個執行緒 同時 執行某一操作 可以用CountDownLatch實現 */ public static void test01() throws InterruptedException { MyCountDownLatch ctl = new MyCountDownLatch(6);for (int i = 0; i < 6; i++) { new Thread() { @Override public void run() { ctl.countDown(); ctl.await(); System.out.println("here I am..."); } }.start(); Thread.sleep(1000L); } }/* 開啟6個執行緒,main執行緒希望6個執行緒都執行完某個操作後,才執行某個操作 可以用CountDownLatch來實現 */ public static void test02() throws InterruptedException { MyCountDownLatch ctl = new MyCountDownLatch(6);for (int i = 0; i < 6; i++) { new Thread() { @Override public void run() { System.out.println("after print..."); ctl.countDown(); } }.start(); Thread.sleep(1000L); }ctl.await(); System.out.println("main thread do something ..."); }public static void main(String args[]) throws InterruptedException { test01(); } }
AQS實現Semaphore
Semaphore是用來保護一個或者多個共享資源的訪問,Semaphore內部維護了一個計數器,其值為可以訪問的共享資源的個數。一個執行緒要訪問共享資源,先獲得訊號量,如果訊號量的計數器值大於1,意味著有共享資源可以訪問,則使其計數器值減去1,再訪問共享資源。Semaphore用來控制同時訪問某個特定資源的操作數量,或者同時執行某個指定操作的數量。還可以用來實現某種資源池限制,或者對容器施加邊界。常用方法為acquire()方法和release()方法。
基於AQS實現CountDownLatch
public class MySemaphore {private Sync sync;public MySemaphore(int permits) { sync = new Sync(permits); }//搶訊號量、就是在加鎖 public void acquire() { sync.acquireShared(1); }//釋放訊號量,就是解鎖 public void release() { sync.releaseShared(1); }class Sync extends AbstractQueuedSynchronizer { private int permits;public Sync(int permits) { this.permits = permits; }@Override protected int tryAcquireShared(int arg) { int state = getState(); int nextState = state + arg; // 如果訊號量沒佔滿,加鎖的個數沒有達到permits if (nextState <= permits) { if (compareAndSetState(state, nextState)) { return 1; } } return -1; }@Override protected boolean tryReleaseShared(int arg) { int state = getState(); if (compareAndSetState(state, state - arg)) { return true; } else { return false; } } } }// 測試 public class MySemaphoreTest { static MySemaphore sp = new MySemaphore(6);public static void main(String args[]) { for (int i = 0; i < 1000; i++) { new Thread() { @Override public void run() { try { sp.acquire(); // 搶訊號量、就是在加鎖 Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } queryDB("localhost:3006"); sp.release(); // 釋放訊號量,就是解鎖 } }.start(); } }public static void queryDB(String url) { System.out.println("query " + url); } }