CyclicBarrier源碼探究 (JDK 1.8)

  • 2020 年 3 月 13 日
  • 筆記

CyclicBarrier也叫迴環柵欄,能夠實現讓一組執行緒運行到柵欄處並阻塞,等到所有執行緒都到達柵欄時再一起執行的功能。「迴環」意味著CyclicBarrier可以多次重複使用,相比於CountDownLatch只能使用一次,CyclicBarrier可以節省許多資源,並且還可以在構造器中傳入任務,當柵欄條件滿足時執行這個任務。CyclicBarrier是使用了ReentrantLock,主要方法在執行時都會加鎖,因此並發性能不是很高。

1.相關欄位

    //重入鎖,CyclicBarrier內部通過重入鎖實現執行緒安全      private final ReentrantLock lock = new ReentrantLock();      //執行緒阻塞時的等待條件      private final Condition trip = lock.newCondition();      //需要等待的執行緒數      private final int parties;      //柵欄打開之後首先執行的任務      private final Runnable barrierCommand;      //記錄當前的分代標記      private Generation generation = new Generation();      //當前還需要等待多少個執行緒運行到柵欄位置      private int count;

需要注意的是generation欄位,用於標記柵欄當前處在哪一代。當滿足一定的條件時(例如調用了reset方法,或者柵欄打開等),柵欄狀態會切換到下一代,實際就是new一個新的Generation對象,這是CyclicBarrier的內部類,程式碼非常簡單,如下:

    private static class Generation {          boolean broken = false;   //標記柵欄是否被破壞      }

實際使用的過程中,會利用generation欄位判斷當前是否在同一個分代,而使用broker欄位判斷柵欄是否被破壞。

2.構造函數

CyclicBarrier有兩個重載的構造函數,構造函數只是對上述的相關欄位進行初始化,如下:

    public CyclicBarrier(int parties) {          this(parties, null);      }        public CyclicBarrier(int parties, Runnable barrierAction) {          if (parties <= 0) throw new IllegalArgumentException();          this.parties = parties;          this.count = parties;          this.barrierCommand = barrierAction;      }

3.核心方法

  • await
    await是開發時最常用到的方法了,同CountDownLatch一樣,CyclicBarrier也提供了兩個await方法,一個不帶參數,一個帶有超時參數,其內部只是簡單調用了一下dowait方法:
    public int await() throws InterruptedException, BrokenBarrierException {          try {              return dowait(false, 0L);          } catch (TimeoutException toe) {              throw new Error(toe); // cannot happen          }      }        public int await(long timeout, TimeUnit unit)          throws InterruptedException,                 BrokenBarrierException,                 TimeoutException {          return dowait(true, unit.toNanos(timeout));      }

接下來看看至關重要的dowait方法:

    private int dowait(boolean timed, long nanos)          throws InterruptedException, BrokenBarrierException,                 TimeoutException {          final ReentrantLock lock = this.lock;          //加重入鎖          lock.lock();          try {              //首先獲取年齡代資訊              final Generation g = generation;              //如果柵欄狀態被破壞,拋出異常,例如先啟動的執行緒調用了breakBarrier方法,後啟動的執行緒就能夠看到g.broker=true              if (g.broken)                  throw new BrokenBarrierException();              //檢測執行緒的中斷狀態,如果執行緒設置了中斷狀態,則通過breakBarrier設置柵欄為已破壞狀態,並喚醒其他執行緒              //如果這裡能夠檢測到中斷狀態,那隻可能是在await方法外部設置的              if (Thread.interrupted()) {                  breakBarrier();                  throw new InterruptedException();              }              //每調用一次await,就將需要等待的執行緒數減1              int index = --count;              //index=0表示這是最後一個到達的執行緒,由該執行緒執行下面的邏輯              if (index == 0) {  // tripped                  boolean ranAction = false;                  try {                      final Runnable command = barrierCommand;                      //如果在構造器中傳入了第二個任務參數,就在放開柵欄前先執行這個任務                      if (command != null)                          command.run();                      ranAction = true;                      //正常結束,需要喚醒阻塞的執行緒,並換代                      nextGeneration();                      return 0;                  } finally {                      //try程式碼塊如果正常執行,ranAction就一定等於true,而try程式碼塊唯一可能發生異常的地方就是command.run(),                      //因此這裡為了保證在任務執行失敗時,將柵欄標記為已破壞,喚醒阻塞執行緒                      if (!ranAction)                          breakBarrier();                  }              }                // loop until tripped, broken, interrupted, or timed out              for (;;) {                  try {                      //沒有設置超時標記,就加入等待隊列                      if (!timed)                          trip.await();                      //設置了超時標記,但目前還沒有超時,則繼續等待                      else if (nanos > 0L)                          nanos = trip.awaitNanos(nanos);                  } catch (InterruptedException ie) {                      //如果執行緒等待的過程中被中斷,會執行到這裡                      //g == generation表示當前還在同一個年齡分代中,!g.broker表示當前柵欄狀態沒有被破壞                      if (g == generation && ! g.broken) {                          breakBarrier();                          throw ie;                      } else {                          //上面的條件不滿足,說明:1)g!=generation,說明執行緒執行到這裡時已經換代了                          //2)沒有換代,但是柵欄被破壞了                          //無論哪種情況,都只是簡單地設置一下當前執行緒的中斷狀態                          Thread.currentThread().interrupt();                      }                  }                  //柵欄被破壞,拋出異常                  //注意,在breakBarrier方法中會喚醒所有等待條件的執行緒,這些執行緒會執行到這裡,判斷柵欄已經被破壞,都會拋出異常                  if (g.broken)                      throw new BrokenBarrierException();                  //距離上一次設置g變數的值已經過去很長時間了,在執行過程中generation可能已經發生改變,                  //當前執行緒還是前幾代的,不需要再循環阻塞了,直接返回上一代剩餘需要等待的執行緒數                  //注意:程式碼中breakBarrier方法和nextGeneration方法都會喚醒阻塞的執行緒,但是breakBarrier在上一個判斷就被攔截了,                  //因此走到這裡的有三種情況:                  //a)最後一個執行緒正常執行,柵欄打開導致其他執行緒被喚醒;不屬於當前代的執行緒直接返回,                  //屬於當前代的則可能因為沒到柵欄開放條件要繼續循環阻塞                  //b)柵欄被重置(調用了reset方法),此時g!=negeration,全都直接返回                  //c)執行緒等待超時了,不屬於當前代的返回就可以了,屬於當前代的則要設置generation.broken = true                  if (g != generation)                      return index;                  //如果執行緒等待超時,標記柵欄為破壞狀態並拋出異常,如果還沒超時,則自旋後又重新阻塞                  if (timed && nanos <= 0L) {                      breakBarrier();                      throw new TimeoutException();                  }              }          } finally {              //別忘了解鎖              lock.unlock();          }      }

dowait的方法邏輯是:每一個調用await方法的執行緒都會將計數count1,最後一個執行緒將count減為0時,順帶還要執行barrierCommand指定的任務,並將generation切換到下一代,當然,最重要的還是要喚醒之前在柵欄處阻塞的執行緒。由於trip對應的Condition對象沒有任何地方會修改,因此trip.signalAll()會喚醒所有在該條件上等待的執行緒,如果執行緒在等待的過程中,其他執行緒將generation更新到下一代,就會出現被喚醒的執行緒中有部分還屬於之前那一代的情況。
接下來將會對dowait用到的一些方法進行簡單介紹。

  • breakBarrier
    dowait方法有四個地方調用了breakBarrier,從名字可以看出,該方法會將generation.broken設置為true,除此之外,還會還原count的值,並且喚醒所有被阻塞的執行緒:
    private void breakBarrier() {          generation.broken = true;          count = parties;          //喚醒所有的阻塞執行緒          trip.signalAll();      }

縱觀CyclicBarrier源碼,generation.broken統一在breakBarrier方法中被設置為true,而一旦將generation.broken設置為true之後,程式碼中檢查到這個狀態之後都會拋出異常,柵欄就沒辦法再使用了(可以手動調用reset進行重置),而源碼中會在以下幾種情況調用breakBarrier方法:
1) 當前執行緒被中斷
2)通過構造器傳入的任務執行失敗
3) 條件等待時被中斷
4) 執行緒等待超時
5) 顯式調用reset方法

  • nextGeneration
    private void nextGeneration() {          // 喚醒所有的阻塞執行緒          trip.signalAll();          // 開啟下一代          count = parties;          generation = new Generation();      }
  • reset
    reset方法主要是結束這一代,並切換到下一代
    public void reset() {          final ReentrantLock lock = this.lock;          lock.lock();          try {              breakBarrier();   // break the current generation              nextGeneration(); // start a new generation          } finally {              lock.unlock();          }      }

介紹到這裡,整個CyclicBarrier已經差不多介紹完了,但是內部的流程遠遠沒有這麼簡單,因為很大一部分邏輯封裝在AbstractQueuedSynchronizer中,這個類定義了阻塞的執行緒如何加入等待隊列,又如何被喚醒,因此如果想要深入了解執行緒等待的邏輯,還需要仔細研究AbstractQueuedSynchronizer才行。本文不會對這部分內容進行介紹,後面有時間的話將會專門對其進行介紹。