CountDownLatch源码分析

CountDownLatch源码分析

CountDowntLatch的作用是让主线程等待所有的子线程执行完毕之后再进行执行,同时它是基于AQS进行实现的,所以它内部肯定是通过自定义AQS共享模式下的同步器来实现的,该同步器需要重写AQS提供的tryAcquireShared()以及tryReleaseShared()方法,告诉AQS是否尝试获取同步资源以及释放同步资源成功。

AQS子类需要定义以及维护同步状态的值,在CountDownLatch中,同步状态state的值为同步资源的个数。

CountDownLatch的结构

public class CountDownLatch {
    
    /**
     * 存在一个AQS共享模式下的同步器
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
       // ......
    }

    // 存在一个全局的同步器属性
    private final Sync sync;
    
    /**
     * 构建方法初始化同步器,并指定同步资源的个数
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    
    /**
     * 让主线程进行阻塞
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    
    /**
     * 让倒数器-1
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

可以看到CountDownLatch中定义了一个同步器,然后存在一个全局的同步器属性,然后通过构建方法来初始化同步器,并且指定同步器中同步资源的个数。

CountDownLatch的await()方法将会调用同步器的acquireSharedInterruptibly()方法,countDown()方法将会调用同步器的releaseShared()方法。

剖析同步器

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    /**
     * 构建方法初始化同步资源的个数
     */
    Sync(int count) {
        setState(count);
    }

    /**
     * 获取可用的同步资源个数(就是倒数器当前的值)
     */
    int getCount() {
        return getState();
    }

    /**
     * 尝试获取同步资源
     */
    protected int tryAcquireShared(int acquires) {
        // 只有当同步状态的值为0,方法才返回true
        return (getState() == 0) ? 1 : -1;
    }

    /**
     * 尝试释放同步资源
     */
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            // 让同步状态的值-1
            int nextc = c-1;
            // 只有当线程释放同步资源后,同步状态的值0时,该方法才会返回true
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

tryAcquireShared()方法用于尝试获取同步资源,正常情况下,如果线程获取失败则返回false,否则返回剩余的可用资源个数(state – 要获取的资源个数),但是在CountDownLatch同步器的tryAcquireShared()中,只有当等待状态的值为0时,方法才返回true,否则返回false。

tryReleaseShared()方法用于尝试释放同步资源,正常情况下,将同步状态的值累加,也就是恢复,然后返回true,但是在CountDownLatch同步器的tryReleaseShared()方法中,并没有累加同步状态的值,而是当线程每次释放后,将同步状态的值-1,只有当线程释放同步状态后,state的值为0,该方法才会返回true。

流程总结

1.首先创建一个CountDownLatch实例,并指定倒数器的阈值。

2.主线程调用CountDownLatch的await()方法进行阻塞,该方法调用同步器的acquireSharedInterruptibly()方法,该方法内部会调用tryAcquireShared()方法,尝试获取同步资源,但是在tryAcquireShared()方法中,只有当同步状态的值为0时,方法才会返回true,由于目前同步状态的值不为0,因此方法返回false,因此该线程将会封装成Node节点,然后加入到等待队列当中,该线程将会进行阻塞。

3.子线程调用CountDownLatch的countDown()方法让倒数器-1,该方法调用同步器releaseShared()方法,该方法内部将会调用tryReleaseShared()方法,尝试释放同步资源,但是在tryReleaseShared()方法中,会将同步状态的值-1,同时只有当线程释放同步资源后,同步状态的值为0时,该方法才会返回true,否则返回false,如果tryReleaseShared()方法返回false,那么就不做任何处理,只有当该方法返回true,也就是所有的子线程都执行了countDown()方法,将同步状态的值设置为0,当该方法返回true时,那么就会唤醒离头节点最近的同时等待状态不为CANCELLED的后继节点,也就是主线程,然后主线程尝试获取同步资源,由于当前同步状态的值已经为0,因此tryAcquireShared()方法返回true,然后主线程直接返回,做自己的事情。

Tags: