CountDownLatch和CycliBarrier介紹

  • 2019 年 11 月 1 日
  • 筆記

一、CountDownLatch

  它被用來同步一個或多個任務,強制他們等待其他任務完成,這就是閉鎖。

public CountDownLatch(int count) {          if (count < 0) throw new IllegalArgumentException("count < 0");          this.sync = new Sync(count);      }

  類中只有一個構造函數,一個int類型的參數count,代表計數器。這個計數器的初始值是線程的數量,每當一個線程結束,count-1,當count==0 時,所有線程執行完畢,在閉鎖上等待的線程就可以執行了。

類中還包含了三個公共方法:

public void await()
public boolean await(long timeout, TimeUnit unit)
public void countDown()

  當每個任務完成時,都會調用countDown()方法。而等待問題被解決的任務在這個鎖存器上調用await()方法,這個任務就相當於被掛起了直到 timeout 或 計數器為0

public class CountDownLatchDemo {      static final int SIZE = 5;        public static void main(String[] args) {          ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5));          CountDownLatch countDownLatch = new CountDownLatch(SIZE);          pool.execute(new WaitTask(countDownLatch));          for (int i = 0; i < SIZE; i++) {              pool.execute(new TaskNow(countDownLatch));          }      }    }    class TaskNow implements Runnable {      private final CountDownLatch countDownLatch;        TaskNow(CountDownLatch countDownLatch) {          this.countDownLatch = countDownLatch;      }        @Override      public void run() {          try {              TimeUnit.MILLISECONDS.sleep(2000);          } catch (InterruptedException e) {              e.printStackTrace();          }          System.out.println("我沒完事你別想跑");          //調用countDown()的方法會減少count次數直到為0,調用await的任務才能進行          countDownLatch.countDown();      }  }    class WaitTask implements Runnable {      private final CountDownLatch countDownLatch;        WaitTask(CountDownLatch countDownLatch) {          this.countDownLatch = countDownLatch;      }        @Override      public void run() {          try {              //調用await的任務會被掛起              countDownLatch.await();              System.out.println("終於輪到我了");          } catch (InterruptedException e) {              e.printStackTrace();          }      }  }
結果:

  我沒完事你別想跑
  我沒完事你別想跑
  我沒完事你別想跑
  我沒完事你別想跑
  我沒完事你別想跑
  終於輪到我了

二、CylicBarrier

  柵欄類似於閉鎖,只是要等到所有線程都到達柵欄,才能進行接下來的動作,在沒到達柵欄之前先到的要等待。

構造方法

共有兩個構造方法:

 

 

 

public CyclicBarrier(int parties, Runnable barrierAction) {          if (parties <= 0) throw new IllegalArgumentException();          this.parties = parties;          this.count = parties;          this.barrierCommand = barrierAction;      }  public CyclicBarrier(int parties) {          this(parties, null);      }

CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程使用await()方法告訴CyclicBarrier我已經到達了屏障,然後當前線程被阻塞。

CyclicBarrier的另一個構造函數CyclicBarrier(int parties, Runnable barrierAction),用於線程到達屏障時,優先執行barrierAction,方便處理更複雜的業務場景。

await()方法

  線程調用await()方法表示已經到達同步點,然後當前線程被阻塞。直到parties個參與線程調用了await()方法,CyclicBarrier同樣提供帶超時時間的await和不帶超時時間的await方法:

public int await() throws InterruptedException, BrokenBarrierException {      try {          return dowait(false, 0L);      } catch (TimeoutException toe) {          throw new Error(toe); // cannot happen      }  }  public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException{...}

 

  可以看到await方法調用了dowait()方法,dowait()方法是核心方法其內容是:如果這個線程不是最後一個到達的線程那麼進行等待直到:

  • 最後一個線程到達,即index == 0
  • 某個參與線程等待超時
  • 某個參與線程被中斷
  • 調用了CyclicBarrier的reset()方法。該方法會將屏障重置為初始狀態

案例

public class CycliBarrierDemo {          public static void main(String[] args) {          int threadCount = 3;          CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);          ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));          for (int i = 0; i < threadCount; i++) {              pool.execute(new Horse(cyclicBarrier, i));          }      }  }    class Horse implements Runnable {      private CyclicBarrier cyclicBarrier;      private int num;        public Horse(CyclicBarrier cyclicBarrier, int num) {          this.cyclicBarrier = cyclicBarrier;          this.num = num;      }        @Override      public void run() {          System.out.println("馬匹:" + num + "到了");          try {              cyclicBarrier.await();              System.out.println("馬匹:" + num + "在等待");              TimeUnit.MILLISECONDS.sleep(2000);          } catch (InterruptedException | BrokenBarrierException e) {              e.printStackTrace();          }          System.out.println("馬匹到齊");      }  }


結果:

  馬匹:0到了
  馬匹:2到了
  馬匹:1到了
  馬匹:1在等待
  馬匹:0在等待
  馬匹:2在等待
  馬匹到齊
  馬匹到齊
  馬匹到齊