多線程高並發編程(5) — CountDownLatch、CyclicBarrier源碼分析
一.CountDownLatch
1.概念
public CountDownLatch(int count) {//初始化 if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
CountDownLatch是一個同步工具類,用來協調多個線程之間的同步,或者說起到線程之間的通信(而不是用作互斥的作用)。
CountDownLatch能夠使一個線程在等待另外一些線程完成各自工作之後,再繼續執行。使用一個計數器進行實現。計數器初始值為線程的數量。當每一個線程完成自己任務後,計數器的值就會減一。當計數器的值為0時,表示所有的線程都已經完成一些任務,然後在CountDownLatch上等待的線程就可以恢復執行接下來的任務。
下面有A、B、C、D4個線程同時執行,A是主線程,B、C、D是子線程,A先開始執行後阻塞,等待子線程全部執行結束才繼續執行剩下的任務。
2.用法
1)、某一線程在開始運行前等待n個線程執行完畢。將CountDownLatch的計數器初始化為new CountDownLatch(n),每當一個任務線程執行完畢,就將計數器減1 countdownLatch.countDown(),當計數器的值變為0時,在CountDownLatch上await()的線程就會被喚醒。一個典型應用場景就是啟動一個服務時,主線程需要等待多個組件加載完畢,之後再繼續執行。
public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(3); final CountDownLatch latch = new CountDownLatch(3); for (int i = 0; i < 3; i++) { Runnable runnable = new Runnable() { @Override public void run() { try { System.out.println("子線程" + Thread.currentThread().getName() + "開始執行"); Thread.sleep((long) (Math.random() * 10000)); System.out.println("子線程"+Thread.currentThread().getName()+"執行完成"); latch.countDown();//當前線程調用此方法,則計數減一 } catch (InterruptedException e) { e.printStackTrace(); } } }; service.execute(runnable); } try { System.out.println("主線程"+Thread.currentThread().getName()+"等待子線程執行完成..."); latch.await();//阻塞當前線程,直到計數器的值為0 System.out.println("主線程"+Thread.currentThread().getName()+"開始執行..."); } catch (InterruptedException e) { e.printStackTrace(); } }
結果:
主線程main等待子線程執行完成…
子線程pool-1-thread-1開始執行
子線程pool-1-thread-3開始執行
子線程pool-1-thread-2開始執行
子線程pool-1-thread-3執行完成
子線程pool-1-thread-1執行完成
子線程pool-1-thread-2執行完成
主線程main開始執行…
2)、實現多個線程開始執行任務的最大並行性。注意是並行性,不是並發,強調的是多個線程在某一時刻同時開始執行。類似於賽跑,將多個線程放到起點,等待發令槍響,然後同時開跑。做法是初始化一個共享的CountDownLatch(1),將其計算器初始化為1,多個線程在開始執行任務前首先countdownlatch.await(),當主線程調用countDown()時,計數器變為0,多個線程同時被喚醒。
public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CountDownLatch cdOrder = new CountDownLatch(1); final CountDownLatch cdAnswer = new CountDownLatch(4); for (int i = 0; i < 4; i++) { Runnable runnable = new Runnable() { @Override public void run() { try { System.out.println("選手" + Thread.currentThread().getName() + "正在等待裁判發佈口令"); cdOrder.await(); System.out.println("選手" + Thread.currentThread().getName() + "已接受裁判口令"); Thread.sleep((long) (Math.random() * 10000)); System.out.println("選手" + Thread.currentThread().getName() + "到達終點"); cdAnswer.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }; service.execute(runnable); } try { Thread.sleep((long) (Math.random() * 10000)); System.out.println("裁判"+Thread.currentThread().getName()+"即將發佈口令"); cdOrder.countDown(); System.out.println("裁判"+Thread.currentThread().getName()+"已發送口令,正在等待所有選手到達終點"); cdAnswer.await(); System.out.println("所有選手都到達終點"); System.out.println("裁判"+Thread.currentThread().getName()+"匯總成績排名"); } catch (InterruptedException e) { e.printStackTrace(); } service.shutdown(); }
結果:
選手pool-1-thread-2正在等待裁判發佈口令
選手pool-1-thread-1正在等待裁判發佈口令
選手pool-1-thread-3正在等待裁判發佈口令
選手pool-1-thread-4正在等待裁判發佈口令
裁判main即將發佈口令
裁判main已發送口令,正在等待所有選手到達終點
選手pool-1-thread-2已接受裁判口令
選手pool-1-thread-1已接受裁判口令
選手pool-1-thread-3已接受裁判口令
選手pool-1-thread-4已接受裁判口令
選手pool-1-thread-2到達終點
選手pool-1-thread-1到達終點
選手pool-1-thread-4到達終點
選手pool-1-thread-3到達終點
所有選手都到達終點
裁判main匯總成績排名
3.countDown解析
遞減鎖存器的計數,如果計數到達零,則釋放所有等待的線程。如果當前計數大於零,則將計數減少.
public void countDown() { sync.releaseShared(1); }
countDown調用AQS的releaseShared
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//數量為0 doReleaseShared();//喚醒其他等待線程 return true; } return false; } protected boolean tryReleaseShared(int arg) {//releaseShared調用,由CountDownLatch的內部類Sync實現 throw new UnsupportedOperationException(); } private static final class Sync extends AbstractQueuedSynchronizer {//CountDownLatch的內部類Sync protected boolean tryReleaseShared(int releases) { for (;;) {//自旋,count不斷-1,直到為0則發起喚醒信號 int c = getState();//獲得數量,在CountDownLatch(int count)初始化時定義了數量 if (c == 0)//數量為0則返回false return false; int nextc = c-1;//數量-1 //CAS更新狀態,nextc為0返回true if (compareAndSetState(c, nextc)) return nextc == 0; } } }
4.await解析
使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷或超出了指定的等待時間。如果當前計數為零,則喚醒阻塞線程。
如果當前計數大於零,則出於線程調度目的,將禁用當前線程,該線程將一直出於休眠狀態;
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1);//由AQS實現 } /** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once * {@link #tryAcquireShared}, returning on success. Otherwise the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted. * 以共享模式獲取,如果中斷被中止。 * 實現首先檢查中斷狀態,然後至少調用一次tryacquirered,成功返回。 * 否則,線程排隊,可能會重複阻塞和取消阻塞, * 調用tryacquiremred直到成功或線程被打斷了。 */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted())//有中斷拋出異常 throw new InterruptedException(); if (tryAcquireShared(arg) < 0)//CountDownLatch的Sync實現,計數數量不為0,表示有線程需要阻塞 doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //以共享中斷模式獲取 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED);//加入等待隊列 boolean failed = true; try { for (;;) {//自旋 final Node p = node.predecessor();//獲得前繼節點 if (p == head) {//是頭節點 int r = tryAcquireShared(arg); if (r >= 0) {//子線程都執行完成了,原先阻塞線程喚醒執行 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //前繼節點非head節點,將前繼節點狀態設置為SIGNAL,通過park掛起node節點的線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
二.CyclicBarrier
1.概念
public CyclicBarrier(int parties, Runnable barrierAction) {//初始化 if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) {//初始化 this(parties, null); }
允許一組線程全部等待彼此達到共同屏障點的同步輔助。 循環阻塞在涉及固定大小的線程方的程序中很有用,這些線程必須偶爾等待彼此。 屏障被稱為循環 ,因為它可以在等待的線程被釋放之後重新使用。
A CyclicBarrier支持一個可選的Runnable命令,每個屏障點運行一次,在派對中的最後一個線程到達之後,但在任何線程釋放之前。 在任何一方繼續進行之前,此屏障操作對更新共享狀態很有用。
CyclicBarrier當計數減少到0時,會喚醒所有阻塞在同一個Condition上的線程,與CountDownLatch不同的是所有的線程必須同時被喚醒,就好比釣魚比賽,所有人必須同時開始拋竿一樣。CountDownLatch只要求主線程的動作在其他依賴的線程執行完之後執行就OK。
下面有A、B、C、D4個線程同時執行,每個線程有任務a、b,每個線程的每個任務執行完才開始繼續下個任務執行。
2.用法
public class CyclicBarrierTest { public static void main(String[] args) { int count = 10;//並發線程數 CyclicBarrier cyclicBarrier = new CyclicBarrier(count); ExecutorService executorService = Executors.newFixedThreadPool(count); int n = 1; for (int i = 0; i < count; i++) { executorService.execute(new Task(cyclicBarrier, n)); n++; } executorService.shutdown(); // 關閉線程池 // 判斷是否所有的線程已經運行完 while (!executorService.isTerminated()) { try { // 所有線程池中的線程執行完畢,執行後續操作 System.out.println(" ==============is sleep============"); Thread.sleep(10000); System.out.println(" ==============is wake============"); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Task implements Runnable { private CyclicBarrier cyclicBarrier; int n = 0; public Task(CyclicBarrier cyclicBarrier, int n) { this.cyclicBarrier = cyclicBarrier; this.n = n; } @Override public void run() { try { System.out.println("賽馬" + n + "到達柵欄前"); cyclicBarrier.await(); System.out.println("賽馬" + n + "開始跑"); cyclicBarrier.await(); System.out.println("賽馬" + n + "到達終點"); } catch (Exception e) { e.printStackTrace(); } } }
結果:
==============is sleep============
賽馬2到達柵欄前
賽馬3到達柵欄前
賽馬4到達柵欄前
賽馬1到達柵欄前
賽馬5到達柵欄前
賽馬6到達柵欄前
賽馬7到達柵欄前
賽馬8到達柵欄前
賽馬9到達柵欄前
賽馬10到達柵欄前
賽馬10開始跑
賽馬3開始跑
賽馬2開始跑
賽馬4開始跑
賽馬1開始跑
賽馬6開始跑
賽馬7開始跑
賽馬5開始跑
賽馬9開始跑
賽馬8開始跑
賽馬8到達終點
賽馬2到達終點
賽馬3到達終點
賽馬4到達終點
賽馬7到達終點
賽馬10到達終點
賽馬5到達終點
賽馬6到達終點
賽馬1到達終點
賽馬9到達終點
==============is wake============
3.await解析
如果當前線程不是最後一個線程,那麼它被禁用以進行線程調度,並且處於休眠狀態,直到發生下列事情之一:
- 最後一個線程到達; 要麼
- 一些其他線程當前線程為interrupts ; 要麼
- 一些其他線程interrupts其他等待線程之一; 要麼
- 一些其他線程在等待屏障時超時; 要麼
- 其他一些線程在這個屏障上調用
reset()
。
CyclicBarrier的原理:在CyclicBarrier的內部定義了一個Lock對象,每當一個線程調用await方法時,將攔截的線程數減1,然後判斷剩餘攔截數是否為初始值parties,如果不是,進入Lock對象的條件隊列等待。如果是,執行barrierAction對象的Runnable方法,然後將鎖的條件隊列中的所有線程放入鎖等待隊列中,這些線程會依次的獲取鎖、釋放鎖。
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock;//獲鎖 lock.lock();//加鎖 try { //當代,每個屏障都會創建一個Generation實例 final Generation g = generation; if (g.broken)//當代遭到破壞拋出異常 throw new BrokenBarrierException(); if (Thread.interrupted()) {//線程中斷拋出異常 // 將損壞狀態設置為true,並通知其他阻塞在此柵欄上的線程 breakBarrier(); throw new InterruptedException(); } int index = --count;//獲取下標並-1 if (index == 0) { //最後一個線程到達了 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run();//執行柵欄任務 ranAction = true; nextGeneration();// 更新一代,將count重置,將generation重置,喚醒之前等待的線程 return 0; } finally { // 如果執行柵欄任務的時候失敗了,就將損壞狀態設置為true if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed)//如果沒有時間限制,直接等待直到被喚醒 trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos);//等待指定時間 } catch (InterruptedException ie) { if (g == generation && ! g.broken) {//當代沒有損壞 breakBarrier();//讓柵欄失效 throw ie; } else {// 上麵條件不滿足,說明這個線程不是這代的,就不會影響當前這代柵欄的執行,所以,就打個中斷標記 Thread.currentThread().interrupt(); } } // 當有任何一個線程中斷了,就會調用breakBarrier方法, //就會喚醒其他的線程,其他線程醒來後,也要拋出異常 if (g.broken) throw new BrokenBarrierException(); // g != generation表示正常換代了,返回當前線程所在柵欄的下標 // 如果 g == generation,說明還沒有換代,那為什麼會醒了? // 因為一個線程可以使用多個柵欄,當別的柵欄喚醒了這個線程,就會走到這裡,所以需要判斷是否是當前代。 // 正是因為這個原因,才需要generation來保證正確。 if (g != generation) return index; // 如果有時間限制,且時間小於等於0,銷毀柵欄並拋出異常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
dowait(boolean, long)方法的主要邏輯處理比較簡單,如果該線程不是最後一個調用await方法的線程,則它會一直處於等待狀態,除非發生以下情況:
-
最後一個線程到達,即index == 0
-
某個參與線程等待超時
-
某個參與線程被中斷
-
調用了CyclicBarrier的reset()方法。該方法會將屏障重置為初始狀態
在上面的源代碼中,我們可能需要注意Generation 對象,在上述代碼中我們總是可以看到拋出BrokenBarrierException異常,那麼什麼時候拋出異常呢?如果一個線程處於等待狀態時,如果其他線程調用reset(),或者調用的barrier原本就是被損壞的,則拋出BrokenBarrierException異常。同時,任何線程在等待時被中斷了,則其他所有線程都將拋出BrokenBarrierException異常,並將barrier置於損壞狀態。
同時,Generation描述着CyclicBarrier的更新換代。在CyclicBarrier中,同一批線程屬於同一代。當有parties個線程到達barrier之後,generation就會被更新換代。其中broken標識該當前CyclicBarrier是否已經處於中斷狀態。
注意事項:
-
CyclicBarrier使用獨佔鎖來執行await方法,並發性可能不是很高。
-
如果在等待過程中,線程被中斷了,就拋出異常。但如果中斷的線程所對應的CyclicBarrier不是這代的,比如,在最後一次線程執行signalAll後,並且更新了這個「代」對象。在這個區間,這個線程被中斷了,那麼,JDK認為任務已經完成了,就不必在乎中斷了,只需要打個標記。該部分源碼已在dowait(boolean, long)方法中進行了注釋。
-
如果線程被其他的CyclicBarrier喚醒了,那麼g肯定等於generation,這個事件就不能return了,而是繼續循環阻塞。反之,如果是當前CyclicBarrier喚醒的,就返回線程在CyclicBarrier的下標。完成了一次衝過柵欄的過程。該部分源碼已在dowait(boolean, long)方法中進行了注釋。
參考:
CyclicBarrier://blog.csdn.net/qq_38293564/article/details/80558157