學習JUC源碼(3)——Condition等待隊列(源碼分析結合圖文理解)

前言

  在Java多執行緒中的wait/notify通訊模式結尾就已經介紹過,Java執行緒之間有兩種種等待/通知模式,在那篇博文中是利用Object監視器的方法(wait(),notify()、notifyAll())實現的,然而在實際生產環境中不推薦使用此方法,建議使用condition的等待通知模式,JUC包中很多核心實現也確實證實了這點,所以這必然是學習JUC包源碼的基礎。

  如果之前閱讀過前不久介紹同步隊列的博文學習JUC源碼(1)——AQS同步隊列(源碼分析結合圖文理解),就能更好理解Condition等待隊列了,都是基於AQS.Node實現的隊列,兩者是同步器實現的核心所在!

  主要參考資料《Java並發編程藝術》(有需要的小夥伴可以找我,我這裡只有電子PDF)同時結合ReentranLock、AQS、ArrayBlockingQueue等源碼。

 


 

一、Condition等待隊列介紹

1、對比Object監視器方法與Condition方法

以下對比圖來源於《Java並發編程藝術》,可以清楚看到Condition比Object監視器更加靈活,支援中斷響應等

2、Condition方法使用介紹

我們先看下阻塞隊列ArrayBlockingQueue中關於condition的經典應用,這裡使用就是condition的等待通知模式實現有界阻塞隊列,即簡單總結:當隊列滿時,阻塞插入執行緒,隊列空時,獲取元素的執行緒等待

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 隊列滿時等待
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 隊列空時進入Condition等待隊列等待
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

從中我們看出兩點:

  • Condition要結合Lock對象實現,兩者是同時存在的。準確來說先有Lock對象,然後再創建Condition對象
  • 執行緒調用這些方法時候,需要提前獲取到Condition對象關聯的鎖,Condition對象是由Lock對象(Lock.newConditoin())創建出來的,也就是說Condition是依賴Lock對象的。

同樣地,我們可以從中抽取出等待-通知模式,然後編寫Demo如下,其中標紅的可以理解為通用的等待/通知模式

public class ConditionDemo {
    ReentrantLock lock = new ReentrantLock();
    // 一般Condition都是作為成員變數
    Condition condition = lock.newCondition();

    public  void waitCondition() throws InterruptedException {
        lock.lock();
        try {
            // 當前執行緒等待
            // 調用signal方法後返回
            System.out.println(Thread.currentThread()+" is waiting..., now: "+new Date());
            Thread.sleep(2000);
            condition.await(); // 注意這裡是await,而不是wait
            System.out.println(Thread.currentThread()+" return..., now: "+new Date());
        } finally {
            lock.unlock();
        }
    }
    public void signalCondition() throws InterruptedException {
        lock.lock();
        try {
            // 喚醒獲得condition上的等待鎖
            System.out.println(Thread.currentThread()+" is signaling..., now: "+new Date());
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ConditionDemo condition = new ConditionDemo();
            new Thread(()->{
                try {
                    condition.waitCondition();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
            new Thread(()->{
                try {
                    condition.signalCondition();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
    }

}

運行Demo,可以看到等待執行緒確實等待了2s之後從await()方法返回。

Thread[Thread-0,5,main] is waiting..., now: Wed Dec 23 21:35:53 CST 2020
Thread[Thread-1,5,main] is signaling..., now: Wed Dec 23 21:35:55 CST 2020
Thread[Thread-0,5,main] return..., now: Wed Dec 23 21:35:55 CST 2020

接下來就是深入源碼理解Condition等待隊列是如何實現的

二、Condition等待隊列的實現分析(源碼分析)

1、Condition等待隊列介紹

(1)概念認識

先引出簡單的認識,其實對比同步隊列來說,很好理解,實際上更加簡單

  • 等待隊列是一個單向FIFO隊列,隊列每個節點都包含了一個執行緒引用,該執行緒是在Condition對象上等待的執行緒;
  • 實際上這裡的等待隊列和AQS中的同步隊列,都是採用AQS.Node靜態內部類;
  • 一個ConditionObject擁有首節點(fisrtWaiter)和尾節點(lastWaiter);
  • 如果一個執行緒調用了Condition.await()方法,那麼該執行緒將會釋放鎖(從同步隊列中移除),構造成節點加入等待隊列,等待被喚醒;
  • 如果一個執行緒調用了Condition.signal()方法,那麼該執行緒將會被喚醒(從等待隊列中移除),構造成節點加入同步隊列,嘗試重新獲取同步狀態;

(2)等待隊列結構圖

實際上,Condition的實現是在AQS中內部類ConditionObject實現Condition具體實現的:

        

等待隊列的結構圖如下圖,相比較同步隊列而言:

  • 等待隊列來說更加簡單,是單向FIFO隊列;
  • Condition擁有首尾節點引用,新增節點直接nextWaiter指向即可,這個過程不需要CAS保證,因為調用Condition.await()方法肯定是獲取了鎖的執行緒,也就是說該過程是來保證執行緒安全的。

      

實際上,AQS同步器只擁有一個同步隊列,但卻有多個Condition等待隊列,如下圖。

                      

2、await()方法實現解析

當調用await()方法時,相當於同步隊列的首節點(獲取了鎖的節點)移動到了Condition的等待隊列中。

   

更具體來說是首先調用await()方法之前肯定是能獲取到同步狀態的執行緒,也就是同步隊列中首節點,之後調用await()方法由將釋放鎖,進入等待隊列

分析源碼(重點部分都已經注釋,結合圖應該更好理解):

1)調用await()方法,通過addConditionWaiter()方法加入等待執行緒,然後釋放全部同步狀態

2)進入while循環,判斷是否已經移動到同步隊列中,如果已經被移動到同步隊列中則說明執行緒已經被喚醒(signal);

3)接下來嘗試獲取競爭同步狀態,即調用acquireQueue方法

     public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // addConditionWaiter()方法當前執行緒加入等待隊列
            Node node = addConditionWaiter();
            // 調用await()方法後釋放鎖(同步狀態)
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 檢查node是否在同步隊列中,不是的話說明已經獲取到鎖
            //  LockSupport.unpark喚醒執行緒後,從這裡返回,此時已經在SyncQueue同步隊列中,退出循環
            // 從這裡也可以看出,也是經典的等待/通知模式
            while (!isOnSyncQueue(node)) {
                // 阻塞當前執行緒
                LockSupport.park(this);
                // 在調用signal前拋出中斷異常,或者調用之後中斷,都退出循環
                // THROW_IE if interrupted  before signalled, REINTERRUPT if after signalled
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 被喚醒後的執行緒重新嘗試競爭獲取同步狀態
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

結構流程如下圖:

3、signal()方法實現解析

調用signal方法將會喚醒等待隊列中等待時間最長的節點(首節點),在喚醒節點之前,會將節點移動到同步隊列中

     

具體來說當前執行緒獲取到了鎖,接著獲取等待隊列的首節點,將其移動到同步隊列中,並且喚醒節點中的執行緒。

分析源碼(重點部分都已經注釋,結合圖應該更好理解):

1)進入signal()方法,調用doSignal(Node node)方法移動到同步隊列中,並喚醒節點中執行緒

  public final void signal() {
            // 判斷當前執行緒是否持有獲得鎖
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                // 移動到同步隊列中,並且喚醒節點中的執行緒
                doSignal(first);
        }
  private void doSignal(Node first) {
            do {
                // 如果等待隊列中只有一個節點(即首節點),則喚醒首節點後lastWaiter置空
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                // 否則獲取等待隊列中的首節點,即next域斷開置空
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

2)doSignal(Node node)方法中調用transferForSignal(Node node),通過調用enq(Node node)方法(這裡其實就是同步隊列的入隊enq(Node node)方法),等待隊列中的頭結點執行緒安全地移動到同步隊列,當節點移動到同步隊列後,當前執行緒將會被喚醒(LockSupport.unpark(node.thread))。

    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        // 如果沒有正確設置等待狀態為初始狀態準備加入同步隊列中,則返回,當前節點狀態為Node.CONDITION
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // 將等待隊列中的頭結點移動到同步隊列中, 返回已經加入的當前node在同步隊列中前節點
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            // 喚醒當前node執行緒,返回while(isOnSynQueue(Node node))處,退出循環
            LockSupport.unpark(node.thread);
        return true;
    }

流程結構如下圖: