CountDownLatch、CyclicBarrier和Semaphore使用

  • 2019 年 10 月 6 日
  • 筆記
  • CountDownLatch

CountDownLatch是用来线程计数的。等待一组线程全部执行完后再本线程继续执行。如:A线程需要等待B、C和D(由初始化CountDownLatch参数觉得等待多少个线程)线程执行完后再执行。

主要的方法:

// 构造方法,count决定等待多少个线程  public CountDownLatch(int count)  // 等待线程完成数减1  public void countDown()  // 调用await本线程会挂起,当等待线程未完成数为0,即countDown调用次数等于构造方法参数值时会被唤醒  public void await()  public boolean await(long timeout, TimeUnit unit)

以下是CountDownLatch的用法:

public class CountDownLatchTest {      public static void main(String[] args) {          final CountDownLatch countDownLatch = new CountDownLatch(2);          for (int i = 0; i < 2; i++) {              new Thread(() -> {                  try {                      String name = Thread.currentThread().getName();                      System.out.println("子线程" + name + "正在运行中......");                      TimeUnit.SECONDS.sleep(5);                      System.out.println("子线程" + name + "执行完毕");                      countDownLatch.countDown();                  } catch (Exception e) {                      e.printStackTrace();                  }              }).start();          }            try {              System.out.println("======等待两个子线程执行完毕");              // 只有子线程执行完毕个数等于CountDownLatch初始化个数才会继续执行await后面代码              countDownLatch.await();              System.out.println("两个子线程都执行完毕!!!!!!");          } catch (InterruptedException e) {              e.printStackTrace();          }            System.out.println("主线程继续执行中。。。。。。。。");      }  }

结果:

子线程Thread-0正在运行中......  ======等待两个子线程执行完毕  子线程Thread-1正在运行中......  子线程Thread-1执行完毕  子线程Thread-0执行完毕  两个子线程都执行完毕!!!!!!  主线程继续执行中。。。。。。。。
  • CyclicBarrier

CyclicBarrier 设置一个障碍点,让同组线程到达改点的线程等待本组未达到该点的线程,同组全部线程到达该点才越过障碍物释放资源,让其它组循环利用该对象。

主要方法:

// 构造方法设置同组线程数  public CyclicBarrier(int parties)  // 构造方法设置同组线程数;全部达到障碍物时执行barrierAction函数  public CyclicBarrier(int parties, Runnable barrierAction)  // 某个线程调用await时挂起,需要等同组所有线程到达才能唤醒  public int await() throws InterruptedException, BrokenBarrierException

以下是 CyclicBarrier 的用法:

/**   * Created on 18/3/15 14:40.   *   * @author wolf   */  public class CyclicBarrierTest {      public static void main(String[] args) {          int n = 3;          CyclicBarrier barrier = new CyclicBarrier(n);          for (int i = 0; i < 3; i++) {              new Worker(barrier).start();          }            try {              TimeUnit.SECONDS.sleep(6);              System.out.println("--------讨厌的循环利用分割线------------");          } catch (Exception e) {              e.printStackTrace();          }            for (int i = 0; i < n; i++) {              new Worker(barrier).start();          }      }        static class Worker extends Thread {          private CyclicBarrier barrier;          public Worker(CyclicBarrier barrier) {              this.barrier = barrier;          }          @Override          public void run() {              long startTime = System.currentTimeMillis();              String name = Thread.currentThread().getName();              System.out.println("线程 " + name + " 正在执行任务.......");              try {                  int time = RandomUtils.nextInt(1, 5);                  TimeUnit.SECONDS.sleep(time);                  System.out.println("线程 " + name + " 执行任务完毕!!!用时:" + time);                  barrier.await();              } catch (Exception e) {                  e.printStackTrace();              }              System.out.println("线程 " + name + " 全部耗时:"+(System.currentTimeMillis()-startTime));          }      }  }
  • Semaphore

Semaphore 通过名字我们就知道这个计算信号量,也就是控制获取资源的许可证。

主要的方法:

// 构造方法设置可用资源的数量,即许可证的数量  public Semaphore(int permits)  // 构造方法设置可用资源的数量,即许可证的数量,fair 是否公平获得许可证,默认是先来后到公平的  public Semaphore(int permits, boolean fair)  // 同步获取许可证,  public void acquire() throws InterruptedException  // 尝试获取许可证,结果直接返回  public boolean tryAcquire()  // 尝试获取许可证,结果等待时间超时直接返回  public boolean tryAcquire(long timeout, TimeUnit unit)  // 释放许可证,可给其它等待线程使用  public void release()

以下是 Semaphore 用法:

public class SemaphoreTest {      public static void main(String[] args) {          int no = 5;          Semaphore semaphore = new Semaphore(2);          for (int i = 0; i < no; i++) {              new Worker(i, semaphore).start();          }      }      static class Worker extends Thread {          private int no;          private Semaphore semaphore;          public Worker(int no, Semaphore semaphore) {              this.no = no;              this.semaphore = semaphore;          }          @Override          public void run() {              try {                  long startTime = System.currentTimeMillis();                  semaphore.acquire();                  System.out.println("worker " + this.no + " 获取许可证,开始工作....");                  int time = RandomUtils.nextInt(1, 5);                  TimeUnit.SECONDS.sleep(time);                  semaphore.release();                  System.out.println("worker " + this.no + " 完成工作,释放许可证!耗时:" + (System.currentTimeMillis() - startTime));              } catch (Exception e) {                  e.printStackTrace();              }          }      }  }