AQS解讀及其實踐

  • 2019 年 10 月 7 日
  • 筆記

AQS概述

AQS全稱AbstractQueuedSynchronizer,即抽象隊列同步器。AQS是用來構建鎖或者其他同步組件的基礎框架,它使用一個整型的volatile變數state來維護同步狀態,通過內置的FIFO隊列來完成資源獲取執行緒的排隊工作。AQS為一系列同步器依賴於一個單獨的原子變數state的同步器提供了一個非常有用的基礎。

AQS的設計是基於模板方法模式設計的,子類通過繼承AQS並實現它的抽象模板方法來管理同步狀態,而這些模板方法內部就是真正管理同步狀態的地方(主要有tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared等)。

AQS既可以支援獨佔鎖地,也支援共享鎖,這樣就可以方便實現不同類型的同步組件如ReentrantLock、ReentrantReadWriteLock和CountDownLatch等。

AQS類使用單個int(32位)來保存同步狀態,並暴露出getState、setState以及compareAndSet操作來讀取和更新這個同步狀態。其中屬性state被聲明為volatile,並且通過使用CAS指令來實現compareAndSetState,使得當且僅當同步狀態擁有一個一致的期望值的時候,才會被原子地設置成新值,這樣就達到了同步狀態的原子性管理,確保了同步狀態的原子性、可見性和有序性。

補充:ReentrantReadWriteLock利用一個32位的int值保存了兩個count,前16位存readCount,後16位存writeCount。

AQS核心源碼解讀

AQS源碼中的主要欄位

// 同步隊列的head節點, 延遲初始化,除了初始化,只能通過setHead方法修改  // 如果head存在,waitStatus一定是CANCELLED  private transient volatile Node head;  // 同步隊列的tail節點,延遲初始化,只能通過enq方法修改  private transient volatile Node tail;  // 同步狀態  private volatile int state;  // 支援CAS  private static final Unsafe unsafe = Unsafe.getUnsafe();  private static final long stateOffset;  private static final long headOffset;  private static final long tailOffset;  private static final long waitStatusOffset;  private static final long nextOffset;

AQS源碼中的主要方法

    protected final int getState() {  return state;  }protected final void setState(int newState) {  state = newState;  }protected final boolean compareAndSetState(int expect, int update) {  return unsafe.compareAndSwapInt(this, stateOffset, expect, update);  }// 鉤子方法,獨佔式獲取同步狀態, 需要子類實現,實現此方法需要查詢當前同步狀態並  // 判斷同步狀態是否符合預期,然後再CAS設置同步狀態  // 返回值true代表獲取成功,false代表獲取失敗  protected boolean tryAcquire(int arg) {  throw new UnsupportedOperationException();  }// 鉤子方法,獨佔式釋放同步狀態,需要子類實現,  // 等待獲取同步狀態的執行緒將有機會獲取同步狀態  // 返回值true代表獲取成功,false代表獲取失敗  protected boolean tryRelease(int arg) {  throw new UnsupportedOperationException();  }// 鉤子方法,共享式獲取同步狀態,需要子類實現,  // 返回值負數代表獲取失敗、0代表獲取成功但沒有剩餘資源、  // 正數代表獲取成功,還有剩餘資源  protected int tryAcquireShared(int arg) {  throw new UnsupportedOperationException();  }// 鉤子方法,共享式釋放同步狀態,需要子類實現  // 返回值負數代表獲取失敗、0代表獲取成功但沒有剩餘資源、  // 正數代表獲取成功,還有剩餘資源  protected boolean tryReleaseShared(int arg) {  throw new UnsupportedOperationException();  }// 模板方法,獨佔式獲取同步狀態,如果當前執行緒獲取同步狀態成功,則由該方法返回,  // 否則會進入同步隊列等待,此方法會調用子類重寫的tryAcquire方法  public final void acquire(int arg) {  if (!tryAcquire(arg) &&  acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  selfInterrupt();  }// 模板方法,獨佔式的釋放同步狀態,該方法會在釋放同步狀態後,  // 將同步隊列中的第一個節點包含的執行緒喚醒  // 此方法會調用子類重寫的tryRelease方法  public final boolean release(int arg) {  if (tryRelease(arg)) {  Node h = head;  if (h != null && h.waitStatus != 0)  unparkSuccessor(h);  return true;  }  return false;  }// 模板方法,共享式的獲取同步狀態,如果當前系統未獲取到同步狀態,  // 將會進入同步隊列等待,同一時刻可以有多個執行緒獲取到同步狀態  // 此方法會調用子類重寫的tryAcquireShared方法  public final void acquireShared(int arg) {  if (tryAcquireShared(arg) < 0)  doAcquireShared(arg);  }// 模板方法,共享式的釋放同步狀態  // 此方法會調用子類重寫的tryReleaseShared方法  public final boolean releaseShared(int arg) {  if (tryReleaseShared(arg)) {  doReleaseShared();  return true;  }  return false;  }// 用於將當前執行緒加入到等待隊列的隊尾,並返回當前執行緒所在的結點  private Node addWaiter(Node mode) {  Node node = new Node(Thread.currentThread(), mode);  // Try the fast path of enq; backup to full enq on failure  Node pred = tail;    // 嘗試將Node放到隊尾  if (pred != null) {  node.prev = pred;  if (compareAndSetTail(pred, node)) {  pred.next = node;  return node;  }  }  enq(node);  return node;  }//初始化或自旋CAS直到入隊成功  private Node enq(final Node node) {  for (;;) {  Node t = tail;  if (t == null) { // Must initialize  if (compareAndSetHead(new Node()))  tail = head;  } else {  node.prev = t;  if (compareAndSetTail(t, node)) {  t.next = node;  return t;  }  }  }  }

AQS實現CountDownLatch

CountDownLatch是一個同步工具類,用來協調多個執行緒之間的同步,CountDownLatch能夠使一個執行緒在等待另外一些執行緒完成各自工作之後,再繼續執行。使用一個計數器進行實現。計數器初始值為執行緒的數量。當每一個執行緒完成自己任務後,計數器的值就會減一。當計數器的值為0時,表示所有的執行緒都已經完成一些任務,然後在CountDownLatch上等待的執行緒就可以恢復執行接下來的任務。主要常用的方法countDown()方法以及await())方法。

基於AQS實現CountDownLatch

public class MyCountDownLatch {private Sync sync;public MyCountDownLatch(int count) {  sync = new Sync(count);  }public void countDown() {  sync.releaseShared(1);  }public void await() {  sync.acquireShared(1);  }class Sync extends AbstractQueuedSynchronizer {  public Sync(int count) {  setState(count);  }@Override  protected int tryAcquireShared(int arg) {  // 只有當state變為0時,加鎖成功  return getState() == 0 ? 1 : -1;  }@Override  protected boolean tryReleaseShared(int arg) {  for (; ; ) {  int c = getState();  if (c == 0) return false;  int nextc = c - 1;  // 用CAS操作,講count減一  if (compareAndSetState(c, nextc)) {  // 當state=0時,釋放鎖成功,返回true  return nextc == 0;  }  }  }  }  }// 測試  public class MyCountDownLatchTest {  /*  每隔1s開啟一個執行緒,共開啟6個執行緒  若希望6個執行緒 同時 執行某一操作  可以用CountDownLatch實現  */  public static void test01() throws InterruptedException {  MyCountDownLatch ctl = new MyCountDownLatch(6);for (int i = 0; i < 6; i++) {  new Thread() {  @Override  public void run() {  ctl.countDown();  ctl.await();  System.out.println("here I am...");  }  }.start();  Thread.sleep(1000L);  }  }/*  開啟6個執行緒,main執行緒希望6個執行緒都執行完某個操作後,才執行某個操作  可以用CountDownLatch來實現  */  public static void test02() throws InterruptedException {  MyCountDownLatch ctl = new MyCountDownLatch(6);for (int i = 0; i < 6; i++) {  new Thread() {  @Override  public void run() {  System.out.println("after print...");  ctl.countDown();  }  }.start();  Thread.sleep(1000L);  }ctl.await();  System.out.println("main thread do something ...");  }public static void main(String args[]) throws InterruptedException {  test01();  }  }

AQS實現Semaphore

Semaphore是用來保護一個或者多個共享資源的訪問,Semaphore內部維護了一個計數器,其值為可以訪問的共享資源的個數。一個執行緒要訪問共享資源,先獲得訊號量,如果訊號量的計數器值大於1,意味著有共享資源可以訪問,則使其計數器值減去1,再訪問共享資源。Semaphore用來控制同時訪問某個特定資源的操作數量,或者同時執行某個指定操作的數量。還可以用來實現某種資源池限制,或者對容器施加邊界。常用方法為acquire()方法和release()方法。

基於AQS實現CountDownLatch

public class MySemaphore {private Sync sync;public MySemaphore(int permits) {  sync = new Sync(permits);  }//搶訊號量、就是在加鎖  public void acquire() {  sync.acquireShared(1);  }//釋放訊號量,就是解鎖  public void release() {  sync.releaseShared(1);  }class Sync extends AbstractQueuedSynchronizer {  private int permits;public Sync(int permits) {  this.permits = permits;  }@Override  protected int tryAcquireShared(int arg) {  int state = getState();  int nextState = state + arg;  // 如果訊號量沒佔滿,加鎖的個數沒有達到permits  if (nextState <= permits) {  if (compareAndSetState(state, nextState)) {  return 1;  }  }  return -1;  }@Override  protected boolean tryReleaseShared(int arg) {  int state = getState();  if (compareAndSetState(state, state - arg)) {  return true;  } else {  return false;  }  }  }  }// 測試  public class MySemaphoreTest {  static MySemaphore sp = new MySemaphore(6);public static void main(String args[]) {  for (int i = 0; i < 1000; i++) {  new Thread() {  @Override  public void run() {  try {  sp.acquire(); // 搶訊號量、就是在加鎖  Thread.sleep(2000L);  } catch (InterruptedException e) {  e.printStackTrace();  }  queryDB("localhost:3006");  sp.release(); // 釋放訊號量,就是解鎖  }  }.start();  }  }public static void queryDB(String url) {  System.out.println("query " + url);  }  }