Java多线程(十五):CountDownLatch,Semaphore,Exchanger,CyclicBarrier,Callable和Future

  • 2019 年 11 月 3 日
  • 筆記

CountDownLatch

CountDownLatch用来使一个线程或多个线程等待到其他线程完成。CountDownLatch有个初始值count,await方法会阻塞线程,直到通过countDown方法调用使count减少为0才会执行await方法后面的代码。
示例代码
MyThread50_0是WorkThread,不同的线程休眠时间不一样。

public class MyThread50_0 extends Thread  {      private CountDownLatch cdl;      private int sleepSecond;        public MyThread50_0(String name, CountDownLatch cdl, int sleepSecond)      {          super(name);          this.cdl = cdl;          this.sleepSecond = sleepSecond;      }        public void run()      {          try          {              System.out.println(this.getName() + "启动了,时间为" + new Date());              Thread.sleep(sleepSecond * 1000);              cdl.countDown();              System.out.println(this.getName() + "执行完了,时间为" + new Date());          }          catch (InterruptedException e)          {              e.printStackTrace();          }      }  }

MyThread50_1是DoneThread和main方法

public class MyThread50_1 extends Thread {      private CountDownLatch cdl;        public MyThread50_1(String name, CountDownLatch cdl)      {          super(name);          this.cdl = cdl;      }        public void run()      {          try          {              System.out.println(this.getName() + "要等待了, 时间为" + new Date());              cdl.await();              System.out.println(this.getName() + "等待完了, 时间为" + new Date());          }          catch (InterruptedException e)          {              e.printStackTrace();          }      }        public static void main(String[] args) {          CountDownLatch cdl = new CountDownLatch(3);          MyThread50_1 dt0 = new MyThread50_1("DoneThread1", cdl);          MyThread50_1 dt1 = new MyThread50_1("DoneThread2", cdl);          dt0.start();          dt1.start();          MyThread50_0 wt0 = new MyThread50_0("WorkThread1", cdl, 2);          MyThread50_0 wt1 = new MyThread50_0("WorkThread2", cdl, 3);          MyThread50_0 wt2 = new MyThread50_0("WorkThread3", cdl, 4);          wt0.start();          wt1.start();          wt2.start();      }  }

运行结果如下

DoneThread2要等待了, 时间为Sun Sep 22 21:37:57 CEST 2019  DoneThread1要等待了, 时间为Sun Sep 22 21:37:57 CEST 2019  WorkThread3启动了,时间为Sun Sep 22 21:37:57 CEST 2019  WorkThread2启动了,时间为Sun Sep 22 21:37:57 CEST 2019  WorkThread1启动了,时间为Sun Sep 22 21:37:57 CEST 2019  WorkThread1执行完了,时间为Sun Sep 22 21:37:59 CEST 2019  WorkThread2执行完了,时间为Sun Sep 22 21:38:00 CEST 2019  WorkThread3执行完了,时间为Sun Sep 22 21:38:01 CEST 2019  DoneThread2等待完了, 时间为Sun Sep 22 21:38:01 CEST 2019  DoneThread1等待完了, 时间为Sun Sep 22 21:38:01 CEST 2019

“DoneThreadX要等待了”和“WorkThreadX启动了”的顺序是随机的。
“WorkThreadX执行完了“的顺序按照1、2、3,因为我们的等待时间2、3、4秒。
待WorkThread3执行完了,才会执行await()之后的代码,DoneThreadX执行完了,同样该顺序随机。
这是一种加强版的等待/通知模型,它可以实现多个工作线程完成任务后通知多个等待线程开始工作。
我们之前的等待/通知模型只能实现一个工作线程完成任务后通知一个等待线程或者所有等待线程开始工作。

Semaphore

Semaphore用来控制并发数量,Semaphore构造函数传入permit(许可),一个permit相当于一个不可重入锁,acquire方法获得permit,relase方法归还permit。
代码示例如下

public class MyThread51 {      public static void main(String[] args)      {          final Semaphore semaphore = new Semaphore(5);            Runnable runnable = new Runnable()          {              public void run()              {                  try                  {                      semaphore.acquire();                      System.out.println(Thread.currentThread().getName() + "获得了permit,时间为" + new Date());                      Thread.sleep(2000);                      System.out.println(Thread.currentThread().getName() + "释放了permit,时间为" + new Date());                    }                  catch (InterruptedException e)                  {                      e.printStackTrace();                  }                  finally                  {                      semaphore.release();                  }              }          };            Thread[] threads = new Thread[10];          for (int i = 0; i < threads.length; i++)              threads[i] = new Thread(runnable);          for (int i = 0; i < threads.length; i++)              threads[i].start();      }  }

运行结果如下

Thread-2获得了permit,时间为Sun Sep 29 21:47:05 CEST 2019  Thread-3获得了permit,时间为Sun Sep 29 21:47:05 CEST 2019  Thread-4获得了permit,时间为Sun Sep 29 21:47:05 CEST 2019  Thread-1获得了permit,时间为Sun Sep 29 21:47:05 CEST 2019  Thread-0获得了permit,时间为Sun Sep 29 21:47:05 CEST 2019  Thread-3释放了permit,时间为Sun Sep 29 21:47:07 CEST 2019  Thread-1释放了permit,时间为Sun Sep 29 21:47:07 CEST 2019  Thread-0释放了permit,时间为Sun Sep 29 21:47:07 CEST 2019  Thread-2释放了permit,时间为Sun Sep 29 21:47:07 CEST 2019  Thread-4释放了permit,时间为Sun Sep 29 21:47:07 CEST 2019  Thread-5获得了permit,时间为Sun Sep 29 21:47:07 CEST 2019  Thread-7获得了permit,时间为Sun Sep 29 21:47:07 CEST 2019  Thread-6获得了permit,时间为Sun Sep 29 21:47:07 CEST 2019  Thread-9获得了permit,时间为Sun Sep 29 21:47:07 CEST 2019  Thread-8获得了permit,时间为Sun Sep 29 21:47:07 CEST 2019  Thread-5释放了permit,时间为Sun Sep 29 21:47:09 CEST 2019  Thread-8释放了permit,时间为Sun Sep 29 21:47:09 CEST 2019  Thread-9释放了permit,时间为Sun Sep 29 21:47:09 CEST 2019  Thread-6释放了permit,时间为Sun Sep 29 21:47:09 CEST 2019  Thread-7释放了permit,时间为Sun Sep 29 21:47:09 CEST 2019

2,3,4,1,0先获得了permit,相差两秒释放了permit;
5,7,6,9,8获得了permit,相差两秒释放了permit;
因为我们设置的permit是5,所有只能有五个线程获得permit。

Exchanger

Exchanger用来交换两个线程中的数据
示例代码如下

public class MyThread52 extends Thread{      private String str;      private Exchanger<String> exchanger;      private int sleepSecond;        public MyThread52(String str, Exchanger<String> exchanger, int sleepSecond) {          this.str = str;          this.exchanger = exchanger;          this.sleepSecond = sleepSecond;      }        public void run() {          try {              System.out.println(this.getName() + "启动, 原数据为" + str + ", 时间为" + new Date());              Thread.sleep(sleepSecond * 1000);              str = exchanger.exchange(str);              System.out.println(this.getName() + "交换了数据, 交换后的数据为" + str + ", 时间为" + new Date());          } catch (InterruptedException e) {              e.printStackTrace();          }      }          public static void main(String[] args) {          Exchanger<String> exchanger = new Exchanger<String>();          MyThread52 et0 = new MyThread52("111", exchanger, 3);          MyThread52 et1 = new MyThread52("222", exchanger, 2);            et0.start();          et1.start();      }  }

运行结果如下

Thread-1启动, 原数据为222, 时间为Sun Sep 29 22:18:36 CEST 2019  Thread-0启动, 原数据为111, 时间为Sun Sep 29 22:18:36 CEST 2019  Thread-0交换了数据, 交换后的数据为222, 时间为Sun Sep 29 22:18:39 CEST 2019  Thread-1交换了数据, 交换后的数据为111, 时间为Sun Sep 29 22:18:39 CEST 2019

可以看到,数据发生了交换,时间差为最长时间3s。

CyclicBarrier

一组线程等待对方都达到barrier point,再执行接下来的动作,barrier point是循环的,它可以重用。
示例代码如下

public class MyThread53 extends Thread{      private CyclicBarrier cb;      private int sleepSecond;        public MyThread53(CyclicBarrier cb, int sleepSecond)      {          this.cb = cb;          this.sleepSecond = sleepSecond;      }        public void run()      {          try          {              System.out.println(this.getName() + "运行了");              System.out.println(this.getName() + "准备等待了, 时间为" + new Date());              Thread.sleep(sleepSecond * 1000);                cb.await();              System.out.println(this.getName() + "结束等待了, 时间为" + new Date());          }          catch (Exception e)          {              e.printStackTrace();          }      }          public static void main(String[] args)      {          Runnable runnable = new Runnable()          {              public void run()              {                  System.out.println("CyclicBarrier的所有线程await()结束了,我运行了, 时间为" + new Date());              }          };          //需要等待三个线程await()后再执行runnable          CyclicBarrier cb = new CyclicBarrier(3, runnable);          MyThread53 cbt0 = new MyThread53(cb, 3);          MyThread53 cbt1 = new MyThread53(cb, 6);          MyThread53 cbt2 = new MyThread53(cb, 9);          cbt0.start();          cbt1.start();          cbt2.start();      }  }

运行结果如下

Thread-0运行了  Thread-1运行了  Thread-2运行了  Thread-1准备等待了, 时间为Mon Sep 30 23:02:11 CEST 2019  Thread-2准备等待了, 时间为Mon Sep 30 23:02:11 CEST 2019  Thread-0准备等待了, 时间为Mon Sep 30 23:02:11 CEST 2019  CyclicBarrier的所有线程await()结束了,我运行了, 时间为Mon Sep 30 23:02:20 CEST 2019  Thread-2结束等待了, 时间为Mon Sep 30 23:02:20 CEST 2019  Thread-0结束等待了, 时间为Mon Sep 30 23:02:20 CEST 2019  Thread-1结束等待了, 时间为Mon Sep 30 23:02:20 CEST 2019

Runnable线程在Thread-0,Thread-1,Thread-2 await()后运行,Runnable线程和三个线程的执行时间几乎相同。

Callable和Future

Callable
由于Runnable接口run()返回值是void类型,执行任务后无法返回结果。所以我们需要Callable接口,该接口的call()可以返回值。
Future
Future表示一个异步计算结果,Future提供了如下方法
get():获取任务执行结果
cancel():中断任务
isDone():判断任务是否执行完成
isCancelled():判断任务是否被取消

示例代码如下

public class MyThread54 implements Callable<String> {      public String call() throws Exception      {          System.out.println("进入CallableThread的call()方法, 开始睡觉, 睡觉时间为" + new Date());          Thread.sleep(10000);          return "是ss12";      }        public static void main(String[] args) throws Exception      {          ExecutorService es = Executors.newCachedThreadPool();          MyThread54 ct = new MyThread54();          Future<String> f = es.submit(ct);          es.shutdown();            Thread.sleep(5000);          System.out.println("主线程等待5秒, 当前时间为" + new Date());            String str = f.get();          System.out.println("Future已拿到数据, str = " + str + ", 当前时间为" + new Date());      }  }

运行结果如下

进入CallableThread的call()方法, 开始睡觉, 睡觉时间为Sun Nov 03 11:00:22 CET 2019  主线程等待5秒, 当前时间为Sun Nov 03 11:00:27 CET 2019  Future已拿到数据, str = 是ss12, 当前时间为Sun Nov 03 11:00:32 CET 2019

可以看到,Future在10s后拿到了返回结果。