CyclicBarrier是如何成為一個"柵欄"的

CyclicBarrier是一種類似於柵欄的存在,意思就是在柵欄開放之前你都只能被擋在柵欄的一側,當柵欄移除之後,之前被擋在一側的多個對象則同時開始動起來。

1. 如何使用CyclicBarrier

  在介紹其原理之前,先了解一下CyclicBarrier應該如何使用。

  假設現在有這樣的場景,我們需要開一個會議,需要張1、張2、張3三個人參加,
會議需要三個人都到齊之後才能開始,否則只能幹等著;這個場景用CyclicBarrier可以很契合的模擬出來。程式碼如下:

public static void main(String[] args) {      // 執行緒池,每個執行緒代表一個人      ThreadPoolExecutor executor = ThreadPoolProvider.getInstance();      // 會議所需的人數為3      CyclicBarrier barrier = new CyclicBarrier(3);        executor.execute(() -> {          try {              System.err.println("張1到達會議室");              barrier.await();              System.err.println("會議開始,張1開始發言");          } catch (InterruptedException | BrokenBarrierException e) {              e.printStackTrace();          }        });        executor.execute(() -> {          try {              System.err.println("張2到達會議室");              barrier.await();              System.err.println("會議開始,張2開始發言");          } catch (InterruptedException | BrokenBarrierException e) {              e.printStackTrace();          }        });        executor.execute(() -> {          try {              System.err.println("張3先去個廁所,內急解決再去開會");              TimeUnit.SECONDS.sleep(1);              System.err.println("張3到達會議室");              barrier.await();              System.err.println("會議開始,張3開始發言");          } catch (InterruptedException | BrokenBarrierException e) {              e.printStackTrace();          }        });          executor.shutdown();  }  

結果圖:
例子圖
  通過上方程式碼可以知道CyclicBarrier的幾點:

  1. 使用await()來表示完成了某些事情。(上方例子的表現為到達了會議室)
  2. 使用await()之後當前執行緒就進入阻塞狀態,需要等待完全滿足CyclicBarrier的條件後喚醒才能繼續接下來的操作。(上方例子中 為3個人都到達會議室)
  3. 在最後一個執行緒達到條件之後,之前阻塞的執行緒全部放開,繼續接下來的操作。(上方例子為張3到達會議室)

  這個簡單的例子也讓我們了解CyclicBarrier的使用方法,那來看看其內部究竟是如何實現柵欄的效果的。

2. CyclicBarrier是如何成為"柵欄"的

  從第一節的程式碼中我們也能看到,需要關注的就兩個地方

  1. 構造函數
  2. await()方法

只要了解這兩個方法的內部,相當於了解了CyclicBarrier的內部。
那在深入了解之前,先來看下CyclicBarrier的幾個變數,不用刻意去記,看程式碼的時候知道這個東西做什麼用的就行了:

lock:CyclicBarrier類創建的ReentrantLock實例,關於ReentrantLock不清楚的可以->傳送。

trip:lock中的conditionCyclicBarrier使用該變數來實現各執行緒之間的阻塞和同時喚醒。同樣,不明白condition作用的=>傳送門

parties:需要滿足條件(調用await方法)的總數,就是說當有parties個執行緒await()之後就會喚醒全部執行緒。

barrierCommand:一個Runnable變數,在await方法的調用次數到達總數parties之後,在喚醒全部執行緒之前執行其run()方法

generation:其內部類,可以理解為周期,周期內需要完成n個任務,只要一個任務失敗,當前周期的所有任務就算失敗,結束當前周期,再開啟下個周期。

count:當前周期剩餘需要完成的任務數(剩餘調用await方法的次數)

以下為源碼:

public class CyclicBarrier {      // 內部類,可理解為周期      private static class Generation {          // 當前周期是否失敗          boolean broken = false;      }        // 鎖的實例      private final ReentrantLock lock = new ReentrantLock();      // ReentrantLock的condition變數,用來控制執行緒喚醒和阻塞      private final Condition trip = lock.newCondition();      // 需要滿足條件的次數,即需要調用await方法的次數      private final int parties;      // 滿足條件次數達到parties之後,喚醒所有執行緒之前執行其 run()方法      private final Runnable barrierCommand;      // 當前周期      private Generation generation = new Generation();      // 剩餘滿足條件次數      private int count;        // ...  }  

  看完CyclicBarrier的幾個變數後,來看其具體的內部實現。

  首先來看構造函數,其構造函數有兩個,一個在達到條件總數(parties)後直接叫醒所有執行緒;另一個指定一個Runnable在達到條件總數後先執行其run()方法再叫醒。

  • 不指定Runnable,參數只有一個:需要達成的任務數
public CyclicBarrier(int parties) {      // 直接調用另一個構造方法,Runnable傳null,表示不執行      this(parties, null);  }  
  • 指定Runnable的構造方法,賦值任務總數、剩餘任務數、喚醒操作之前的Runnable
public CyclicBarrier(int parties, Runnable barrierAction) {      if (parties <= 0) throw new IllegalArgumentException();      // 任務總數      this.parties = parties;      // 剩餘需要完成的任務數      this.count = parties;      // 喚醒之前執行的Runnable      this.barrierCommand = barrierAction;  }  

  在第一節我們使用的是第一個構造方法,來試試第二個

public static void main(String[] args) throws InterruptedException {        ThreadPoolExecutor executor = ThreadPoolProvider.getInstance();      /** =======增加Runnable,其他地方保持一致=============*/      CyclicBarrier barrier = new CyclicBarrier(3, ()-> System.err.println("在會議開始之前,先給大家發下開會資料"));        executor.execute(() -> {          try {              System.err.println("張1到達會議室");              barrier.await();              System.err.println("會議開始,張1開始發言");          } catch (InterruptedException | BrokenBarrierException e) {              e.printStackTrace();          }        });        executor.execute(() -> {          try {              System.err.println("張2到達會議室");              barrier.await();              System.err.println("會議開始,張2開始發言");          } catch (InterruptedException | BrokenBarrierException e) {              e.printStackTrace();          }        });        executor.execute(() -> {          try {              System.err.println("張3先去個廁所,內急解決再去開會");              TimeUnit.SECONDS.sleep(1);              System.err.println("張3到達會議室");              barrier.await();              System.err.println("會議開始,張3開始發言");          } catch (InterruptedException | BrokenBarrierException e) {              e.printStackTrace();          }        });          executor.shutdown();  }  

結果圖:

pic2

 看完構造函數,就算理解了一半CyclicBarrier了,接下來來看另一半——await();跟蹤程式碼,看到是這樣的

public int await() throws InterruptedException, BrokenBarrierException {      try {          return dowait(false, 0L);      } catch (TimeoutException toe) {          throw new Error(toe); // cannot happen      }  }  

直接調用dowait方法,傳參為false0,意思就是不限時等待,除非執行緒被打斷或者喚醒。再進入dowait方法,這個方法就是CyclicBarrier的另一半,在下方的程式碼中很清楚的寫了整個執行流程

/** 參數說明, timed:是否限時, nanos:限時時間*/  private int dowait(boolean timed, long nanos)          throws InterruptedException, BrokenBarrierException, TimeoutException {      // 鎖      final ReentrantLock lock = this.lock;      // 獲取鎖,如果失敗的話執行緒睡眠,進入同步隊列(AQS中的知識)      lock.lock();      try {          /* 拿到鎖之後進入程式碼處理邏輯*/            // 當前周期          final Generation g = generation;            // 如果當前周期是失敗的,那麼直接拋錯          if (g.broken)              throw new BrokenBarrierException();            // 如果當前執行緒被打斷了,那麼此次周期失敗,設置相關參數,然後拋錯          if (Thread.interrupted()) {              // 實現程式碼在下行的注釋中,設置相關參數來提醒其他執行緒周期失敗了              breakBarrier();              /*               * private void breakBarrier() {               *     generation.broken = true;               *     count = parties;               *     // 喚醒condition中的所有執行緒               *     trip.signalAll();               * }               */              throw new InterruptedException();          }            // 如果成功了,那麼剩餘任務數(count)減1          int index = --count;          // 如果為0則表示達到剩餘的任務數沒有了,達到CyclicBarrier的條件總數了,需要喚醒其他執行緒          if (index == 0) {              boolean ranAction = false;              try {                  // 喚醒之前的Runnable                  final Runnable command = barrierCommand;                  // 如果不為空的話執行其run方法                  if (command != null)                      command.run();                  ranAction = true;                  // 開啟下個周期,這個方法是CyclicBarrier可以復用的原因,具體實現在下行注釋                  nextGeneration();                  /* private void nextGeneration() {                   *     // 首先叫醒當前周期的其他執行緒,告訴其周期結束了,可以執行接下來的操作了                   *     trip.signalAll();                   *     // 然後開啟下個周期,剩餘任務數重置                   *     count = parties;                   *     // 下個周期                   *     generation = new Generation();                   * }                   */                  return 0;              } finally {                  if (!ranAction)                      breakBarrier();              }          }            // 如果還不能結束本周期,就一直等待直到結束或者周期失敗          for (;;) {              try {                  // await的過程中是釋放鎖的                  // 不限時的話就一直等待直到被喚醒或者打斷                  if (!timed)                      trip.await();                  else if (nanos > 0L)                      // 否則的話等待一段時間後醒來                      nanos = trip.awaitNanos(nanos);              } catch (InterruptedException ie) {                  if (g == generation && ! g.broken) {                      breakBarrier();                      throw ie;                  } else {                      // We're about to finish waiting even if we had not                      // been interrupted, so this interrupt is deemed to                      // "belong" to subsequent execution.                      Thread.currentThread().interrupt();                  }              }                if (g.broken)                  throw new BrokenBarrierException();                if (g != generation)                  return index;                if (timed && nanos <= 0L) {                  breakBarrier();                  throw new TimeoutException();              }          }      } finally {          // 釋放鎖          lock.unlock();      }  }  

  到這裡就基本理解CyclicBarrier的內部實現了,其他像帶參數的await也是一樣邏輯,只不過是多了限時的條件而已。

  其實如果你了解ReentrantLock的話,就知道CyclicBarrier整個就是對ReentrantLockcondition的活用而已。

3.總結

  整體來說CyclicBarrier的實現相對較簡單,說是ReentrantLockcondition的升級版也不為過。其關鍵點為兩個,一個為其構造函數,決定任務個數和喚醒前操作;另外一個點為await方法,在正常情況下每次await都會減少一個任務數(總數由構造方法決定),在任務數變為0的時候表示周期結束,需要喚醒condition的其他執行緒,而途中遇到失敗的話當前周期失敗,喚醒其他執行緒一起拋錯。



失敗不會讓你變得弱小,害怕失敗會。