CyclicBarrier源码探究 (JDK 1.8)

  • 2020 年 3 月 13 日
  • 笔记

CyclicBarrier也叫回环栅栏,能够实现让一组线程运行到栅栏处并阻塞,等到所有线程都到达栅栏时再一起执行的功能。“回环”意味着CyclicBarrier可以多次重复使用,相比于CountDownLatch只能使用一次,CyclicBarrier可以节省许多资源,并且还可以在构造器中传入任务,当栅栏条件满足时执行这个任务。CyclicBarrier是使用了ReentrantLock,主要方法在执行时都会加锁,因此并发性能不是很高。

1.相关字段

    //重入锁,CyclicBarrier内部通过重入锁实现线程安全      private final ReentrantLock lock = new ReentrantLock();      //线程阻塞时的等待条件      private final Condition trip = lock.newCondition();      //需要等待的线程数      private final int parties;      //栅栏打开之后首先执行的任务      private final Runnable barrierCommand;      //记录当前的分代标记      private Generation generation = new Generation();      //当前还需要等待多少个线程运行到栅栏位置      private int count;

需要注意的是generation字段,用于标记栅栏当前处在哪一代。当满足一定的条件时(例如调用了reset方法,或者栅栏打开等),栅栏状态会切换到下一代,实际就是new一个新的Generation对象,这是CyclicBarrier的内部类,代码非常简单,如下:

    private static class Generation {          boolean broken = false;   //标记栅栏是否被破坏      }

实际使用的过程中,会利用generation字段判断当前是否在同一个分代,而使用broker字段判断栅栏是否被破坏。

2.构造函数

CyclicBarrier有两个重载的构造函数,构造函数只是对上述的相关字段进行初始化,如下:

    public CyclicBarrier(int parties) {          this(parties, null);      }        public CyclicBarrier(int parties, Runnable barrierAction) {          if (parties <= 0) throw new IllegalArgumentException();          this.parties = parties;          this.count = parties;          this.barrierCommand = barrierAction;      }

3.核心方法

  • await
    await是开发时最常用到的方法了,同CountDownLatch一样,CyclicBarrier也提供了两个await方法,一个不带参数,一个带有超时参数,其内部只是简单调用了一下dowait方法:
    public int await() throws InterruptedException, BrokenBarrierException {          try {              return dowait(false, 0L);          } catch (TimeoutException toe) {              throw new Error(toe); // cannot happen          }      }        public int await(long timeout, TimeUnit unit)          throws InterruptedException,                 BrokenBarrierException,                 TimeoutException {          return dowait(true, unit.toNanos(timeout));      }

接下来看看至关重要的dowait方法:

    private int dowait(boolean timed, long nanos)          throws InterruptedException, BrokenBarrierException,                 TimeoutException {          final ReentrantLock lock = this.lock;          //加重入锁          lock.lock();          try {              //首先获取年龄代信息              final Generation g = generation;              //如果栅栏状态被破坏,抛出异常,例如先启动的线程调用了breakBarrier方法,后启动的线程就能够看到g.broker=true              if (g.broken)                  throw new BrokenBarrierException();              //检测线程的中断状态,如果线程设置了中断状态,则通过breakBarrier设置栅栏为已破坏状态,并唤醒其他线程              //如果这里能够检测到中断状态,那只可能是在await方法外部设置的              if (Thread.interrupted()) {                  breakBarrier();                  throw new InterruptedException();              }              //每调用一次await,就将需要等待的线程数减1              int index = --count;              //index=0表示这是最后一个到达的线程,由该线程执行下面的逻辑              if (index == 0) {  // tripped                  boolean ranAction = false;                  try {                      final Runnable command = barrierCommand;                      //如果在构造器中传入了第二个任务参数,就在放开栅栏前先执行这个任务                      if (command != null)                          command.run();                      ranAction = true;                      //正常结束,需要唤醒阻塞的线程,并换代                      nextGeneration();                      return 0;                  } finally {                      //try代码块如果正常执行,ranAction就一定等于true,而try代码块唯一可能发生异常的地方就是command.run(),                      //因此这里为了保证在任务执行失败时,将栅栏标记为已破坏,唤醒阻塞线程                      if (!ranAction)                          breakBarrier();                  }              }                // loop until tripped, broken, interrupted, or timed out              for (;;) {                  try {                      //没有设置超时标记,就加入等待队列                      if (!timed)                          trip.await();                      //设置了超时标记,但目前还没有超时,则继续等待                      else if (nanos > 0L)                          nanos = trip.awaitNanos(nanos);                  } catch (InterruptedException ie) {                      //如果线程等待的过程中被中断,会执行到这里                      //g == generation表示当前还在同一个年龄分代中,!g.broker表示当前栅栏状态没有被破坏                      if (g == generation && ! g.broken) {                          breakBarrier();                          throw ie;                      } else {                          //上面的条件不满足,说明:1)g!=generation,说明线程执行到这里时已经换代了                          //2)没有换代,但是栅栏被破坏了                          //无论哪种情况,都只是简单地设置一下当前线程的中断状态                          Thread.currentThread().interrupt();                      }                  }                  //栅栏被破坏,抛出异常                  //注意,在breakBarrier方法中会唤醒所有等待条件的线程,这些线程会执行到这里,判断栅栏已经被破坏,都会抛出异常                  if (g.broken)                      throw new BrokenBarrierException();                  //距离上一次设置g变量的值已经过去很长时间了,在执行过程中generation可能已经发生改变,                  //当前线程还是前几代的,不需要再循环阻塞了,直接返回上一代剩余需要等待的线程数                  //注意:代码中breakBarrier方法和nextGeneration方法都会唤醒阻塞的线程,但是breakBarrier在上一个判断就被拦截了,                  //因此走到这里的有三种情况:                  //a)最后一个线程正常执行,栅栏打开导致其他线程被唤醒;不属于当前代的线程直接返回,                  //属于当前代的则可能因为没到栅栏开放条件要继续循环阻塞                  //b)栅栏被重置(调用了reset方法),此时g!=negeration,全都直接返回                  //c)线程等待超时了,不属于当前代的返回就可以了,属于当前代的则要设置generation.broken = true                  if (g != generation)                      return index;                  //如果线程等待超时,标记栅栏为破坏状态并抛出异常,如果还没超时,则自旋后又重新阻塞                  if (timed && nanos <= 0L) {                      breakBarrier();                      throw new TimeoutException();                  }              }          } finally {              //别忘了解锁              lock.unlock();          }      }

dowait的方法逻辑是:每一个调用await方法的线程都会将计数count1,最后一个线程将count减为0时,顺带还要执行barrierCommand指定的任务,并将generation切换到下一代,当然,最重要的还是要唤醒之前在栅栏处阻塞的线程。由于trip对应的Condition对象没有任何地方会修改,因此trip.signalAll()会唤醒所有在该条件上等待的线程,如果线程在等待的过程中,其他线程将generation更新到下一代,就会出现被唤醒的线程中有部分还属于之前那一代的情况。
接下来将会对dowait用到的一些方法进行简单介绍。

  • breakBarrier
    dowait方法有四个地方调用了breakBarrier,从名字可以看出,该方法会将generation.broken设置为true,除此之外,还会还原count的值,并且唤醒所有被阻塞的线程:
    private void breakBarrier() {          generation.broken = true;          count = parties;          //唤醒所有的阻塞线程          trip.signalAll();      }

纵观CyclicBarrier源码,generation.broken统一在breakBarrier方法中被设置为true,而一旦将generation.broken设置为true之后,代码中检查到这个状态之后都会抛出异常,栅栏就没办法再使用了(可以手动调用reset进行重置),而源码中会在以下几种情况调用breakBarrier方法:
1) 当前线程被中断
2)通过构造器传入的任务执行失败
3) 条件等待时被中断
4) 线程等待超时
5) 显式调用reset方法

  • nextGeneration
    private void nextGeneration() {          // 唤醒所有的阻塞线程          trip.signalAll();          // 开启下一代          count = parties;          generation = new Generation();      }
  • reset
    reset方法主要是结束这一代,并切换到下一代
    public void reset() {          final ReentrantLock lock = this.lock;          lock.lock();          try {              breakBarrier();   // break the current generation              nextGeneration(); // start a new generation          } finally {              lock.unlock();          }      }

介绍到这里,整个CyclicBarrier已经差不多介绍完了,但是内部的流程远远没有这么简单,因为很大一部分逻辑封装在AbstractQueuedSynchronizer中,这个类定义了阻塞的线程如何加入等待队列,又如何被唤醒,因此如果想要深入了解线程等待的逻辑,还需要仔细研究AbstractQueuedSynchronizer才行。本文不会对这部分内容进行介绍,后面有时间的话将会专门对其进行介绍。