Java並發——線程間的等待與通知

  • 2019 年 10 月 17 日
  • 筆記

前言:

  前面講完了一些並發編程的原理,現在我們要來學習的是線程之間的協作。通俗來說就是,當前線程在某個條件下需要等待,不需要使用太多系統資源。在某個條件下我們需要去喚醒它,分配給它一定的系統資源,讓它繼續工作。這樣能更好的節約資源。

一、Object的wait()與notify()

  基本概念:

    一個線程因執行目標動作的條件未能滿足而被要求暫停就是wait,而一個線程滿足執行目標動作的條件之後喚醒被暫停的線程就是notify。

  基本模板:

synchronized (obj){              //保護條件不成立              while(flag){                  //暫停當前線程                  obj.wait();              }              //當保護條件成立,即跳出while循環執行目標動作              doAction();          }

  解析wait():Object.wait()的作用是使執行線程被暫停,該執行線程生命周期就變更為WAITING,這裡注意一下,是無限等待,直到有notify()方法通知該線程喚醒。Object.wait(long timeout)的作用是使執行線程超過一定時間沒有被喚醒就自動喚醒,也就是超時等待。Object.wait(long timeout,int naous)是更加精準的控制時間的方法,可以控制到毫微秒。這裡需要注意的是wait()會在當前線程擁有鎖的時候才能執行該方法並且釋放當前線程擁有的鎖,從而讓該線程進入等待狀態,其他線程來嘗試獲取當前鎖。也就是需要申請鎖與釋放鎖。

  解析notify():Object.notify()方法是喚醒調用了wait()的線程,只喚醒最多一個。如果有多個線程,不一定能喚醒我們所想要的線程。Object.notifyAll()喚醒所有等待的線程。notify方法一定是通知線程先獲取到了鎖才能進行通知。通知之後當前的通知線程需要釋放鎖,然後由等待線程來獲取。所以涉及到了一個申請鎖與釋放鎖的步驟。

  wait()與notify()之間存在的三大問題:

  從上面的解析可以看出,notify()是無指向性的喚醒,notifyAll()是無偏差喚醒。所以會產生下面三個問題

  過早喚醒:假設當前有三組等待(w1,w2,w3)與通知(n1,n2,n3)線程同步在對象obj上,w1,w2的判斷喚醒條件相同,由線程n1更新條件並喚醒,w3的判斷喚醒條件不同,由n2,n3更新條件並喚醒,這時如果n1執行了喚醒,那麼不能執行notify,因為需要叫醒兩條線程,只能用notifyAll(),可是用了之後w3的條件未能滿足就被叫醒,就需要一直佔用資源的去等待執行。

  信號丟失:這個問題主要是程序員編程出現了問題,並不是內部實現機制出現的問題。編程時如果在該使用notifyAll()的地方使用notify()那麼只能喚醒一個線程,從而使其他應該喚醒的線程未能喚醒,這就是信號丟失。如果等待線程在執行wait()方法前沒有先判斷保護條件是否成立,就會出現通知線程在該等待線程進入臨界區之前就已經更新了相關共享變量,並且執行了notify()方法,但是由於wait()還未能執行,且沒有設置共享變量的判斷,所以會執行wait()方法,導致線程一直處於等待狀態,丟失了一個信號。

  欺騙性喚醒:等待線程並不是一定有notify()/notifyAll()才能被喚醒,雖然出現的概率特別低,但是操作系統是允許這種情況發生的。

  上下文切換問題:首先wait()至少會導致線程對相應對象內部鎖的申請與釋放。notify()/notifyAll()時需要持有相應的對象內部鎖並且也會釋放該鎖,會出現上下文切換問題其實就是從RUNNABLE狀態變為非RUNNABLE狀態會出現

  針對問題的解決方案:

  信號丟失與欺騙性喚醒問題:都可以使用while循環來避免,也就是上面的模板中寫的那樣。

  上下文切換問題:在保證程序正確性的情況下使用notify()代替notifyAll(),notify不會導致過早喚醒,所以減少了上下文的切換。並且使用了notify之後應該儘快釋放相應內部鎖,從而讓wait()能夠更快的申請到鎖。

  過早喚醒:使用java.util.concurrent.locks.Condition中的await與signal。

  PS:由於Object中的wait與notify使用的是native方法,即C++編寫,這裡不做源碼解析。

二、Condition中的await()與signal()

  這個方法相應的改變了上面所說的無指向性的問題,每個Condition內部都會維護一個隊列,從而讓我們對線程之間的操作更加靈活。下面通過分析源碼讓我們了解一下內部機制。Condition是個接口,真正的實現是AbstractQueuedSynchronizer中的內部類ConditionObject。

  基本屬性:

public class ConditionObject implements Condition, java.io.Serializable {          private static final long serialVersionUID = 1173984872572414699L;          /** First node of condition queue. */          private transient Node firstWaiter;          /** Last node of condition queue. */          private transient Node lastWaiter;  }

  從基本屬性中可看出維護的是雙端隊列。

  await()方法解析:

public class ConditionObject implements Condition, java.io.Serializable {    public final void await() throws InterruptedException {     // 1. 判斷線程是否中斷      if(Thread.interrupted()){          throw new InterruptedException();      }     // 2. 將線程封裝成一個 Node 放到 Condition Queue 裏面      Node node = addConditionWaiter();     // 3. 釋放當前線程所獲取的所有的鎖 (PS: 調用 await 方法時, 當前線程是必須已經獲取了獨佔的鎖)                    int savedState = fullyRelease(node);      int interruptMode = 0;     // 4. 判斷當前線程是否在 Sync Queue 裏面(這裡 Node 從 Condtion Queue 裏面轉移到 Sync Queue 裏面有兩種可能 
   //(1) 其他線程調用 signal 進行轉移 (2) 當前線程被中斷而進行Node的轉移(就在checkInterruptWhileWaiting裏面進行轉移))
while(!isOnSyncQueue(node)){      // 5. 當前線程沒在 Sync Queue 裏面, 則進行 block LockSupport.park(this);      // 6. 判斷此次線程的喚醒是否因為線程被中斷, 若是被中斷, 則會在checkInterruptWhileWaiting的transferAfterCancelledWait 進行節點的轉移;
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){      // 說明此是通過線程中斷的方式進行喚醒, 並且已經進行了 node 的轉移, 轉移到 Sync Queue 裏面 break; } }    // 7. 調用 acquireQueued在 Sync Queue 裏面進行獨佔鎖的獲取, 返回值表明在獲取的過程中有沒有被中斷過 if(acquireQueued(node, savedState) && interruptMode != THROW_IE){ interruptMode = REINTERRUPT; }    // 8. 通過 "node.nextWaiter != null" 判斷 線程的喚醒是中斷還是 signal。
   //因為通過中斷喚醒的話, 此刻代表線程的 Node 在 Condition Queue 與 Sync Queue 裏面都會存在
if(node.nextWaiter != null){     // 9. 進行 cancelled 節點的清除 unlinkCancelledWaiters(); }    // 10. "interruptMode != 0" 代表通過中斷的方式喚醒線程 if(interruptMode != 0){      // 11. 根據 interruptMode 的類型決定是拋出異常, 還是自己再中斷一下 reportInterruptAfterWait(interruptMode); }   } }

  上面源代碼可看出Condition內部維護的隊列是一個等待隊列,當需要調用signal()方法時就會讓當前線程節點從Condition queue轉到Sync queue隊列中去競爭鎖從而喚醒。

  signal()源碼解析:

public class ConditionObject implements Condition, java.io.Serializable {      public final void signal() {              if (!isHeldExclusively())                  throw new IllegalMonitorStateException();              Node first = firstWaiter;              if (first != null)                  doSignal(first);          }      private void doSignal(Node first) {              do {                  //傳入的鏈表下一個節點為空,則尾節點置空                  if ( (firstWaiter = first.nextWaiter) == null)                      lastWaiter = null;                  //當前節點的下一個節點為空                  first.nextWaiter = null;                  //如果成功將node從condition queue轉換到sync queue,則退出循環,節點為空了也退出循環。否則就接着在隊列中找尋節點進行喚醒              } while (!transferForSignal(first) &&                       (first = firstWaiter) != null);          }  } 

  signal()會使等待隊列中的一個任意線程被喚醒,signalAll()則是喚醒該隊列中的所有線程。這樣通過不同隊列維護不同線程,就可以達到指向性的功能。可以消除由過早喚醒帶來的資源損耗。注意的是在使用signal()方法前需要獲取鎖,即lock(),而後需要儘快unlock(),這樣可以避免上下文切換的損耗。

總結:

  面向對象的世界中,一個類往往需要藉助其他的類來一起完成計算,同樣線程的世界也是,多個線程可以同時完成一個任務,通過喚醒與等待,能更好的操作線程,從而讓線程在需要使用資源的時候分配資源給它,而不使用資源的時候就可以將資源讓給其他線程操作。關於Condition中提到的Sync queue可參考Java並發——結合CountDownLatch源碼、Semaphore源碼及ReentrantLock源碼來看AQS原理來看內部維護的隊列是如何獲取鎖的。