Java多執行緒(十五):CountDownLatch,Semaphore,Exchanger,CyclicBarrier,Callable和Future

  • 2019 年 11 月 3 日
  • 筆記

CountDownLatch

CountDownLatch用來使一個執行緒或多個執行緒等待到其他執行緒完成。CountDownLatch有個初始值count,await方法會阻塞執行緒,直到通過countDown方法調用使count減少為0才會執行await方法後面的程式碼。
示例程式碼
MyThread50_0是WorkThread,不同的執行緒休眠時間不一樣。

public class MyThread50_0 extends Thread  {      private CountDownLatch cdl;      private int sleepSecond;        public MyThread50_0(String name, CountDownLatch cdl, int sleepSecond)      {          super(name);          this.cdl = cdl;          this.sleepSecond = sleepSecond;      }        public void run()      {          try          {              System.out.println(this.getName() + "啟動了,時間為" + new Date());              Thread.sleep(sleepSecond * 1000);              cdl.countDown();              System.out.println(this.getName() + "執行完了,時間為" + new Date());          }          catch (InterruptedException e)          {              e.printStackTrace();          }      }  }

MyThread50_1是DoneThread和main方法

public class MyThread50_1 extends Thread {      private CountDownLatch cdl;        public MyThread50_1(String name, CountDownLatch cdl)      {          super(name);          this.cdl = cdl;      }        public void run()      {          try          {              System.out.println(this.getName() + "要等待了, 時間為" + new Date());              cdl.await();              System.out.println(this.getName() + "等待完了, 時間為" + new Date());          }          catch (InterruptedException e)          {              e.printStackTrace();          }      }        public static void main(String[] args) {          CountDownLatch cdl = new CountDownLatch(3);          MyThread50_1 dt0 = new MyThread50_1("DoneThread1", cdl);          MyThread50_1 dt1 = new MyThread50_1("DoneThread2", cdl);          dt0.start();          dt1.start();          MyThread50_0 wt0 = new MyThread50_0("WorkThread1", cdl, 2);          MyThread50_0 wt1 = new MyThread50_0("WorkThread2", cdl, 3);          MyThread50_0 wt2 = new MyThread50_0("WorkThread3", cdl, 4);          wt0.start();          wt1.start();          wt2.start();      }  }

運行結果如下

DoneThread2要等待了, 時間為Sun Sep 22 21:37:57 CEST 2019  DoneThread1要等待了, 時間為Sun Sep 22 21:37:57 CEST 2019  WorkThread3啟動了,時間為Sun Sep 22 21:37:57 CEST 2019  WorkThread2啟動了,時間為Sun Sep 22 21:37:57 CEST 2019  WorkThread1啟動了,時間為Sun Sep 22 21:37:57 CEST 2019  WorkThread1執行完了,時間為Sun Sep 22 21:37:59 CEST 2019  WorkThread2執行完了,時間為Sun Sep 22 21:38:00 CEST 2019  WorkThread3執行完了,時間為Sun Sep 22 21:38:01 CEST 2019  DoneThread2等待完了, 時間為Sun Sep 22 21:38:01 CEST 2019  DoneThread1等待完了, 時間為Sun Sep 22 21:38:01 CEST 2019

「DoneThreadX要等待了」和「WorkThreadX啟動了」的順序是隨機的。
「WorkThreadX執行完了「的順序按照1、2、3,因為我們的等待時間2、3、4秒。
待WorkThread3執行完了,才會執行await()之後的程式碼,DoneThreadX執行完了,同樣該順序隨機。
這是一種加強版的等待/通知模型,它可以實現多個工作執行緒完成任務後通知多個等待執行緒開始工作。
我們之前的等待/通知模型只能實現一個工作執行緒完成任務後通知一個等待執行緒或者所有等待執行緒開始工作。

Semaphore

Semaphore用來控制並發數量,Semaphore構造函數傳入permit(許可),一個permit相當於一個不可重入鎖,acquire方法獲得permit,relase方法歸還permit。
程式碼示例如下

public class MyThread51 {      public static void main(String[] args)      {          final Semaphore semaphore = new Semaphore(5);            Runnable runnable = new Runnable()          {              public void run()              {                  try                  {                      semaphore.acquire();                      System.out.println(Thread.currentThread().getName() + "獲得了permit,時間為" + new Date());                      Thread.sleep(2000);                      System.out.println(Thread.currentThread().getName() + "釋放了permit,時間為" + new Date());                    }                  catch (InterruptedException e)                  {                      e.printStackTrace();                  }                  finally                  {                      semaphore.release();                  }              }          };            Thread[] threads = new Thread[10];          for (int i = 0; i < threads.length; i++)              threads[i] = new Thread(runnable);          for (int i = 0; i < threads.length; i++)              threads[i].start();      }  }

運行結果如下

Thread-2獲得了permit,時間為Sun Sep 29 21:47:05 CEST 2019  Thread-3獲得了permit,時間為Sun Sep 29 21:47:05 CEST 2019  Thread-4獲得了permit,時間為Sun Sep 29 21:47:05 CEST 2019  Thread-1獲得了permit,時間為Sun Sep 29 21:47:05 CEST 2019  Thread-0獲得了permit,時間為Sun Sep 29 21:47:05 CEST 2019  Thread-3釋放了permit,時間為Sun Sep 29 21:47:07 CEST 2019  Thread-1釋放了permit,時間為Sun Sep 29 21:47:07 CEST 2019  Thread-0釋放了permit,時間為Sun Sep 29 21:47:07 CEST 2019  Thread-2釋放了permit,時間為Sun Sep 29 21:47:07 CEST 2019  Thread-4釋放了permit,時間為Sun Sep 29 21:47:07 CEST 2019  Thread-5獲得了permit,時間為Sun Sep 29 21:47:07 CEST 2019  Thread-7獲得了permit,時間為Sun Sep 29 21:47:07 CEST 2019  Thread-6獲得了permit,時間為Sun Sep 29 21:47:07 CEST 2019  Thread-9獲得了permit,時間為Sun Sep 29 21:47:07 CEST 2019  Thread-8獲得了permit,時間為Sun Sep 29 21:47:07 CEST 2019  Thread-5釋放了permit,時間為Sun Sep 29 21:47:09 CEST 2019  Thread-8釋放了permit,時間為Sun Sep 29 21:47:09 CEST 2019  Thread-9釋放了permit,時間為Sun Sep 29 21:47:09 CEST 2019  Thread-6釋放了permit,時間為Sun Sep 29 21:47:09 CEST 2019  Thread-7釋放了permit,時間為Sun Sep 29 21:47:09 CEST 2019

2,3,4,1,0先獲得了permit,相差兩秒釋放了permit;
5,7,6,9,8獲得了permit,相差兩秒釋放了permit;
因為我們設置的permit是5,所有隻能有五個執行緒獲得permit。

Exchanger

Exchanger用來交換兩個執行緒中的數據
示例程式碼如下

public class MyThread52 extends Thread{      private String str;      private Exchanger<String> exchanger;      private int sleepSecond;        public MyThread52(String str, Exchanger<String> exchanger, int sleepSecond) {          this.str = str;          this.exchanger = exchanger;          this.sleepSecond = sleepSecond;      }        public void run() {          try {              System.out.println(this.getName() + "啟動, 原數據為" + str + ", 時間為" + new Date());              Thread.sleep(sleepSecond * 1000);              str = exchanger.exchange(str);              System.out.println(this.getName() + "交換了數據, 交換後的數據為" + str + ", 時間為" + new Date());          } catch (InterruptedException e) {              e.printStackTrace();          }      }          public static void main(String[] args) {          Exchanger<String> exchanger = new Exchanger<String>();          MyThread52 et0 = new MyThread52("111", exchanger, 3);          MyThread52 et1 = new MyThread52("222", exchanger, 2);            et0.start();          et1.start();      }  }

運行結果如下

Thread-1啟動, 原數據為222, 時間為Sun Sep 29 22:18:36 CEST 2019  Thread-0啟動, 原數據為111, 時間為Sun Sep 29 22:18:36 CEST 2019  Thread-0交換了數據, 交換後的數據為222, 時間為Sun Sep 29 22:18:39 CEST 2019  Thread-1交換了數據, 交換後的數據為111, 時間為Sun Sep 29 22:18:39 CEST 2019

可以看到,數據發生了交換,時間差為最長時間3s。

CyclicBarrier

一組執行緒等待對方都達到barrier point,再執行接下來的動作,barrier point是循環的,它可以重用。
示例程式碼如下

public class MyThread53 extends Thread{      private CyclicBarrier cb;      private int sleepSecond;        public MyThread53(CyclicBarrier cb, int sleepSecond)      {          this.cb = cb;          this.sleepSecond = sleepSecond;      }        public void run()      {          try          {              System.out.println(this.getName() + "運行了");              System.out.println(this.getName() + "準備等待了, 時間為" + new Date());              Thread.sleep(sleepSecond * 1000);                cb.await();              System.out.println(this.getName() + "結束等待了, 時間為" + new Date());          }          catch (Exception e)          {              e.printStackTrace();          }      }          public static void main(String[] args)      {          Runnable runnable = new Runnable()          {              public void run()              {                  System.out.println("CyclicBarrier的所有執行緒await()結束了,我運行了, 時間為" + new Date());              }          };          //需要等待三個執行緒await()後再執行runnable          CyclicBarrier cb = new CyclicBarrier(3, runnable);          MyThread53 cbt0 = new MyThread53(cb, 3);          MyThread53 cbt1 = new MyThread53(cb, 6);          MyThread53 cbt2 = new MyThread53(cb, 9);          cbt0.start();          cbt1.start();          cbt2.start();      }  }

運行結果如下

Thread-0運行了  Thread-1運行了  Thread-2運行了  Thread-1準備等待了, 時間為Mon Sep 30 23:02:11 CEST 2019  Thread-2準備等待了, 時間為Mon Sep 30 23:02:11 CEST 2019  Thread-0準備等待了, 時間為Mon Sep 30 23:02:11 CEST 2019  CyclicBarrier的所有執行緒await()結束了,我運行了, 時間為Mon Sep 30 23:02:20 CEST 2019  Thread-2結束等待了, 時間為Mon Sep 30 23:02:20 CEST 2019  Thread-0結束等待了, 時間為Mon Sep 30 23:02:20 CEST 2019  Thread-1結束等待了, 時間為Mon Sep 30 23:02:20 CEST 2019

Runnable執行緒在Thread-0,Thread-1,Thread-2 await()後運行,Runnable執行緒和三個執行緒的執行時間幾乎相同。

Callable和Future

Callable
由於Runnable介面run()返回值是void類型,執行任務後無法返回結果。所以我們需要Callable介面,該介面的call()可以返回值。
Future
Future表示一個非同步計算結果,Future提供了如下方法
get():獲取任務執行結果
cancel():中斷任務
isDone():判斷任務是否執行完成
isCancelled():判斷任務是否被取消

示例程式碼如下

public class MyThread54 implements Callable<String> {      public String call() throws Exception      {          System.out.println("進入CallableThread的call()方法, 開始睡覺, 睡覺時間為" + new Date());          Thread.sleep(10000);          return "是ss12";      }        public static void main(String[] args) throws Exception      {          ExecutorService es = Executors.newCachedThreadPool();          MyThread54 ct = new MyThread54();          Future<String> f = es.submit(ct);          es.shutdown();            Thread.sleep(5000);          System.out.println("主執行緒等待5秒, 當前時間為" + new Date());            String str = f.get();          System.out.println("Future已拿到數據, str = " + str + ", 當前時間為" + new Date());      }  }

運行結果如下

進入CallableThread的call()方法, 開始睡覺, 睡覺時間為Sun Nov 03 11:00:22 CET 2019  主執行緒等待5秒, 當前時間為Sun Nov 03 11:00:27 CET 2019  Future已拿到數據, str = 是ss12, 當前時間為Sun Nov 03 11:00:32 CET 2019

可以看到,Future在10s後拿到了返回結果。