【JDK】JDK源碼分析-CountDownLatch

  • 2019 年 10 月 3 日
  • 筆記

概述

 

CountDownLatch 是並發包中的一個工具類,它的典型應用場景為:一個線程等待幾個線程執行,待這幾個線程結束後,該線程再繼續執行。

 

簡單起見,可以把它理解為一個倒數的計數器:初始值為線程數,每個線程結束時執行減 1 操作,當計數器減到 0 時等待的線程再繼續執行。

 

代碼分析

 

CountDownLatch 的類簽名和主要方法如下:

public class CountDownLatch {}

常用方法為:await()、await(long, TimeUnit) 和 countDown。其中兩個 await 都是讓當前線程進入等待狀態(獲取資源失敗);而 countDown 方法是將計數器減去 1,當計數器為 0 的時候,那些處於等待狀態的線程會繼續執行(獲取資源成功)。

 

構造器代碼如下:

private final Sync sync;    public CountDownLatch(int count) {      if (count < 0) throw new IllegalArgumentException("count < 0");      this.sync = new Sync(count);  }

構造器(該構造器是唯一的)傳入一個正整數,且初始化了 sync 變量,Sync 是內部的一個嵌套類,繼承自 AQS。

 

await / await(long, TimeUnit):

public void await() throws InterruptedException {      sync.acquireSharedInterruptibly(1);  }    public boolean await(long timeout, TimeUnit unit)      throws InterruptedException {      return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));  }

countDown:

public void countDown() {      sync.releaseShared(1);  }

其中,acquireSharedInterruptibly、tryAcquireSharedNanos 和 releaseShared 都是 AQS 中「共享模式」的方法,具體代碼可參考前文「JDK源碼分析-AbstractQueuedSynchronizer(3)」的分析。

 

嵌套類 Sync 代碼如下:

private static final class Sync extends AbstractQueuedSynchronizer {      private static final long serialVersionUID = 4982264981922014374L;        // 構造器,初始化 AQS 的 state 變量      Sync(int count) {          setState(count);      }        int getCount() {          return getState();      }        // 嘗試獲取資源的操作      // 只有當 state 變量為 0 的時候才能獲取成功(返回 1)      protected int tryAcquireShared(int acquires) {          return (getState() == 0) ? 1 : -1;      }        // 嘗試釋放資源的操作          protected boolean tryReleaseShared(int releases) {          // Decrement count; signal when transition to zero          for (;;) {              int c = getState();              if (c == 0)                  return false;              // 該操作就是嘗試把 state 變量減去 1              int nextc = c-1;              if (compareAndSetState(c, nextc))                  return nextc == 0;          }      }  }

Sync 繼承了 AQS 抽象類,根據 AQS 可知,acquireSharedInterruptibly 和 tryAcquireSharedNanos 方法的實現都調用了 tryAcquireShared。

 

流程說明:通常先把 CountDownLatch 的計數器(state)初始化為 N,執行 wait 操作就是嘗試以共享模式獲取資源,而每次 countDown 操作就是將 N 減去 1,只有當 N 減到 0 的時候,才能獲取成功(tryAcquireShared 方法),然後繼續執行。

 

場景舉例

 

為便於理解該類的用法,舉兩個簡單的例子來說明它的使用場景。

 

場景 1:一個線程等待多個線程執行完之後再繼續執行

public void test() throws InterruptedException {      int count = 5;      // CountDownLatch 的初始化計數器為 5      // 注意線程數和計數器保持一致      CountDownLatch countDownLatch = new CountDownLatch(count);      for (int i = 0; i < count; i++) {          int finalI = i;          new Thread(() -> {              try {                  TimeUnit.SECONDS.sleep(finalI);              } catch (InterruptedException e) {                  e.printStackTrace();              }              System.out.println(Thread.currentThread().getName() + " is working ..");              // 每個線程執行結束時執行 countDown              countDownLatch.countDown();          }).start();      }      // 主線程進入等待狀態(嘗試獲取資源,成功後才能繼續執行)      countDownLatch.await();      System.out.println(Thread.currentThread().getName() + " go on ..");  }    /*  輸出結果:      Thread-0 is working ..      Thread-1 is working ..      Thread-2 is working ..      Thread-3 is working ..      Thread-4 is working ..      main go on ..  */

 

場景 2:一個線程到達指定條件後,通知另一個線程

private static volatile List<Integer> list = new ArrayList<>();    private static void test() {    CountDownLatch countDownLatch = new CountDownLatch(1);      new Thread(() -> {      if (list.size() != 5) {        try {          // list 的大小為 5 時再繼續執行,否則等待          // 等待 state 減到 0          countDownLatch.await();        } catch (InterruptedException e) {          e.printStackTrace();        }      }      System.out.println(Thread.currentThread().getName() + " start..");    }).start();      new Thread(() -> {      for (int i = 0; i < 10; i++) {        list.add(i);        System.out.println(Thread.currentThread().getName() + " add " + i);        if (list.size() == 5) {          // 滿足條件時將 state 減 1          countDownLatch.countDown();        }        try {          TimeUnit.SECONDS.sleep(1);        } catch (InterruptedException e) {          e.printStackTrace();        }      }    }).start();  }    /*  輸出結果:      Thread-1 add 0      Thread-1 add 1      Thread-1 add 2      Thread-1 add 3      Thread-1 add 4      Thread-0 start..      Thread-1 add 5      Thread-1 add 6      Thread-1 add 7      Thread-1 add 8      Thread-1 add 9  */

 

小結

 

CountDownLatch 可以理解為一個倒數的計數器,它的典型應用場景就是一個線程等待幾個線程執行結束後再繼續執行。其內部是基於 AQS 的共享模式實現的。

 

 

相關閱讀:

JDK源碼分析-AbstractQueuedSynchronizer(3)

 

 

Stay hungry, stay foolish.

PS: 本文首發於微信公眾號【WriteOnRead】。

Exit mobile version