CyclicBarrier是如何成為一個"柵欄"的
- 2020 年 4 月 9 日
- 筆記
CyclicBarrier
是一種類似於柵欄的存在,意思就是在柵欄開放之前你都只能被擋在柵欄的一側,當柵欄移除之後,之前被擋在一側的多個對象則同時開始動起來。
1. 如何使用CyclicBarrier
在介紹其原理之前,先了解一下CyclicBarrier
應該如何使用。
假設現在有這樣的場景,我們需要開一個會議,需要張1、張2、張3三個人參加,
會議需要三個人都到齊之後才能開始,否則只能幹等著;這個場景用CyclicBarrier
可以很契合的模擬出來。程式碼如下:
public static void main(String[] args) { // 執行緒池,每個執行緒代表一個人 ThreadPoolExecutor executor = ThreadPoolProvider.getInstance(); // 會議所需的人數為3 CyclicBarrier barrier = new CyclicBarrier(3); executor.execute(() -> { try { System.err.println("張1到達會議室"); barrier.await(); System.err.println("會議開始,張1開始發言"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); executor.execute(() -> { try { System.err.println("張2到達會議室"); barrier.await(); System.err.println("會議開始,張2開始發言"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); executor.execute(() -> { try { System.err.println("張3先去個廁所,內急解決再去開會"); TimeUnit.SECONDS.sleep(1); System.err.println("張3到達會議室"); barrier.await(); System.err.println("會議開始,張3開始發言"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); executor.shutdown(); }
結果圖:
通過上方程式碼可以知道CyclicBarrier
的幾點:
- 使用
await()
來表示完成了某些事情。(上方例子的表現為到達了會議室) - 使用
await()
之後當前執行緒就進入阻塞狀態,需要等待完全滿足CyclicBarrier
的條件後喚醒才能繼續接下來的操作。(上方例子中 為3個人都到達會議室) - 在最後一個執行緒達到條件之後,之前阻塞的執行緒全部放開,繼續接下來的操作。(上方例子為張3到達會議室)
這個簡單的例子也讓我們了解CyclicBarrier
的使用方法,那來看看其內部究竟是如何實現柵欄的效果的。
2. CyclicBarrier是如何成為"柵欄"的
從第一節的程式碼中我們也能看到,需要關注的就兩個地方
- 構造函數
- await()方法
只要了解這兩個方法的內部,相當於了解了CyclicBarrier
的內部。
那在深入了解之前,先來看下CyclicBarrier
的幾個變數,不用刻意去記,看程式碼的時候知道這個東西做什麼用的就行了:
lock:
CyclicBarrier
類創建的ReentrantLock
實例,關於ReentrantLock
不清楚的可以->傳送。trip:
lock
中的condition
,CyclicBarrier
使用該變數來實現各執行緒之間的阻塞和同時喚醒。同樣,不明白condition
作用的=>傳送門。parties:需要滿足條件(調用
await
方法)的總數,就是說當有parties個執行緒await()之後就會喚醒全部執行緒。barrierCommand:一個
Runnable
變數,在await
方法的調用次數到達總數parties
之後,在喚醒全部執行緒之前執行其run()
方法generation:其內部類,可以理解為周期,周期內需要完成n個任務,只要一個任務失敗,當前周期的所有任務就算失敗,結束當前周期,再開啟下個周期。
count:當前周期剩餘需要完成的任務數(剩餘調用
await
方法的次數)
以下為源碼:
public class CyclicBarrier { // 內部類,可理解為周期 private static class Generation { // 當前周期是否失敗 boolean broken = false; } // 鎖的實例 private final ReentrantLock lock = new ReentrantLock(); // ReentrantLock的condition變數,用來控制執行緒喚醒和阻塞 private final Condition trip = lock.newCondition(); // 需要滿足條件的次數,即需要調用await方法的次數 private final int parties; // 滿足條件次數達到parties之後,喚醒所有執行緒之前執行其 run()方法 private final Runnable barrierCommand; // 當前周期 private Generation generation = new Generation(); // 剩餘滿足條件次數 private int count; // ... }
看完CyclicBarrier
的幾個變數後,來看其具體的內部實現。
首先來看構造函數,其構造函數有兩個,一個在達到條件總數(parties)後直接叫醒所有執行緒;另一個指定一個Runnable
在達到條件總數後先執行其run()方法再叫醒。
- 不指定
Runnable
,參數只有一個:需要達成的任務數
public CyclicBarrier(int parties) { // 直接調用另一個構造方法,Runnable傳null,表示不執行 this(parties, null); }
- 指定
Runnable
的構造方法,賦值任務總數、剩餘任務數、喚醒操作之前的Runnable
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); // 任務總數 this.parties = parties; // 剩餘需要完成的任務數 this.count = parties; // 喚醒之前執行的Runnable this.barrierCommand = barrierAction; }
在第一節我們使用的是第一個構造方法,來試試第二個
public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor executor = ThreadPoolProvider.getInstance(); /** =======增加Runnable,其他地方保持一致=============*/ CyclicBarrier barrier = new CyclicBarrier(3, ()-> System.err.println("在會議開始之前,先給大家發下開會資料")); executor.execute(() -> { try { System.err.println("張1到達會議室"); barrier.await(); System.err.println("會議開始,張1開始發言"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); executor.execute(() -> { try { System.err.println("張2到達會議室"); barrier.await(); System.err.println("會議開始,張2開始發言"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); executor.execute(() -> { try { System.err.println("張3先去個廁所,內急解決再去開會"); TimeUnit.SECONDS.sleep(1); System.err.println("張3到達會議室"); barrier.await(); System.err.println("會議開始,張3開始發言"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); executor.shutdown(); }
結果圖:
看完構造函數,就算理解了一半CyclicBarrier
了,接下來來看另一半——await()
;跟蹤程式碼,看到是這樣的
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
直接調用dowait
方法,傳參為false
和0,意思就是不限時等待,除非執行緒被打斷或者喚醒。再進入dowait
方法,這個方法就是CyclicBarrier
的另一半,在下方的程式碼中很清楚的寫了整個執行流程
/** 參數說明, timed:是否限時, nanos:限時時間*/ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 鎖 final ReentrantLock lock = this.lock; // 獲取鎖,如果失敗的話執行緒睡眠,進入同步隊列(AQS中的知識) lock.lock(); try { /* 拿到鎖之後進入程式碼處理邏輯*/ // 當前周期 final Generation g = generation; // 如果當前周期是失敗的,那麼直接拋錯 if (g.broken) throw new BrokenBarrierException(); // 如果當前執行緒被打斷了,那麼此次周期失敗,設置相關參數,然後拋錯 if (Thread.interrupted()) { // 實現程式碼在下行的注釋中,設置相關參數來提醒其他執行緒周期失敗了 breakBarrier(); /* * private void breakBarrier() { * generation.broken = true; * count = parties; * // 喚醒condition中的所有執行緒 * trip.signalAll(); * } */ throw new InterruptedException(); } // 如果成功了,那麼剩餘任務數(count)減1 int index = --count; // 如果為0則表示達到剩餘的任務數沒有了,達到CyclicBarrier的條件總數了,需要喚醒其他執行緒 if (index == 0) { boolean ranAction = false; try { // 喚醒之前的Runnable final Runnable command = barrierCommand; // 如果不為空的話執行其run方法 if (command != null) command.run(); ranAction = true; // 開啟下個周期,這個方法是CyclicBarrier可以復用的原因,具體實現在下行注釋 nextGeneration(); /* private void nextGeneration() { * // 首先叫醒當前周期的其他執行緒,告訴其周期結束了,可以執行接下來的操作了 * trip.signalAll(); * // 然後開啟下個周期,剩餘任務數重置 * count = parties; * // 下個周期 * generation = new Generation(); * } */ return 0; } finally { if (!ranAction) breakBarrier(); } } // 如果還不能結束本周期,就一直等待直到結束或者周期失敗 for (;;) { try { // await的過程中是釋放鎖的 // 不限時的話就一直等待直到被喚醒或者打斷 if (!timed) trip.await(); else if (nanos > 0L) // 否則的話等待一段時間後醒來 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { // 釋放鎖 lock.unlock(); } }
到這裡就基本理解CyclicBarrier
的內部實現了,其他像帶參數的await
也是一樣邏輯,只不過是多了限時的條件而已。
其實如果你了解ReentrantLock
的話,就知道CyclicBarrier
整個就是對ReentrantLock
的condition
的活用而已。
3.總結
整體來說CyclicBarrier
的實現相對較簡單,說是ReentrantLock
中condition
的升級版也不為過。其關鍵點為兩個,一個為其構造函數,決定任務個數和喚醒前操作;另外一個點為await
方法,在正常情況下每次await
都會減少一個任務數(總數由構造方法決定),在任務數變為0的時候表示周期結束,需要喚醒condition
的其他執行緒,而途中遇到失敗的話當前周期失敗,喚醒其他執行緒一起拋錯。
失敗不會讓你變得弱小,害怕失敗會。