AQS詳解

作用

提供一個框架用於實現依賴先進先出等待隊列的阻塞鎖和相關同步器(信號量,事件)

使用

子類應該定義為非公共內部幫助類,用於實現其封閉類的同步屬性,AQS並不實現任何同步接口,這一部分主要是從源碼里搬過來的

class Mutex implements Lock, java.io.Serializable {
   // Our internal helper class
   private static class Sync extends AbstractQueuedSynchronizer {
     // Reports whether in locked state
     protected boolean isHeldExclusively() {
       return getState() == 1;
     }
     // Acquires the lock if state is zero
     public boolean tryAcquire(int acquires) {
       assert acquires == 1; // Otherwise unused
       if (compareAndSetState(0, 1)) {
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
     }

     // Releases the lock by setting state to zero
     protected boolean tryRelease(int releases) {
       assert releases == 1; // Otherwise unused
       if (getState() == 0) throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
     }

     // Provides a Condition
     Condition newCondition() { return new ConditionObject(); }

     // Deserializes properly
     private void readObject(ObjectInputStream s)
         throws IOException, ClassNotFoundException {
       s.defaultReadObject();
       setState(0); // reset to unlocked state
     }
   }

   // The sync object does all the hard work. We just forward to it.
   private final Sync sync = new Sync();

   public void lock()                { sync.acquire(1); }
   public boolean tryLock()          { return sync.tryAcquire(1); }
   public void unlock()              { sync.release(1); }
   public Condition newCondition()   { return sync.newCondition(); }
   public boolean isLocked()         { return sync.isHeldExclusively(); }
   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
   public void lockInterruptibly() throws InterruptedException {
     sync.acquireInterruptibly(1);
   }
   public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
   }
 }
 class BooleanLatch {

   private static class Sync extends AbstractQueuedSynchronizer {
     boolean isSignalled() { return getState() != 0; }

     protected int tryAcquireShared(int ignore) {
       return isSignalled() ? 1 : -1;
     }

     protected boolean tryReleaseShared(int ignore) {
       setState(1);
       return true;
     }
   }

   private final Sync sync = new Sync();
   public boolean isSignalled() { return sync.isSignalled(); }
   public void signal()         { sync.releaseShared(1); }
   public void await() throws InterruptedException {
     sync.acquireSharedInterruptibly(1);
   }
 }

實現

主要分為兩個大的部分

一為對於state的訪問與維護,聚焦於鎖本身

二為對於需要獲取鎖的線程的訪問與維護,聚焦於想要獲取鎖的線程

三為獲取釋放鎖的過程,聚焦於線程與鎖的交互

state維護

AQS是這樣定義state的volatile int state

初始化

訪問

讀取和寫入分別有一個常規的set,get方法getState(),setState(int newState)

但這會有一個問題,我們經常需要先判斷state之前的狀態,然後再對其進行修改,如果採用if+set的形式,在並發情況下很可能產生問題.

AQS採用CAS操作提供原子性,從而避免了這個問題,提供的方法為compareAndSetState(int expect, int update)

這個方法事實上也是調用的Unsafe類提供的一個native方法

同步隊列

在並發的語境下,幾乎都要考慮多個線程去競爭一把鎖的情形,而往往鎖又是互斥的或者是獨佔的.如果一個線程獲取到了鎖,那其他線程顯然不能直接放棄,而AQS則通過內建的同步隊列去存儲這些線程.

Node

AQS使用靜態內部類Node去作為這個同步隊列的元素,一個Node里,不僅僅包含了一個Thread對象,還存儲着一些其他信息,例如線程此時的等待狀態,前後節點的指針

Queue

其實通過Node的數據結構就可以看出來,隊列是以一種鏈表的形式存在的(AQS並沒有使用現成的集合框架),通過兩個Node類型的變量tail,head去定位到自己想要操縱的數據.

說完了Queue的一些重要屬性,我們再來看看他的一些方法. 大部分都是類似於get,set方法區做一些簡單的訪問.

private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
private static final boolean compareAndSetNext(Node node,Node expect,Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

以上這三個方法都是原子操作,但顯然我們新加入一個節點的話至少需要設置tail,next兩個指針的指向,這卻不是原子性的了

AQS採用addWaiter方法包裝這一系列操作

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

通過CompareAndSetTail(實際上是Unsafe類的compareAndSetObject()去設置)來設置tail節點,通過後再將尾節點與前一個節點進行連接,設置不成功則進入死循環,不斷獲取當前的tail節點,然後CAS去設置

節點進入同步隊列之後,就進入了一個自旋的過程,每個節點(或者說每個線程)都在自省地觀察,當條件滿足,獲取到了同步狀態,就可以從這個自旋過程中退出,否則依舊留在這個自旋過程中(並會阻塞節點的線程)

鎖的獲取與釋放

這一部分,AQS採取了模板方法的設計模式,將個性化的操作留於具體的鎖或者其他同步組件.從這個模塊的名字就很容易想到,最為核心的就只有兩個方法acquirerelease(當然,這裡暫且先不涉及share與exclusive的劃分)

先從acquire來分析

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

這個方法主要是調用了其他的四個方法,其中tryAcquire(arg)是交由子類實現的,addWaiter上面已經介紹過,selfInterrupt()只是對於Thread類的interrupt()方法的簡單包裝.也就是說,現在值得我們注意的僅僅只有acquireQueued()這一個方法

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //獲取前一個節點
                final Node p = node.predecessor();
                //如果前一個節點是頭結點且已經獲取到了鎖則返回此時的中斷狀態
                if (p == head && tryAcquire(arg)) {
                    //把當前節點設置為頭結點
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire(p, node)判斷線程是否應該阻塞,

parkAndCheckInterrupt()里也就兩個方法

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

我們知道,interrupt()相關的方法只是置一個標記,而不會真正去使線程被中斷.也就是說最核心的使當前線程暫停運行的方法就是

LockSupport.park(this)這個方法了

再說release,這個方法要簡單一些

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease依然交由子類去實現,核心一眼便知unparkSucccessor

 private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //與之前的park()對應
            LockSupport.unpark(s.thread);
    }

由以上代碼可知,事實上,我們喚醒的並不是頭結點而是頭結點的下一個節點 ,這一點從enq這個方法里就能找到源頭 (頭結點是啞結點,裏面是沒有數據的).

到現在為止,我們可以發現,真正使得一個線程得以暫停的就只是LockSupport的park.我們完全可以僅僅只用這個park去實現一個最淳樸,最簡陋的鎖,示例代碼如下


public class SimpleLock {
   //利用原子變量自帶的CAS
   AtomicInteger state = new AtomicInteger(0);
   //代表競爭的另一個線程
   Thread next;

   public void lock(){
       if(!state.compareAndSet(0,1)){
           next = Thread.currentThread();
           LockSupport.park();
       }
   }
   public void unlock(){
           if(next!=null&&state.compareAndSet(1,0)){
               LockSupport.unpark(next);
           }
   }
}