CountDownLatch源码阅读
简介
CountDownLatch
是JUC提供的一个线程同步工具,主要功能就是协调多个线程之间的同步,或者说实现线程之间的通信
CountDown,数数字,只能往下数。Latch,门闩。光看名字就能明白这个CountDownLatch
是如何使用的了哈哈。CountDownLatch
就相当于一个计数器,计数器的初值通过构造方法的参数来设置。调用CountDownLatch
实例的await
方法的线程,会等待计数器变为0才会被唤醒,继续向下执行。那么计数器如何变为0呢?
如果其他线程调用该CountDownLatch
实例的countDown
方法,会将计数值减1。当减为0时,会让那些因调用await
方法而阻塞等待的线程继续执行。这样就实现了这些线程之间的同步功能
版权:本文版权归作者和博客园共有
转载:欢迎转载,但未经作者同意,必须保留此段声明;必须在文章中给出原文连接;否则必究法律责任
使用场景
直接介绍或许太抽象,对于初学者来说很难理解,最好的方式就是通过一个实际场景来引入
有一种通用的场景:主线程开启多个子线程去并行执行多个子任务,等待所有子线程执行完毕,主线程收集子线程的执行结果并统计
场景示例
比如,主线程等A和B给它转账,等收齐所有钱再一并放入银行赚利息。示例代码如下:
public class TestCountDownLatch {
// 这里必须是原子类,要保证对money的修改是原子性操作,才能保证线程安全
// 仅把money设置为volatile int是不行的哦
private static final AtomicInteger money = new AtomicInteger(0);
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(2);
Thread threadA = new Thread(() -> {
try {
System.out.println("A转账30元给我");
Thread.sleep(1000);
// 线程A和线程B对money必须要使用CAS修改,否则可能会出错
int origin = money.get();
while (!money.compareAndSet(origin, origin + 30)) {
origin = money.get();
continue;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
System.out.println("A转账完成");
});
Thread threadB = new Thread(() -> {
try {
System.out.println("B转账70元给我");
Thread.sleep(1000);
int origin = money.get();
while (!money.compareAndSet(origin, origin + 30)) {
origin = money.get();
continue;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
System.out.println("B转账完成");
});
System.out.println("等A和B转账给我...");
threadA.start();
threadB.start();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// System.out.println(latch.getCount());
System.out.println("转账完成,将钱存入银行...");
// 这里就不用CAS修改money,因为只有main线程对money做修改
int origin = money.get();
money.set(origin * 2);
System.out.println("将钱去除,本金加利润一共" + money.get() + "元");
}
}
命令行输出如下:
点击查看代码
等A和B转账给我...
A转账30元给我
B转账70元给我
A转账完成
B转账完成
转账完成,将钱存入银行...
将钱去除,本金加利润一共120元
Process finished with exit code 0
得益于CountDownLatch
的同步功能,当上述代码执行结束时,money
的值必定是120,而不会是0、60或140。因为主线程会一致await
直到线程A和线程B都执行完latch.countDown()
才会继续往下执行
实现原理
CountDownLatch
的实现原理其实就是AbstractQueuedSynchronizer
(AQS)
CountDownLatch
有一个内部类Sync
,它实现了AQS类定义的部分钩子方法,CountDownLatch
通过Sync
类实例sync
实现了所有功能,调用CountDownLatch
的方法都会委托给sync
域来执行
// 所有CountDown的功能都是委托给这个Sync类对象来完成
private final Sync sync;
因此,要搞懂CountDownLatch
,必须搞懂AQS以及Sync
类。接下来就跟我一起来剖析一下源码,看看这个Sync
到底干了些啥
版权:本文版权归作者和博客园共有
转载:欢迎转载,但未经作者同意,必须保留此段声明;必须在文章中给出原文连接;否则必究法律责任
源码剖析
构造方法
CountDownLatch
的构造函数中可以传入count
参数,表明必须调用count
次countDown
方法,才能让调用await
的线程继续向下执行。其源码如下:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
实际上是初始化了一个Sync
类对象,并注入到CountDownLatch
的sync
域中。Sync
构造方法如下:
Sync(int count) {
setState(count);
}
Sync
构造方法实际上就是设置了AQS中的state
,将其设置为初始计数值为count
重要结论:AQS的state
就表示CountDownLatch
当前的计数值
await
await
方法是一个实例方法,调用它的线程一直阻塞等待,直到CountDownLatch
对象的计数值降为0,才能被唤醒。如果调用await
时计数值就已经是0,就不会被阻塞
await
方法是响应中断的:
- 如果一个线程在调用
await
方法之前就已经被中断,那么调用时会直接抛出中断异常 - 如果一个线程调用
await
方法阻塞等待过程中,收到中断信号,就会抛出中断异常
说了这么多,还是来看看await
的源码吧:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
可以看到,这个方法实际上是委托给了Sync
类对象sync
来执行,这里的acquireSharedInterruptibly
已经由Sync
类的父类AQS提供了实现,源码如下:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
这段代码在 全网最详细的AbstractQueuedSynchronizer(AQS)源码剖析 系列中有详细介绍,不过那里并没有涉及到具体的应用类(如CountDownLatch
这种),只是高屋建瓴地分析过,这里正好借助CountDownLatch
来更好地理解它
从acquireSharedInterruptibly
源码中可以看到,如果线程在调用await
之前就已经被设置了中断状态,那么会直接抛出InterruptedException
异常
该方法接下来会调用钩子方法tryAcquireShared
,Sync
类为该方法提供了具体实现,源码如下:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
AQS虽然没有为tryAcquireShared
提供具体实现,但是规定了返回值的含义:
- 负数:表明获取失败,该线程需要被加入同步队列阻塞等待
- 0:表明获取共享资源成功,但是后续获取共享资源一定不会成功
- 正数:表明获取共享资源成功,而且后续的获取也可能成功
让我们来分析一下Sync
类实现的tryAcquireShared
方法:
- 如果
state
不为0,即CountDownLatch
对象的计数值还没减到0,则返回-1,会继续执行doAcquireSharedInterruptibly
方法,将调用await
的线程加入同步队列阻塞等待 - 如果
state
为0,即CountDownLatch
对象的计数值已经减为0,则返回1,调用await
会直接返回,不会被阻塞
注:doAcquireSharedInterruptibly
的具体分析见 全网最详细的AbstractQueuedSynchronizer(AQS)源码剖析 系列
countDown
countDown
方法也是实例方法,调用它会将CountDownLatch
对象的计数值减1。如果正好减为0,那么会将所有因调用await
而被阻塞的线程都唤醒。其源码如下:
public void countDown() {
sync.releaseShared(1);
}
该方法实际上委托给sync
来执行,这里的releaseShared
方法在Sync
类的父类AQS中提供了具体实现,其源码如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
该方法首先会调用钩子方法tryReleaseShared
,该方法在Sync
类中提供了具体实现,源码如下:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
AQS虽然没有为tryReleaseShared
提供具体实现,但是规定了返回值的含义:
- true:此次释放资源的行为可能会让一个阻塞等待中的线程被唤醒
- false:otherwise
让我们来分析一下Sync
类实现的tryReleaseShared
方法:
该方法所有代码都包含在一个for
循环中,这是为了应对CAS失败的情况。循环体内CAS修改state
,即将CountDownLatch
的计数值减1
如果CountDownLatch
的计数值减1后变成0,则返回true。那么releaseShared
方法会继续调用doReleaseShared
方法,唤醒同步队列中的后续线程
如果不为0,则返回false,无事发生~
注:doReleaseShared
方法的作用是唤醒队首线程,并确保状态传播,该方法的详细解释见 全网最详细的AbstractQueuedSynchronizer(AQS)源码剖析 系列
getCount
getCount
方法就是返回CountDownLatch
对象当前的计数值,源码如下:
public long getCount() {
return sync.getCount();
}
实际上委托给了sync
对象的getCount
方法来执行,其源码如下:
int getCount() {
return getState();
}
其实就是调用AQS的getState
方法,返回当前的state
,即CountDownLatch
的计数值,很简单哦~
版权:本文版权归作者和博客园共有
转载:欢迎转载,但未经作者同意,必须保留此段声明;必须在文章中给出原文连接;否则必究法律责任
CountDownLatch与join方法的区别
当然,在上述场景中,也可以使用Thread
的对象方法join
来实现这一点,在主线程中调用所有子线程的join
方法,再执行结果收集和统计任务。上面例子如果使用join
方法来实现,代码如下:
public class TestJoin {
private static final AtomicInteger money = new AtomicInteger(0);
public static void main(String[] args) {
Thread threadA = new Thread(() -> {
try {
System.out.println("A转账30元给我");
Thread.sleep(1000);
int origin = money.get();
while (!money.compareAndSet(origin, origin + 30)) {
origin = money.get();
continue;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("A转账完成");
});
Thread threadB = new Thread(() -> {
try {
System.out.println("B转账70元给我");
Thread.sleep(1000);
int origin = money.get();
while (!money.compareAndSet(origin, origin + 30)) {
origin = money.get();
continue;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("B转账完成");
});
System.out.println("等A和B转账给我...");
threadA.start();
threadB.start();
try {
threadA.join();
threadB.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("转账完成,将钱存入银行...");
int origin = money.get();
money.set(origin * 2);
System.out.println("将钱去除,本金加利润一共" + money.get() + "元");
}
}
命令行输出如下:
点击查看代码
等A和B转账给我...
A转账30元给我
B转账70元给我
B转账完成
A转账完成
转账完成,将钱存入银行...
将钱去除,本金加利润一共120元
Process finished with exit code 0
但是,和CountDownLatch
借助AQS不同,join
方法的执行原理是:不停地检查调用线程是否执行完毕。如果没有,则让当前线程wait。否则才会调用notifyAll
将当前线程唤醒
从执行原理上就可以看出它们的区别主要在于两点:
join
方法没有CountDownLatch
灵活:使用join
方法必须等待调用线程执行完毕,后面就不能再继续执行了。而CountDownLatch
的countDown
方法可以放在调用线程的run
方法中间,这样调用线程不必执行结束,就能唤醒其他await
的线程- 调用
join
方法的线程会一直消耗CPU资源,不会阻塞挂起,即“忙等”,而调用了CountDownLatch
的await
方法的线程会被阻塞挂起,让出CPU执行权,只有等条件合适并被线程调度后才能占用CPU资源