How CountDownLatch Works
- January 21, 2020
- Notes
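Last time I gave a rough overview of how to use CountDownLatch; today I will cover how it is implemented. Using CountDownLatch has much the same effect as join, and the implementation is fairly simple.
The general idea is an endless loop that blocks the waiting thread: once a certain condition is met, the thread breaks out of the loop and continues with the code that follows. The source analysis below traces this logic.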
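The loop-until-the-condition-holds idea can be sketched without any JDK synchronizer. The class below is a hypothetical simplification (the name `LoopLatchSketch` and the 1 ms pause are assumptions for illustration), not the way the JDK implements CountDownLatch: the real class queues and parks waiters instead of polling.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Hypothetical teaching sketch: a latch that polls a counter in a loop.
public class LoopLatchSketch {
    private final AtomicInteger remaining;

    public LoopLatchSketch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.remaining = new AtomicInteger(count);
    }

    // Loop until the condition (counter == 0) holds, pausing between checks.
    public void await() throws InterruptedException {
        while (remaining.get() != 0) {
            if (Thread.interrupted()) throw new InterruptedException();
            LockSupport.parkNanos(1_000_000L); // pause about 1 ms, then re-check
        }
    }

    // Decrement the counter, but never below zero.
    public void countDown() {
        remaining.updateAndGet(c -> c == 0 ? 0 : c - 1);
    }

    public int getCount() {
        return remaining.get();
    }

    public static void main(String[] args) throws InterruptedException {
        LoopLatchSketch latch = new LoopLatchSketch(2);
        for (int i = 0; i < 2; i++) {
            new Thread(latch::countDown).start();
        }
        latch.await(); // returns once both threads have counted down
        System.out.println("remaining = " + latch.getCount()); // remaining = 0
    }
}
```

Polling keeps the sketch short, but it wakes the waiter repeatedly for no reason; the JDK version avoids that by parking the thread until a countDown call unparks it.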

Source code analysis
Let's walk through the CountDownLatch source code:
Creating a CountDownLatch object
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
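Creating a CountDownLatch object is simple: the constructor just creates a Sync object. The corresponding Sync code is shown here (setState is inherited from AbstractQueuedSynchronizer, the superclass of Sync):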
    Sync(int count) {
        setState(count);
    }

    protected final void setState(int newState) {
        state = newState;
    }
This confirms that the constructor stores the initial count in state.
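The effect of the constructor can be observed through the public API, since getCount() reports the current value of state. The class name `LatchStateDemo` and its helper method are hypothetical; only java.util.concurrent.CountDownLatch is the real JDK class.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo: the constructor argument becomes the latch's state.
public class LatchStateDemo {

    // Returns true if a negative count is rejected, as the constructor code shows.
    static boolean rejectsNegativeCount() {
        try {
            new CountDownLatch(-1);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(3);
        System.out.println(latch.getCount());       // 3
        latch.countDown();
        System.out.println(latch.getCount());       // 2
        System.out.println(rejectsNegativeCount()); // true
    }
}
```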
The await method: blocking wait
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
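doAcquireSharedInterruptibly contains a for (;;) loop, which has no exit condition of its own. The thread leaves it normally only when tryAcquireShared returns r >= 0, that is, when state == 0; the other way out is an InterruptedException. Between attempts the thread is parked by parkAndCheckInterrupt, so it does not spin while it waits.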
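The three outcomes of await described above can be checked against the real java.util.concurrent.CountDownLatch. The class name `AwaitDemo` and the 50 ms timeout are assumptions for illustration; the timed overload await(long, TimeUnit) is used so the blocking case can be observed without hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of how await reacts to state and to interruption.
public class AwaitDemo {

    // state == 0: tryAcquireShared returns 1, so await returns at once.
    static boolean returnsWhenZero() {
        CountDownLatch latch = new CountDownLatch(0);
        try {
            latch.await();
            return true;
        } catch (InterruptedException e) {
            return false;
        }
    }

    // state != 0: the timed await gives up and reports false.
    static boolean timesOutWhileNonZero() {
        CountDownLatch latch = new CountDownLatch(1);
        try {
            return !latch.await(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            return false;
        }
    }

    // Interrupt flag set on entry: the Thread.interrupted() check throws.
    static boolean throwsIfInterrupted() {
        CountDownLatch latch = new CountDownLatch(1);
        Thread.currentThread().interrupt();
        try {
            latch.await();
            return false;
        } catch (InterruptedException e) {
            return true; // the interrupt flag has been cleared by the check
        }
    }

    public static void main(String[] args) {
        System.out.println(returnsWhenZero());      // true
        System.out.println(timesOutWhileNonZero()); // true
        System.out.println(throwsIfInterrupted());  // true
    }
}
```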
The countDown method
    public void countDown() {
        sync.releaseShared(1);
    }

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }

    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
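compareAndSetWaitStatus(h, 0, Node.PROPAGATE) uses CAS to change the waitStatus of the head node from 0 to PROPAGATE; it does not touch state. The count stored in state is decremented by compareAndSetState(c, nextc) in tryReleaseShared, and only the call that moves state from 1 to 0 returns true and goes on to doReleaseShared, which wakes the waiting threads.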
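To see the pieces working together, here is a minimal sketch of a latch built directly on AbstractQueuedSynchronizer, with the same tryAcquireShared and tryReleaseShared logic as the source above. The class name `ToyLatch` is hypothetical and the sketch omits parts of the real class (for example toString), so treat it as an illustration rather than a replacement for CountDownLatch.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical minimal latch on top of AQS, mirroring the logic analysed above.
public class ToyLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count); // the count lives in the AQS state field
        }

        int count() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            // Succeed (>= 0) only once the count has reached zero.
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            // CAS loop on state: retry if another thread changed it first.
            for (;;) {
                int c = getState();
                if (c == 0) return false;      // already open, nothing to do
                int next = c - 1;
                if (compareAndSetState(c, next)) {
                    return next == 0;          // true only on the 1 -> 0 step
                }
            }
        }
    }

    private final Sync sync;

    public ToyLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public int getCount() {
        return sync.count();
    }

    public static void main(String[] args) throws InterruptedException {
        ToyLatch latch = new ToyLatch(3);
        for (int i = 0; i < 3; i++) {
            new Thread(latch::countDown).start();
        }
        latch.await(); // parked until the third countDown releases it
        System.out.println("count = " + latch.getCount()); // count = 0
    }
}
```

Queueing, parking, and waking the waiters are all inherited from AbstractQueuedSynchronizer; the subclass only decides when acquisition succeeds and when a release should wake waiters.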