CountDownLatch的原理

  • 2020 年 1 月 21 日
  • 筆記

上次大概說了CountDownLatch的使用,今天說下實現的原理,CountDownLatch的使用效果和Join差不多,實現起來也比較簡單。

大體的思路就是一個死循環阻塞,等到某個條件滿足後就跳出循環,繼續執行後面的程式碼。執行邏輯如下:

源碼分析


我們下面分析下CountDownLatch的源碼:

創建CountDownLatch對象

  public CountDownLatch(int count) {          if (count < 0) throw new IllegalArgumentException("count < 0");          this.sync = new Sync(count);      }

創建CountDownLatch 對象很簡單,就是創建一個Sync對象。 Sync的對應的程式碼

 Sync(int count) {       setState(count);     }    protected final void setState(int newState) {     state = newState;  }

這個可以確定給state設置了一個值。

await 方法 阻塞等待

 public void await() throws InterruptedException {          sync.acquireSharedInterruptibly(1);      }       public final void acquireSharedInterruptibly(int arg)          throws InterruptedException {      if (Thread.interrupted())          throw new InterruptedException();      if (tryAcquireShared(arg) < 0)          doAcquireSharedInterruptibly(arg);  }     protected int tryAcquireShared(int acquires) {              return (getState() == 0) ? 1 : -1;          }      private void doAcquireSharedInterruptibly(int arg)          throws InterruptedException {          final Node node = addWaiter(Node.SHARED);          boolean failed = true;          try {              for (;;) {                  final Node p = node.predecessor();                  if (p == head) {                      int r = tryAcquireShared(arg);                      if (r >= 0) {                          setHeadAndPropagate(node, r);                          p.next = null; // help GC                          failed = false;                          return;                      }                  }                  if (shouldParkAfterFailedAcquire(p, node) &&                      parkAndCheckInterrupt())                      throw new InterruptedException();              }          } finally {              if (failed)                  cancelAcquire(node);          }      }

doAcquireSharedInterruptibly 這個方法中有一個for(;;),這個是一個死循環, 直到tryAcquireShared 返回的r>=0.也就是state==0。

countDown 方法

   public void countDown() {          sync.releaseShared(1);      }    public final boolean releaseShared(int arg) {          if (tryReleaseShared(arg)) {              doReleaseShared();              return true;          }          return false;      }             protected boolean tryReleaseShared(int releases) {              // Decrement count; signal when transition to zero              for (;;) {                  int c = getState();                  if (c == 0)                      return false;                  int nextc = c-1;                  if (compareAndSetState(c, nextc))                      return nextc == 0;              }          }      private void doReleaseShared() {          /*           * Ensure that a release propagates, even if there are other           * in-progress acquires/releases.  This proceeds in the usual           * way of trying to unparkSuccessor of head if it needs           * signal. But if it does not, status is set to PROPAGATE to           * ensure that upon release, propagation continues.           * Additionally, we must loop in case a new node is added           * while we are doing this. Also, unlike other uses of           * unparkSuccessor, we need to know if CAS to reset status           * fails, if so rechecking.           */          for (;;) {              Node h = head;              if (h != null && h != tail) {                  int ws = h.waitStatus;                  if (ws == Node.SIGNAL) {                      if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                          continue;            // loop to recheck cases                      unparkSuccessor(h);                  }                  else if (ws == 0 &&                           !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                      continue;                // loop on failed CAS              }              if (h == head)                   // loop if head changed                  break;          }      }

compareAndSetWaitStatus(h, 0, Node.PROPAGATE)這個是用CAS的方法改變state的值。