How CountDownLatch Works

  • January 21, 2020
  • Notes

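Last time I gave a rough overview of how to use CountDownLatch; today I'll cover how it is implemented. In use, CountDownLatch behaves much like Thread.join, and its implementation is fairly simple.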

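The general idea is a blocking loop: the waiting thread stays inside the loop until a condition is met (here, the count reaching 0), then leaves the loop and carries on with the code that follows. The execution logic is: await() blocks the caller, each countDown() decrements the count, and once the count reaches 0 the blocked threads are released.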
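The idea can be sketched with a toy latch. This is only an illustration under stated assumptions: TinyLatch and TinyLatchDemo are hypothetical names, and the sketch uses synchronized/wait/notifyAll rather than the AbstractQueuedSynchronizer machinery that the real CountDownLatch relies on.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class TinyLatchDemo {

    /** Hypothetical toy latch for illustration; not the JDK implementation. */
    static final class TinyLatch {
        private int count;

        TinyLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.count = count;
        }

        /** Loops until count == 0, blocking in wait() between checks. */
        synchronized void await() throws InterruptedException {
            while (count > 0) {
                wait();
            }
        }

        /** Decrements the count and wakes all waiters when it reaches zero. */
        synchronized void countDown() {
            if (count > 0 && --count == 0) {
                notifyAll();
            }
        }
    }

    /** Starts three workers, waits for all of them, and returns how many finished. */
    static int run() {
        TinyLatch latch = new TinyLatch(3);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                finished.incrementAndGet(); // the "work"
                latch.countDown();          // signal completion
            }).start();
        }
        try {
            latch.await(); // blocks until all three workers have counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("workers finished: " + run());
    }
}
```

Because every worker increments the counter before calling countDown(), the main thread can only leave await() after all three increments have happened.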

Source code analysis

Let's go through the CountDownLatch source code:

Creating a CountDownLatch object

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

Creating a CountDownLatch object is simple: the constructor just creates a Sync object. The corresponding Sync code is:

    // CountDownLatch.Sync
    Sync(int count) {
        setState(count);
    }

    // AbstractQueuedSynchronizer
    protected final void setState(int newState) {
        state = newState;
    }

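This confirms that the constructor simply stores count in state, the field that await and countDown later read and update.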
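A small check of the constructor behavior described above, using the public getCount() method to read the stored value back. The class name LatchConstructionDemo and its helper methods are hypothetical, chosen only for this sketch.

```java
import java.util.concurrent.CountDownLatch;

final class LatchConstructionDemo {

    /** Returns the count reported right after construction. */
    static long initialCount(int count) {
        return new CountDownLatch(count).getCount();
    }

    /** Returns true if the constructor rejects the given count. */
    static boolean rejects(int count) {
        try {
            new CountDownLatch(count);
            return false;
        } catch (IllegalArgumentException e) {
            return true; // thrown by the "count < 0" check in the constructor
        }
    }

    public static void main(String[] args) {
        System.out.println(initialCount(3)); // prints 3
        System.out.println(rejects(-1));     // prints true
    }
}
```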

The await method: blocking wait

    // CountDownLatch
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // AbstractQueuedSynchronizer
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    // CountDownLatch.Sync
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    // AbstractQueuedSynchronizer
    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

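doAcquireSharedInterruptibly contains a for (;;), an infinite loop that returns normally only once tryAcquireShared returns r >= 0, in other words once state == 0. The thread does not spin the whole time: between checks, parkAndCheckInterrupt() suspends it until it is woken up or interrupted.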
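One way to observe that the waiting thread is parked rather than busy-spinning is to look at its Thread.State while it sits in await(). The sketch below is an assumption-laden illustration: AwaitParksDemo is a hypothetical name, and the polling loop with a five-second deadline is just a simple way to give the waiter time to park.

```java
import java.util.concurrent.CountDownLatch;

final class AwaitParksDemo {

    /** Returns the last state observed for a thread blocked in await(). */
    static Thread.State stateWhileWaiting() {
        CountDownLatch latch = new CountDownLatch(1);
        Thread waiter = new Thread(() -> {
            try {
                latch.await(); // blocks until the count reaches zero
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();

        // Poll until the waiter has parked, or give up after about five seconds.
        long deadline = System.nanoTime() + 5_000_000_000L;
        Thread.State observed = waiter.getState();
        while (observed != Thread.State.WAITING && System.nanoTime() < deadline) {
            Thread.yield();
            observed = waiter.getState();
        }

        latch.countDown(); // count hits zero, so the waiter is released
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println("state while waiting: " + stateWhileWaiting());
    }
}
```

A parked thread reports WAITING; a thread that was spinning in the loop would report RUNNABLE instead.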

The countDown method

    // CountDownLatch
    public void countDown() {
        sync.releaseShared(1);
    }

    // AbstractQueuedSynchronizer
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    // CountDownLatch.Sync
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }

    // AbstractQueuedSynchronizer
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

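compareAndSetWaitStatus(h, 0, Node.PROPAGATE) uses CAS to change the waitStatus of the head node, not the value of state. The state counter itself is decremented by compareAndSetState(c, nextc) in tryReleaseShared, and only the call that moves it from 1 to 0 returns true and goes on to doReleaseShared to wake the waiting threads.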
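To see the two Sync hooks working together, here is a minimal sketch that plugs the same tryAcquireShared and tryReleaseShared logic into AbstractQueuedSynchronizer. MiniLatch and MiniLatchDemo are hypothetical names, and unlike the real countDown(), this version returns the result of releaseShared so the effect of each call is visible.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

final class MiniLatchDemo {

    /** Hypothetical latch mirroring the Sync hooks quoted above. */
    static final class MiniLatch {
        private static final class Sync extends AbstractQueuedSynchronizer {
            Sync(int count) { setState(count); }

            int count() { return getState(); }

            @Override
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1; // succeed only at zero
            }

            @Override
            protected boolean tryReleaseShared(int releases) {
                for (;;) { // retry until the CAS on state succeeds
                    int c = getState();
                    if (c == 0) return false; // already open, nothing to do
                    int nextc = c - 1;
                    if (compareAndSetState(c, nextc)) return nextc == 0;
                }
            }
        }

        private final Sync sync;

        MiniLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }

        void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }

        /** Returns true only for the call that moved the count to zero. */
        boolean countDown() {
            return sync.releaseShared(1);
        }

        int getCount() { return sync.count(); }
    }

    /** Calls countDown three times on a latch of two and records each result. */
    static boolean[] releaseResults() {
        MiniLatch latch = new MiniLatch(2);
        return new boolean[] { latch.countDown(), latch.countDown(), latch.countDown() };
    }

    public static void main(String[] args) {
        boolean[] r = releaseResults();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints false true false
    }
}
```

The first call only decrements (2 to 1), the second performs the 1 to 0 transition that triggers the wake-up path, and the third finds state already at 0 and returns without changing anything.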