源碼分析:CyclicBarrier 之循環柵欄
簡介
CyclicBarrier 是一個同步輔助工具,允許一組線程全部等待彼此達到共同屏障點,且等待的線程被釋放後還可以重新使用,所以叫做Cyclic(循環的)。
應用場景
比如出去旅行時,導遊需要等待所有的客人到齊後,導遊才會給大家講解注意事項等
官方示例
在JDK的源碼注釋中,提供了一個簡單的示例demo,稍加修改後就可以運行
public class Solver {
AtomicInteger sum = new AtomicInteger(0);
// 自己新增的一個標識,true代表所有的計算完成了
volatile boolean done = false;
final int N;
final int[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) {
myRow = row;
}
@Override
public void run() {
while (!done()) {
int rowSum = Arrays.stream(data[myRow]).sum(); // 計算行的和
System.out.println("processRow(myRow):" + rowSum);
sum.addAndGet(rowSum);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
private boolean done(){
return done;
}
public Solver(int[][] matrix) throws InterruptedException{
data = matrix;
N = matrix.length;
Runnable barrierAction = () -> {
System.out.println("mergeRows(...):"+sum.get()); // 輸出二維數組的總和
done = true;
};
barrier = new CyclicBarrier(N, barrierAction);
List<Thread> threads = new ArrayList<Thread>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}
// wait until done
for (Thread thread : threads){
thread.join();
}
}
public static void main(String[] args) throws InterruptedException{
int[][] matrix = {{1,2,3},{4,5,6}};
Solver solver = new Solver(matrix);
}
}
源碼分析
主要的屬性
/** 防護柵欄入口的鎖 */
private final ReentrantLock lock = new ReentrantLock();
/** 等待直到跳閘的條件 */
private final Condition trip = lock.newCondition();
/** 構造方法參數,在障礙被釋放之前必須調用等待的線程數 */
private final int parties;
/* 越過柵欄時運行的命令 */
private final Runnable barrierCommand;
/** 當前的一代,控制CyclicBarrier的循環 */
private Generation generation = new Generation();
/** 記錄仍在等待的參與方線程數量,初始值等於parties */
private int count;
主要內部類
/** 代:屏障的每次使用都表示為一個生成實例 */
private static class Generation {
boolean broken = false; // 標識當前的柵欄已破壞或喚醒,jinglingwang.cn
}
構造方法
一共有兩個構造方法,第一個構造方法僅需要傳入一個int值,表示調用等待的線程數;第二個構造方法多了一個runnable接口,當所有的線程越過柵欄時執行的命令,沒有則為null;
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction; // Runnable 命令線程
}
await() 方法
每個需要在柵欄處等待的線程都需要顯式地調用這個方法。
public int await() throws InterruptedException, BrokenBarrierException {
try {
// 調用await方法,0:不超時
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
dowait() 方法
主要的障礙代碼
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 當前鎖
final ReentrantLock lock = this.lock;
// 加鎖
lock.lock();
try {
// 當前代
final Generation g = generation;
// 檢查當前代的狀態,是否要拋出BrokenBarrierException異常
if (g.broken)
throw new BrokenBarrierException();
// 當前線程被中斷了
if (Thread.interrupted()) {
// 屏障被打破
breakBarrier();
throw new InterruptedException();
}
// count減一
int index = --count;
// index等於0,說明最後一個線程到達了屏障處
if (index == 0) { // tripped
boolean ranAction = false; // 標識Runnable 命令線程是否有執行
try {
final Runnable command = barrierCommand; // 第二個構造方法的入參,需要運行的命令線程
if (command != null)
command.run(); // 執行命令線程。by:jinglingwang.cn
ranAction = true;
nextGeneration(); // 更新重置整個屏障
return 0;
} finally {
if (!ranAction)
// ranAction 沒有被設置成true;被中斷了
breakBarrier();
}
}
// 循環直到跳閘,斷開,中斷或超時
for (;;) {
try {
if (!timed) // 沒有設超時時間,直接調用條件鎖的await方法阻塞等待
trip.await();
else if (nanos > 0L) // 有超時時間
nanos = trip.awaitNanos(nanos); //調用條件鎖的await方法阻塞等待一段時間
} catch (InterruptedException ie) { // 捕獲中斷異常
if (g == generation && ! g.broken) {
breakBarrier(); //被中斷,當前代會被標識成已被破壞
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 如果上面代碼沒有異常,理論上只有被喚醒後才會執行到下面的代碼
// 再次檢查當前代是否已經被破壞
if (g.broken)
throw new BrokenBarrierException();
// 正常來說,最後一個線程在執行上面的代碼時,會調用nextGeneration,重新生成generation
// 所以線程被喚醒後,這裡條件會成立
if (g != generation)
return index;
// 超時檢查
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException(); //拋出超時異常
}
}
} finally {
// 釋放鎖
lock.unlock();
}
}
/** 重置屏障,回到初始狀態,說明可以重複使用*/
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties; // 重置等的參與方線程數量計數,回到最初的狀態
generation = new Generation();
}
private void breakBarrier() {
// 標識當前的柵欄狀態
generation.broken = true;
count = parties;
// 條件鎖,喚醒所有等待的線程,jinglingwang.cn
trip.signalAll();
}
dowait() 方法過程總結:
- 參與方的多個線程執行邏輯代碼後,分別調用
await
方法 - 線程分別拿到當前鎖,最先獲得鎖的N-1個線程,調用條件鎖
Condition
的await
方法,根據前面條件鎖的源碼分析我們知道,調用條件鎖的await方法會釋放當前鎖,然後再調用Unsafa類底層park
阻塞線程。 - 當最後一個線程調用await方法時(也就是上面的 if (index == 0) 分支邏輯,count減為0,屏障打破),會執行命令線程(構造方法的第二個入參Runnable),然後調用
nextGeneration
方法,喚醒所有的條件鎖等待的N-1個線程(喚醒並不一定馬上執行),然後重置計數與當前代,也就是一個新的屏障了,這也就是為什麼可以重複使用的原因。 - 最後一個線程釋放鎖,N-1線程中的線程陸續獲得鎖,釋放鎖,完成整個流程
CyclicBarrier 總結
- 支持兩個構造參數:線程數和需要執行的命令線程
- CyclicBarrier 是基於ReentrantLock和Condition來實現屏障邏輯的
- 先搶到鎖的N-1個線程會調用條件鎖的await方法從而被阻塞
- 最後一個獲得鎖的線程來喚醒之前的N-1個線程以及來調用命令線程的run方法
- 最後一個獲得鎖的線程會生成一個新的屏障(new Generation()),也就是可以重複使用的屏障
- 如果線程中有一個線程被中斷,整個屏障被破壞後,所有線程都可能拋出BrokenBarrierException異常
- 原文首發地址://jinglingwang.cn/archives/cyclicbarrier
CyclicBarrier 與CountDownLatch的區別
- CyclicBarrier 是基於重入鎖和條件鎖來實現的
- CountDownLatch 是基於AQS的同步功能來實現的
- CyclicBarrier 不允許0個線程,會拋出異常
- CountDownLatch 允許0個線程,雖然沒什麼*用
- CyclicBarrier 阻塞的是N-1個線程,需要每個線程調用await,之後由最後一個線程來喚醒所有的等待線程,這也就是屏障的意思
- CountDownLatch 是計數為N,阻塞的不一定是N個線程(可以是一個或多個),由線程顯示調用countDown方法來減計數,計數為0時,喚醒阻塞的一個線程或多個線程
- CyclicBarrier 最後一個線程會重置屏障的參數,生成一個新的Generation,可以重複使用,不需要重新new CyclicBarrier
- CountDownLatch 沒有重置計數的地方,計數為0後不可以重複使用,需要重新new CountDownLatch 才可以再次使用