多執行緒高並發編程(5) — CountDownLatch、CyclicBarrier源碼分析

一.CountDownLatch

  1.概念

    public CountDownLatch(int count) {//初始化
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

  CountDownLatch是一個同步工具類,用來協調多個執行緒之間的同步,或者說起到執行緒之間的通訊(而不是用作互斥的作用)。

  CountDownLatch能夠使一個執行緒在等待另外一些執行緒完成各自工作之後,再繼續執行。使用一個計數器進行實現。計數器初始值為執行緒的數量。當每一個執行緒完成自己任務後,計數器的值就會減一。當計數器的值為0時,表示所有的執行緒都已經完成一些任務,然後在CountDownLatch上等待的執行緒就可以恢復執行接下來的任務。

  下面有A、B、C、D4個執行緒同時執行,A是主執行緒,B、C、D是子執行緒,A先開始執行後阻塞,等待子執行緒全部執行結束才繼續執行剩下的任務。

  2.用法

  1)、某一執行緒在開始運行前等待n個執行緒執行完畢。將CountDownLatch的計數器初始化為new CountDownLatch(n),每當一個任務執行緒執行完畢,就將計數器減1 countdownLatch.countDown(),當計數器的值變為0時,在CountDownLatch上await()的執行緒就會被喚醒。一個典型應用場景就是啟動一個服務時,主執行緒需要等待多個組件載入完畢,之後再繼續執行。

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(3);
        final CountDownLatch latch = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("子執行緒" + Thread.currentThread().getName() + "開始執行");
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("子執行緒"+Thread.currentThread().getName()+"執行完成");
                        latch.countDown();//當前執行緒調用此方法,則計數減一
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            service.execute(runnable);
        }

        try {
            System.out.println("主執行緒"+Thread.currentThread().getName()+"等待子執行緒執行完成...");
            latch.await();//阻塞當前執行緒,直到計數器的值為0
            System.out.println("主執行緒"+Thread.currentThread().getName()+"開始執行...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


結果:

主執行緒main等待子執行緒執行完成…
子執行緒pool-1-thread-1開始執行
子執行緒pool-1-thread-3開始執行
子執行緒pool-1-thread-2開始執行
子執行緒pool-1-thread-3執行完成
子執行緒pool-1-thread-1執行完成
子執行緒pool-1-thread-2執行完成
主執行緒main開始執行…

  2)、實現多個執行緒開始執行任務的最大並行性。注意是並行性,不是並發,強調的是多個執行緒在某一時刻同時開始執行。類似於賽跑,將多個執行緒放到起點,等待發令槍響,然後同時開跑。做法是初始化一個共享的CountDownLatch(1),將其計算器初始化為1,多個執行緒在開始執行任務前首先countdownlatch.await(),當主執行緒調用countDown()時,計數器變為0,多個執行緒同時被喚醒。

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final CountDownLatch cdOrder = new CountDownLatch(1);
        final CountDownLatch cdAnswer = new CountDownLatch(4);
        for (int i = 0; i < 4; i++) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("選手" + Thread.currentThread().getName() + "正在等待裁判發布口令");
                        cdOrder.await();
                        System.out.println("選手" + Thread.currentThread().getName() + "已接受裁判口令");
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("選手" + Thread.currentThread().getName() + "到達終點");
                        cdAnswer.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            service.execute(runnable);
        }
        try {
            Thread.sleep((long) (Math.random() * 10000));
            System.out.println("裁判"+Thread.currentThread().getName()+"即將發布口令");
            cdOrder.countDown();
            System.out.println("裁判"+Thread.currentThread().getName()+"已發送口令,正在等待所有選手到達終點");
            cdAnswer.await();
            System.out.println("所有選手都到達終點");
            System.out.println("裁判"+Thread.currentThread().getName()+"匯總成績排名");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        service.shutdown();
    }

結果:

選手pool-1-thread-2正在等待裁判發布口令
選手pool-1-thread-1正在等待裁判發布口令
選手pool-1-thread-3正在等待裁判發布口令
選手pool-1-thread-4正在等待裁判發布口令
裁判main即將發布口令
裁判main已發送口令,正在等待所有選手到達終點
選手pool-1-thread-2已接受裁判口令
選手pool-1-thread-1已接受裁判口令
選手pool-1-thread-3已接受裁判口令
選手pool-1-thread-4已接受裁判口令
選手pool-1-thread-2到達終點
選手pool-1-thread-1到達終點
選手pool-1-thread-4到達終點
選手pool-1-thread-3到達終點
所有選手都到達終點
裁判main匯總成績排名

  3.countDown解析

  遞減鎖存器的計數,如果計數到達零,則釋放所有等待的執行緒。如果當前計數大於零,則將計數減少.

    public void countDown() {
        sync.releaseShared(1);
    }

  countDown調用AQS的releaseShared

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//數量為0
            doReleaseShared();//喚醒其他等待執行緒
            return true;
        }
        return false;
    }
    protected boolean tryReleaseShared(int arg) {//releaseShared調用,由CountDownLatch的內部類Sync實現
        throw new UnsupportedOperationException();
    }
    private static final class Sync extends AbstractQueuedSynchronizer {//CountDownLatch的內部類Sync
        protected boolean tryReleaseShared(int releases) {
            for (;;) {//自旋,count不斷-1,直到為0則發起喚醒訊號
                int c = getState();//獲得數量,在CountDownLatch(int count)初始化時定義了數量
                if (c == 0)//數量為0則返回false
                    return false;
                int nextc = c-1;//數量-1
                //CAS更新狀態,nextc為0返回true
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

  4.await解析

  使當前執行緒在鎖存器倒計數至零之前一直等待,除非執行緒被中斷或超出了指定的等待時間。如果當前計數為零,則喚醒阻塞執行緒。

  如果當前計數大於零,則出於執行緒調度目的,將禁用當前執行緒,該執行緒將一直出於休眠狀態;

 public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);//由AQS實現
    }
    /**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * 以共享模式獲取,如果中斷被中止。
     * 實現首先檢查中斷狀態,然後至少調用一次tryacquirered,成功返回。
     * 否則,執行緒排隊,可能會重複阻塞和取消阻塞,
     * 調用tryacquiremred直到成功或執行緒被打斷了。
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())//有中斷拋出異常
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)//CountDownLatch的Sync實現,計數數量不為0,表示有執行緒需要阻塞
            doAcquireSharedInterruptibly(arg);
    }
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    //以共享中斷模式獲取
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);//加入等待隊列
        boolean failed = true;
        try {
            for (;;) {//自旋
                final Node p = node.predecessor();//獲得前繼節點
                if (p == head) {//是頭節點
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {//子執行緒都執行完成了,原先阻塞執行緒喚醒執行
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //前繼節點非head節點,將前繼節點狀態設置為SIGNAL,通過park掛起node節點的執行緒
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

二.CyclicBarrier

  1.概念

    public CyclicBarrier(int parties, Runnable barrierAction) {//初始化
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {//初始化
        this(parties, null);
    }

  允許一組執行緒全部等待彼此達到共同屏障點的同步輔助。 循環阻塞在涉及固定大小的執行緒方的程式中很有用,這些執行緒必須偶爾等待彼此。 屏障被稱為循環 ,因為它可以在等待的執行緒被釋放之後重新使用。 

  A CyclicBarrier支援一個可選的Runnable命令,每個屏障點運行一次,在派對中的最後一個執行緒到達之後,但在任何執行緒釋放之前。 在任何一方繼續進行之前,此屏障操作對更新共享狀態很有用。

  CyclicBarrier當計數減少到0時,會喚醒所有阻塞在同一個Condition上的執行緒,與CountDownLatch不同的是所有的執行緒必須同時被喚醒,就好比釣魚比賽,所有人必須同時開始拋竿一樣。CountDownLatch只要求主執行緒的動作在其他依賴的執行緒執行完之後執行就OK。

  下面有A、B、C、D4個執行緒同時執行,每個執行緒有任務a、b,每個執行緒的每個任務執行完才開始繼續下個任務執行。

  2.用法

public class CyclicBarrierTest {
    public static void main(String[] args) {
        int count = 10;//並發執行緒數
        CyclicBarrier cyclicBarrier = new CyclicBarrier(count);
        ExecutorService executorService = Executors.newFixedThreadPool(count);
        int n = 1;
        for (int i = 0; i < count; i++) {
            executorService.execute(new Task(cyclicBarrier, n));
            n++;
        }
        executorService.shutdown(); // 關閉執行緒池
        // 判斷是否所有的執行緒已經運行完
        while (!executorService.isTerminated()) {
            try {
                // 所有執行緒池中的執行緒執行完畢,執行後續操作
                System.out.println(" ==============is sleep============");
                Thread.sleep(10000);
                System.out.println(" ==============is wake============");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Task implements Runnable {
    private CyclicBarrier cyclicBarrier;
    int n = 0;

    public Task(CyclicBarrier cyclicBarrier, int n) {
        this.cyclicBarrier = cyclicBarrier;
        this.n = n;
    }

    @Override
    public void run() {
        try {
            System.out.println("賽馬" + n + "到達柵欄前");
            cyclicBarrier.await();
            System.out.println("賽馬" + n + "開始跑");
            cyclicBarrier.await();
            System.out.println("賽馬" + n + "到達終點");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
結果:

==============is sleep============
賽馬2到達柵欄前
賽馬3到達柵欄前
賽馬4到達柵欄前
賽馬1到達柵欄前
賽馬5到達柵欄前
賽馬6到達柵欄前
賽馬7到達柵欄前
賽馬8到達柵欄前
賽馬9到達柵欄前
賽馬10到達柵欄前
賽馬10開始跑
賽馬3開始跑
賽馬2開始跑
賽馬4開始跑
賽馬1開始跑
賽馬6開始跑
賽馬7開始跑
賽馬5開始跑
賽馬9開始跑
賽馬8開始跑
賽馬8到達終點
賽馬2到達終點
賽馬3到達終點
賽馬4到達終點
賽馬7到達終點
賽馬10到達終點
賽馬5到達終點
賽馬6到達終點
賽馬1到達終點
賽馬9到達終點
==============is wake============

 

  3.await解析

  如果當前執行緒不是最後一個執行緒,那麼它被禁用以進行執行緒調度,並且處於休眠狀態,直到發生下列事情之一:

  • 最後一個執行緒到達; 要麼
  • 一些其他執行緒當前執行緒為interrupts ; 要麼
  • 一些其他執行緒interrupts其他等待執行緒之一; 要麼
  • 一些其他執行緒在等待屏障時超時; 要麼
  • 其他一些執行緒在這個屏障上調用reset()

  CyclicBarrier的原理:在CyclicBarrier的內部定義了一個Lock對象,每當一個執行緒調用await方法時,將攔截的執行緒數減1,然後判斷剩餘攔截數是否為初始值parties,如果不是,進入Lock對象的條件隊列等待。如果是,執行barrierAction對象的Runnable方法,然後將鎖的條件隊列中的所有執行緒放入鎖等待隊列中,這些執行緒會依次的獲取鎖、釋放鎖。

        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;//獲鎖
            lock.lock();//加鎖
            try {
                //當代,每個屏障都會創建一個Generation實例
                final Generation g = generation;

                if (g.broken)//當代遭到破壞拋出異常
                    throw new BrokenBarrierException();

                if (Thread.interrupted()) {//執行緒中斷拋出異常
                    // 將損壞狀態設置為true,並通知其他阻塞在此柵欄上的執行緒
                    breakBarrier();
                    throw new InterruptedException();
                }

                int index = --count;//獲取下標並-1
                if (index == 0) {  //最後一個執行緒到達了
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();//執行柵欄任務
                        ranAction = true;
                        nextGeneration();// 更新一代,將count重置,將generation重置,喚醒之前等待的執行緒
                        return 0;
                    } finally {
                        // 如果執行柵欄任務的時候失敗了,就將損壞狀態設置為true
                        if (!ranAction)
                            breakBarrier();
                    }
                }

                // loop until tripped, broken, interrupted, or timed out
                for (;;) {
                    try {
                        if (!timed)//如果沒有時間限制,直接等待直到被喚醒
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);//等待指定時間
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {//當代沒有損壞
                            breakBarrier();//讓柵欄失效
                            throw ie;
                        } else {// 上麵條件不滿足,說明這個執行緒不是這代的,就不會影響當前這代柵欄的執行,所以,就打個中斷標記
                            Thread.currentThread().interrupt();
                        }
                    }
                    // 當有任何一個執行緒中斷了,就會調用breakBarrier方法,
                    //就會喚醒其他的執行緒,其他執行緒醒來後,也要拋出異常
                    if (g.broken)
                        throw new BrokenBarrierException();
            // g != generation表示正常換代了,返回當前執行緒所在柵欄的下標
            // 如果 g == generation,說明還沒有換代,那為什麼會醒了?
            // 因為一個執行緒可以使用多個柵欄,當別的柵欄喚醒了這個執行緒,就會走到這裡,所以需要判斷是否是當前代。
            // 正是因為這個原因,才需要generation來保證正確。
                    if (g != generation)
                        return index;
                    // 如果有時間限制,且時間小於等於0,銷毀柵欄並拋出異常
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }

  dowait(boolean, long)方法的主要邏輯處理比較簡單,如果該執行緒不是最後一個調用await方法的執行緒,則它會一直處於等待狀態,除非發生以下情況:

  • 最後一個執行緒到達,即index == 0

  • 某個參與執行緒等待超時

  • 某個參與執行緒被中斷

  • 調用了CyclicBarrier的reset()方法。該方法會將屏障重置為初始狀態

  在上面的源程式碼中,我們可能需要注意Generation 對象,在上述程式碼中我們總是可以看到拋出BrokenBarrierException異常,那麼什麼時候拋出異常呢?如果一個執行緒處於等待狀態時,如果其他執行緒調用reset(),或者調用的barrier原本就是被損壞的,則拋出BrokenBarrierException異常。同時,任何執行緒在等待時被中斷了,則其他所有執行緒都將拋出BrokenBarrierException異常,並將barrier置於損壞狀態。

  同時,Generation描述著CyclicBarrier的更新換代。在CyclicBarrier中,同一批執行緒屬於同一代。當有parties個執行緒到達barrier之後,generation就會被更新換代。其中broken標識該當前CyclicBarrier是否已經處於中斷狀態。
注意事項:

  • CyclicBarrier使用獨佔鎖來執行await方法,並發性可能不是很高。

  • 如果在等待過程中,執行緒被中斷了,就拋出異常。但如果中斷的執行緒所對應的CyclicBarrier不是這代的,比如,在最後一次執行緒執行signalAll後,並且更新了這個「代」對象。在這個區間,這個執行緒被中斷了,那麼,JDK認為任務已經完成了,就不必在乎中斷了,只需要打個標記。該部分源碼已在dowait(boolean, long)方法中進行了注釋。

  • 如果執行緒被其他的CyclicBarrier喚醒了,那麼g肯定等於generation,這個事件就不能return了,而是繼續循環阻塞。反之,如果是當前CyclicBarrier喚醒的,就返回執行緒在CyclicBarrier的下標。完成了一次衝過柵欄的過程。該部分源碼已在dowait(boolean, long)方法中進行了注釋。

參考:

  CyclicBarrier://blog.csdn.net/qq_38293564/article/details/80558157