阿里一面CyclicBarrier和CountDownLatch的區別是啥
引言
前面一篇文章我們《Java高並發編程基礎三大利器之CountDownLatch》它有一個缺點,就是它的計數器只能夠使用一次,也就是說當計數器(state
)減到為 0
的時候,如果 再有執行緒調用去 await
() 方法,該執行緒會直接通過,不會再起到等待其他執行緒執行結果起到同步的作用。為了解決這個問題CyclicBarrier
就應運而生了。
什麼是CyclicBarrier
CyclicBarrier
是什麼?把它拆開來翻譯就是循環(Cycle
)和屏障(Barrier
)
它的主要作用其實和CountDownLanch
差不多,都是讓一組執行緒到達一個屏障時被阻塞,直到最後一個執行緒到達屏障時,屏障會被打開,所有被屏障阻塞的執行緒才會繼續執行,不過它是可以循環執行的,這是它與CountDownLanch
最大的不同。CountDownLanch
是只有當最後一個執行緒把計數器置為0
的時候,其他阻塞的執行緒才會繼續執行。學習CyclicBarrier
之前建議先去看看這幾篇文章:
如何使用
我們首先先來看下關於使用CyclicBarrier
的一個demo
:比如遊戲中有個關卡的時候,每次進入下一關的時候都需要進行載入一些地圖、特效背景音樂什麼的只有全部載入完了才能夠進行遊戲:
/**demo 來源//blog.csdn.net/lstcui/article/details/107389371
* 公眾號【java金融】
*/
public class CyclicBarrierExample {
static class PreTaskThread implements Runnable {
private String task;
private CyclicBarrier cyclicBarrier;
public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
this.task = task;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
for (int i = 0; i < 4; i++) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(1000));
System.out.println(String.format("關卡 %d 的任務 %s 完成", i, task));
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
System.out.println("本關卡所有的前置任務完成,開始遊戲... ...");
});
new Thread(new PreTaskThread("載入地圖數據", cyclicBarrier)).start();
new Thread(new PreTaskThread("載入人物模型", cyclicBarrier)).start();
new Thread(new PreTaskThread("載入背景音樂", cyclicBarrier)).start();
}
}
}
輸出結果如下:
我們可以看到每次遊戲開始都會等當前關卡把遊戲的人物模型,地圖數據、背景音樂載入完成後才會開始進行遊戲。並且還是可以循環控制的。
源碼分析
結構組成
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
- lock:用於保護屏障入口的鎖
- trip :達到屏障並且不能放行的執行緒在trip條件變數上等待
- parties :柵欄開啟需要的到達執行緒總數
- barrierCommand:最後一個執行緒到達屏障後執行的回調任務
- generation:這是一個內部類,通過它實現
CyclicBarrier
重複利用,每當await
達到最大次數的時候,就會重新new
一個,表示進入了下一個輪迴。裡面只有一個boolean
型屬性,用來表示當前輪迴是否有執行緒中斷。
主要方法
await
方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//獲取barrier當前的 「代」也就是當前循環
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 每來一個執行緒調用await方法都會進行減1
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// new CyclicBarrier 傳入 的barrierCommand, command.run()這個方法是同步的,如果耗時比較多的話,是否執行的時候需要考慮下是否非同步來執行。
if (command != null)
command.run();
ranAction = true;
// 這個方法1. 喚醒所有阻塞的執行緒,2. 重置下count(count 每來一個執行緒都會進行減1)和generation,以便於下次循環。
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// 進入if條件,說明是不帶超時的await
if (!timed)
// 當前執行緒會釋放掉lock,然後進入到trip條件隊列的尾部,然後掛起自己,等待被喚醒。
trip.await();
else if (nanos > 0L)
//說明當前執行緒調用await方法時 是指定了 超時時間的!
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//Node節點在 條件隊列內 時 收到中斷訊號時 會拋出中斷異常!
//g == generation 成立,說明當前代並沒有變化。
//! g.broken 當前代如果沒有被打破,那麼當前執行緒就去打破,並且拋出異常..
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
//執行到else有幾種情況?
//1.代發生了變化,這個時候就不需要拋出中斷異常了,因為 代已經更新了,這裡喚醒後就走正常邏輯了..只不過設置下 中斷標記。
//2.代沒有發生變化,但是代被打破了,此時也不用返回中斷異常,執行到下面的時候會拋出 brokenBarrier異常。也記錄下中斷標記位。
Thread.currentThread().interrupt();
}
}
//喚醒後,執行到這裡,有幾種情況?
//1.正常情況,當前barrier開啟了新的一代(trip.signalAll())
//2.當前Generation被打破,此時也會喚醒所有在trip上掛起的執行緒
//3.當前執行緒trip中等待超時,然後主動轉移到 阻塞隊列 然後獲取到鎖 喚醒。
if (g.broken)
throw new BrokenBarrierException();
//喚醒後,執行到這裡,有幾種情況?
//1.正常情況,當前barrier開啟了新的一代(trip.signalAll())
//2.當前執行緒trip中等待超時,然後主動轉移到 阻塞隊列 然後獲取到鎖 喚醒。
if (g != generation)
return index;
//喚醒後,執行到這裡,有幾種情況?
//.當前執行緒trip中等待超時,然後主動轉移到 阻塞隊列 然後獲取到鎖 喚醒。
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
小結
到了這裡我們是不是可以知道為啥CyclicBarrier
可以進行循環計數?
CyclicBarrier
採用一個內部類Generation
來維護當前循環,每一個await
方法都會存儲當前的generation
,獲取到相同generation
對象的屬於同一組,每當count
的次數耗盡就會重新new
一個Generation
並且重新設置count
的值為parties
,表示進入下一次新的循環。
從這個await
方法我們是不是可以知道只要有一個執行緒被中斷了,當代的 generation
的broken
就會被設置為true
,所以會導致其他的執行緒也會被拋出BrokenBarrierException
。相當於一個失敗其他也必須失敗,感覺有「強一致性「的味道。
總結
CountDownLanch
是為計數器是設置一個值,當多次執行countdown
後,計數器減為0
的時候所有執行緒被喚醒,然後CountDownLanch
失效,只能夠使用一次。CyclicBarrier
是當count
為0
時同樣喚醒全部執行緒,同時會重新設置count
為parties
,重新new
一個generation
來實現重複利用。
結束
- 由於自己才疏學淺,難免會有紕漏,假如你發現了錯誤的地方,還望留言給我指出來,我會對其加以修正。
- 如果你覺得文章還不錯,你的轉發、分享、讚賞、點贊、留言就是對我最大的鼓勵。
- 感謝您的閱讀,十分歡迎並感謝您的關注。
巨人的肩膀摘蘋果
//javajr.cn/
//www.360doc.com/content/20/0812/08/55930996_929792021.shtml
//www.cnblogs.com/xxyyy/p/12958160.html