jdk线程池ThreadPoolExecutor工作原理解析(自己动手实现线程池)(一)
jdk线程池ThreadPoolExecutor工作原理解析(自己动手实现线程池)(一)
线程池介绍
在日常开发中经常会遇到需要使用其它线程将大量任务异步处理的场景(异步化以及提升系统的吞吐量),而在使用线程的过程中却存在着两个痛点。
- 在java等很多主流语言中每个逻辑上的线程底层都对应着一个系统线程(不考虑虚拟线程的情况)。操作系统创建一个新线程是存在一定开销的,
在需要执行大量的异步任务时,如果处理每个任务时都直接向系统申请创建一个线程来执行,并在任务执行完毕后再回收线程,则创建/销毁大量线程的开销将无法忍受。 - 每个系统线程都会占用一定的内存空间,且系统在调度不同线程上下文切换时存在一定的cpu开销。因此在一定的硬件条件下,操作系统能同时维护的系统线程个数相对而言是比较有限的。
在使用线程的过程中如果没有控制好流量,会很容易创建过多的线程而耗尽系统资源,令系统变得不可用。
而线程池正是为解决上述痛点而生的,其通过两个手段来解决上述痛点。
池化线程资源
池化线程资源,顾名思义就是维护一个存活线程的集合(池子)。提交任务的用户程序不直接控制线程的创建和销毁,不用每次执行任务时都申请创建一个新线程,而是通过线程池间接的获得线程去处理异步任务。
线程池中的线程在执行完任务后通常也不会被系统回收掉,而是继续待在池子中用于执行其它的任务(执行堆积的待执行任务或是等待新任务)。
线程池通过池化线程资源,避免了系统反复创建/销毁线程的开销,大幅提高了处理大规模异步任务时的性能。
对线程资源的申请进行收口,限制系统资源的使用
如果程序都统一使用线程池来处理异步任务,则线程池内部便可以对系统资源的使用施加一定限制。
例如用户可以指定一个线程池最大可维护的线程数量,以避免耗尽系统资源。
当用户提交任务的速率过大,导致线程池中的线程数到达指定的最大值时依然无法满足需求时,线程池可以通过丢弃部分任务或限制提交任务的流量的方式来处理这一问题。
线程池通过对线程资源的使用进行统一收口,用户可以通过设置线程池的参数来控制系统资源的使用,从而避免系统资源耗尽。
jdk线程池ThreadPoolExecutor简单介绍
前面介绍了线程池的概念,而要深入理解线程池的工作原理最好的办法便是找到一个优秀的线程池实现来加以研究。
而自jdk1.5中引入的通用线程池框架ThreadPoolExecutor便是一个很好的学习对象。其内部实现不算复杂,却在高效实现核心功能的同时还提供了较丰富的拓展能力。
下面从整体上介绍一下jdk通用线程池ThreadPoolExecutor的工作原理(基于jdk8)。
ThreadPoolExecutor运行时工作流程
首先ThreadPoolExecutor允许用户从两个不同维度来控制线程资源的使用,即最大核心线程数(corePoolSize)和最大线程数(maximumPoolSize)。
最大核心线程数:核心线程指的是通常常驻线程池的线程。常驻线程在线程池没有任务空闲时也不会被销毁,而是处于idle状态,这样在新任务到来时就能很快的进行响应。
最大线程数:和第一节中提到的一样,即线程池中所能允许的活跃线程的最大数量。
在向ThreadPoolExecutor提交任务时(execute方法),会执行一系列的判断来决定任务应该如何被执行(源码在下一节中具体分析)。
- 首先判断当前活跃的线程数是否小于指定的最大核心线程数corePoolSize。
如果为真,则说明当前线程池还未完成预热,核心线程数不饱和,创建一个新线程来执行该任务。
如果为假,则说明当前线程池已完成预热,进行下一步判断。 - 尝试将当前任务放入工作队列workQueue(阻塞队列BlockingQueue),工作队列中的任务会被线程池中的活跃线程按入队顺序逐个消费。
如果入队成功,则说明当前工作队列未满,入队的任务将会被线程池中的某个活跃线程所消费并执行。
如果入队失败,则说明当前工作队列已饱和,线程池消费任务的速度可能太慢了,可能需要创建更多新线程来加速消费,进行下一步判断。 - 判断当前活跃的线程数是否小于指定的最大线程数maximumPoolSize。
如果为真,则说明当前线程池所承载的线程数还未达到参数指定的上限,还有余量来创建新的线程加速消费,创建一个新线程来执行该任务。
如果为假,则说明当前线程池所承载的线程数达到了上限,但处理任务的速度依然不够快,需要触发拒绝策略。
ThreadPoolExecutor优雅停止
线程池的优雅停止一般要能做到以下几点:
- 线程池在中止后不能再受理新的任务
- 线程池中止的过程中,已经提交的现存任务不能丢失(等待剩余任务执行完再关闭或者能够把剩余的任务吐出来还给用户)
- 线程池最终关闭前,确保创建的所有工作线程都已退出,不会出现资源的泄露
线程池自启动后便会有大量的工作线程在内部持续不断并发的执行提交的各种任务,而要想做到优雅停止并不是一件容易的事情。
因此ThreadPoolExecutor中最复杂、细节最多的部分并不在于上文中的正常工作流程,而在于分散在各个地方但又紧密协作的,控制优雅停止的逻辑。
ThreadPoolExecutor的其它功能
除了正常的工作流程以及优雅停止的功能外,ThreadPoolExecutor还提供了一些比较好用的功能
- 提供了很多protected修饰的钩子函数,便于用户继承并实现自己的线程池时进行一定的拓展
- 在运行时统计了总共执行的任务数等关键指标,并提供了对应的api便于用户在运行时观察运行状态
- 允许在线程池运行过程中动态修改关键的配置参数(比如corePoolSize等),并实时的生效。
jdk线程池ThreadPoolExecutor源码解析(自己动手实现线程池v1版本)
如费曼所说:What I can not create I do not understand(我不能理解我创造不了的东西)。
通过模仿jdk的ThreadPoolExecutor实现,从零开始实现一个线程池,可以迫使自己去仔细的捋清楚jdk线程池中设计的各种细节,加深理解而达到更好的学习效果。
前面提到ThreadPoolExecutor的核心逻辑主要分为两部分,一是正常运行时处理提交的任务的逻辑,二是实现优雅停止的逻辑。
因此我们实现的线程池MyThreadPoolExecutor(以My开头用于区分)也会分为两个版本,v1版本只实现前一部分即正常运行时执行任务的逻辑,将有关线程池优雅停止的逻辑全部去除。
相比直接啃jdk最终实现的源码,v1版本的实现会更简单更易理解,让正常执行任务时的逻辑更加清晰而不会耦合太多关于优雅停止的逻辑。
线程池关键成员变量介绍
ThreadPoolExecutor中有许多的成员变量,大致可以分为三类。
可由用户自定义的、用于控制线程池运行的配置参数
- volatile int corePoolSize(最大核心线程数量)
- volatile int maximumPoolSize(最大线程数量)
- volatile long keepAliveTime(idle线程保活时间)
- final BlockingQueue workQueue(工作队列(阻塞队列))
- volatile ThreadFactory threadFactory(工作线程工厂)
- volatile RejectedExecutionHandler handler(拒绝异常处理器)
- volatile boolean allowCoreThreadTimeOut(是否允许核心线程在idle超时后退出)
其中前6个配置参数都可以在ThreadPoolExecutor的构造函数中指定,而allowCoreThreadTimeOut则可以通过暴露的public方法allowCoreThreadTimeOut来动态的设置。
其中大部分属性都是volatile修饰的,目的是让运行过程中可以用过提供的public方法动态修改这些值后,线程池中的工作线程或提交任务的用户线程能及时的感知到变化(线程间的可见性),并进行响应(比如令核心线程自动的idle退出)
这些配置属性具体如何控制线程池行为的原理都会在下面的源码解析中展开介绍。理解这些参数的工作原理后才能在实际的业务中使用线程池时为其设置合适的值。
仅供线程池内部工作时使用的属性
- ReentrantLock mainLock(用于控制各种临界区逻辑的并发)
- HashSet
workers(当前活跃工作线程Worker的集合,工作线程的工作原理会在下文介绍) - AtomicInteger ctl(线程池控制状态,control的简写)
这里重点介绍一下ctl属性。ctl虽然是一个32位的整型字段(AtomicInteger),但实际上却用于标识两个业务属性,即当前线程池的运行状态和worker线程的总数量。
在线程池初始化时状态位RUNNING,worker线程数量位0(private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));)。
ctl的32位中的高3位用于标识线程池当前的状态,剩余的29位用于标识线程池中worker线程的数量(因此理论上ThreadPoolExecutor最大可容纳的线程数并不是231-1(32位中符号要占一位),而是229-1)
由于聚合之后单独的读写某一个属性不是很方便,所以ThreadPoolExecutor中提供了很多基于位运算的辅助函数来简化这些逻辑。
ctl这样聚合的设计比起拆分成两个独立的字段有什么好处?
在ThreadPoolExecutor中关于优雅停止的逻辑中有很多地方是需要同时判断当前工作线程数量与线程池状态后,再对线程池状态工作线程数量进行更新的(具体逻辑在下一篇v2版本的博客中展开)。
且为了执行效率,不使用互斥锁而是通过cas重试的方法来解决并发更新的问题。而对一个AtomicInteger属性做cas重试的更新,要比同时控制两个属性进行cas的更新要简单很多,执行效率也高很多。
ThreadPoolExecutor共有五种状态,但有四种都和优雅停止有关(除了RUNNING)。
但由于v1版本的MyThreadPoolExecutorV1不支持优雅停止,所以不在本篇博客中讲解这些状态具体的含义以及其是如何变化的(下一篇v2版本的博客中展开)
记录线程池运行过程中的一些关键指标
- completedTaskCount(线程池自启动后已完成的总任务数)
- largestPoolSize(线程池自启动后工作线程个数的最大值)
在运行过程中,ThreadPoolExecutor会在对应的地方进行埋点,统计一些指标并提供相应的api给用户实时的查询,以提高线程池工作时的可观测性。
public class MyThreadPoolExecutorV1 implements MyThreadPoolExecutor{
/**
* 指定的最大核心线程数量
* */
private volatile int corePoolSize;
/**
* 指定的最大线程数量
* */
private volatile int maximumPoolSize;
/**
* 线程保活时间(单位:纳秒 nanos)
* */
private volatile long keepAliveTime;
/**
* 存放任务的工作队列(阻塞队列)
* */
private final BlockingQueue<Runnable> workQueue;
/**
* 线程工厂
* */
private volatile ThreadFactory threadFactory;
/**
* 拒绝策略
* */
private volatile MyRejectedExecutionHandler handler;
/**
* 是否允许核心线程在idle一定时间后被销毁(和非核心线程一样)
* */
private volatile boolean allowCoreThreadTimeOut;
/**
* 主控锁
* */
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 当前线程池已完成的任务数量
* */
private long completedTaskCount;
/**
* 维护当前存活的worker线程集合
* */
private final HashSet<MyWorker> workers = new HashSet<>();
/**
* 当前线程池中存在的worker线程数量 + 状态的一个聚合(通过一个原子int进行cas,来避免对两个业务属性字段加锁来保证一致性)
* v1版本只关心前者,即worker线程数量
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 32位的有符号整数,有3位是用来存放线程池状态的,所以用来维护当前工作线程个数的部分就只能用29位了
* 被占去的3位中,有1位原来的符号位,2位是原来的数值位。
* */
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 线程池状态poolStatus常量(状态值只会由小到大,单调递增)
* 线程池状态迁移图:
* ↗ SHUTDOWN ↘
* RUNNING ↓ TIDYING → TERMINATED
* ↘ STOP ↗
* 1 RUNNING状态,代表着线程池处于正常运行的状态。能正常的接收并处理提交的任务
* 线程池对象初始化时,状态为RUNNING
* 对应逻辑:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
*
* 2 SHUTDOWN状态,代表线程池处于停止对外服务的状态。不再接收新提交的任务,但依然会将workQueue工作队列中积压的任务处理完
* 调用了shutdown方法时,状态由RUNNING -> SHUTDOWN
* 对应逻辑:shutdown方法中的advanceRunState(SHUTDOWN);
*
* 3 STOP状态,代表线程池处于停止状态。不再接受新提交的任务,同时也不再处理workQueue工作队列中积压的任务,当前还在处理任务的工作线程将收到interrupt中断通知
* 之前未调用shutdown方法,直接调用了shutdownNow方法,状态由RUNNING -> STOP
* 之前先调用了shutdown方法,后调用了shutdownNow方法,状态由SHUTDOWN -> STOP
* 对应逻辑:shutdownNow方法中的advanceRunState(STOP);
*
* 4 TIDYING状态,代表着线程池即将完全终止,正在做最后的收尾工作
* 当前线程池状态为SHUTDOWN,任务被消费完工作队列workQueue为空,且工作线程全部退出完成工作线程集合workers为空时,tryTerminate方法中将状态由SHUTDOWN->TIDYING
* 当前线程池状态为STOP,工作线程全部退出完成工作线程集合workers为空时,tryTerminate方法中将状态由STOP->TIDYING
* 对应逻辑:tryTerminate方法中的ctl.compareAndSet(c, ctlOf(TIDYING, 0)
*
* 5 TERMINATED状态,代表着线程池完全的关闭。之前线程池已经处于TIDYING状态,且调用的钩子函数terminated已返回
* 当前线程池状态为TIDYING,调用的钩子函数terminated已返回
* 对应逻辑:tryTerminate方法中的ctl.set(ctlOf(TERMINATED, 0));
* */
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/**
* 跟踪线程池曾经有过的最大线程数量(只能在mainLock的并发保护下更新)
*/
private int largestPoolSize;
private boolean compareAndIncrementWorkerCount(int expect) {
return this.ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
do {
// cas更新,workerCount自减1
} while (!compareAndDecrementWorkerCount(ctl.get()));
}
public MyThreadPoolExecutorV1(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
MyRejectedExecutionHandler handler) {
// 基本的参数校验
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) {
throw new IllegalArgumentException();
}
if (unit == null || workQueue == null || threadFactory == null || handler == null) {
throw new NullPointerException();
}
// 设置成员变量
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
public ThreadFactory getThreadFactory() {
return threadFactory;
}
}
Worker工作线程
ThreadPoolExecutor中的工作线程并不是裸的Thread,而是被封装在了一个Worker的内部类中。
Worker实现了Runnable所以可以作为一个普通的线程来启动,在run方法中只是简单的调用了一下runWorker(runWorker后面再展开)。
Worker类有三个成员属性:
- Thread thread(被封装的工作线程对象)
- Runnable firstTask(提交任务时,创建新Worker对象时指定的第一次要执行的任务(后续线程就会去拉取工作队列里的任务执行了))
- volatile long completedTasks(统计用,计算当前工作线程总共完成了多少个任务)
Worker内封装的实际的工作线程对象thread,其在构造函数中由线程池的线程工厂threadFactory生成,传入this,所以thread在start后,便会调用run方法进而执行runWorker。
线程工厂可以由用户在创建线程池时通过参数指定,因此用户在自由控制所生成的工作线程的同时,也需要保证newThread能正确的返回一个可用的线程对象。
除此之外,Worker对象还继承了AbstractQueuedSynchronizer(AQS)类,简单的实现了一个不可重入的互斥锁。
对AQS互斥模式不太了解的读者可以参考一下我之前关于AQS互斥模式的博客:AQS互斥模式与ReentrantLock可重入锁原理解析
AQS中维护了一个volatile修饰的int类型的成员变量state,其具体的含义可以由使用者自己定义。
在Worker中,state的值有三种状态:
- state=-1,标识工作线程还未启动(不会被interruptIfStarted打断)
- state=0,标识工作线程已经启动,但没有开始处理任务(可能是在等待任务,idle状态)
- state=1,标识worker线程正在执行任务(runWorker方法中,成功获得任务后,通过lock方法将state设置为1)
具体这三种情况分别在什么时候出现会在下面解析提交任务源码的那部分里详细介绍。
/**
* jdk的实现中令Worker继承AbstractQueuedSynchronizer并实现了一个不可重入的锁
* AQS中的state属性含义
* -1:标识工作线程还未启动
* 0:标识工作线程已经启动,但没有开始处理任务(可能是在等待任务,idle状态)
* 1:标识worker线程正在执行任务(runWorker中,成功获得任务后,通过lock方法将state设置为1)
* */
private final class MyWorker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
public MyWorker(Runnable firstTask) {
this.firstTask = firstTask;
// newThread可能是null
this.thread = getThreadFactory().newThread(this);
}
@Override
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock(){
acquire(1);
}
public boolean tryLock(){
return tryAcquire(1);
}
public void unlock(){
release(1);
}
public boolean isLocked(){
return isHeldExclusively();
}
void interruptIfStarted() {
Thread t;
// 三个条件同时满足,才去中断Worker对应的thread
// getState() >= 0,用于过滤还未执行runWorker的,刚入队初始化的Worker
// thread != null,用于过滤掉构造方法中ThreadFactory.newThread返回null的Worker
// !t.isInterrupted(),用于过滤掉那些已经被其它方式中断的Worker线程(比如用户自己去触发中断,提前终止线程池中的任务)
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
execute执行提交的任务
下面介绍本篇博客的重点,即线程池是如何执行用户所提交的任务的。
用户提交任务的入口是public的execute方法,Runnable类型的参数command就是提交的要执行的任务。
MyThreadPoolExecutorV1的execute方法(相比jdk的实现v1版本去掉了关于优雅停止的逻辑)
/**
* 提交任务,并执行
* */
public void execute(Runnable command) {
if (command == null){
throw new NullPointerException("command参数不能为空");
}
int currentCtl = this.ctl.get();
if (workerCountOf(currentCtl) < this.corePoolSize) {
// 如果当前存在的worker线程数量低于指定的核心线程数量,则创建新的核心线程
boolean addCoreWorkerSuccess = addWorker(command,true);
if(addCoreWorkerSuccess){
// addWorker添加成功,直接返回即可
return;
}
}
// 走到这里有两种情况
// 1 因为核心线程超过限制(workerCountOf(currentCtl) < corePoolSize == false),需要尝试尝试将任务放入阻塞队列
// 2 addWorker返回false,创建核心工作线程失败
if(this.workQueue.offer(command)){
// workQueue.offer入队成功
if(workerCountOf(currentCtl) == 0){
// 在corePoolSize为0的情况下,当前不存在存活的核心线程
// 一个任务在入队之后,如果当前线程池中一个线程都没有,则需要兜底的创建一个非核心线程来处理入队的任务
// 因此firstTask为null,目的是先让任务先入队后创建线程去拉取任务并执行
addWorker(null,false);
}else{
// 加入队列成功,且当前存在worker线程,成功返回
return;
}
}else{
// 阻塞队列已满,尝试创建一个新的非核心线程处理
boolean addNonCoreWorkerSuccess = addWorker(command,false);
if(!addNonCoreWorkerSuccess){
// 创建非核心线程失败,执行拒绝策略(失败的原因和前面创建核心线程addWorker的原因类似)
reject(command);
}else{
// 创建非核心线程成功,成功返回
return;
}
}
}
/**
* 根据指定的拒绝处理器,执行拒绝策略
* */
private void reject(Runnable command) {
this.handler.rejectedExecution(command, this);
}
可以看到,execute方法源码中对于任务处理的逻辑很清晰,也能与ThreadPoolExecutor运行时工作流程中所介绍的流程所匹配。
addWorker方法(创建新的工作线程)
在execute方法中当需要创建核心线程或普通线程时,便需要通过addWorker方法尝试创建一个新的工作线程。
/**
* 向线程池中加入worker
* */
private boolean addWorker(Runnable firstTask, boolean core) {
// retry标识外层循环
retry:
for (;;) {
int currentCtl = ctl.get();
// 用于cas更新workerCount的内层循环(注意这里面与jdk的写法不同,改写成了逻辑一致但更可读的形式)
for (;;) {
// 判断当前worker数量是否超过了限制
int workerCount = workerCountOf(currentCtl);
if (workerCount >= CAPACITY) {
// 当前worker数量超过了设计上允许的最大限制
return false;
}
if (core) {
// 创建的是核心线程,判断当前线程数是否已经超过了指定的核心线程数
if (workerCount >= this.corePoolSize) {
// 超过了核心线程数,创建核心worker线程失败
return false;
}
} else {
// 创建的是非核心线程,判断当前线程数是否已经超过了指定的最大线程数
if (workerCount >= this.maximumPoolSize) {
// 超过了最大线程数,创建非核心worker线程失败
return false;
}
}
// cas更新workerCount的值
boolean casSuccess = compareAndIncrementWorkerCount(currentCtl);
if (casSuccess) {
// cas成功,跳出外层循环
break retry;
}
// compareAndIncrementWorkerCount方法cas争抢失败,重新执行内层循环
}
}
boolean workerStarted = false;
MyWorker newWorker = null;
try {
// 创建一个新的worker
newWorker = new MyWorker(firstTask);
final Thread myWorkerThread = newWorker.thread;
if (myWorkerThread != null) {
// MyWorker初始化时内部线程创建成功
// 加锁,防止并发更新
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (myWorkerThread.isAlive()) {
// 预检查线程的状态,刚初始化的worker线程必须是未唤醒的状态
throw new IllegalThreadStateException();
}
// 加入worker集合
this.workers.add(newWorker);
int workerSize = workers.size();
if (workerSize > largestPoolSize) {
// 如果当前worker个数超过了之前记录的最大存活线程数,将其更新
largestPoolSize = workerSize;
}
// 创建成功
} finally {
// 无论是否发生异常,都先将主控锁解锁
mainLock.unlock();
}
// 加入成功,启动worker线程
myWorkerThread.start();
// 标识为worker线程启动成功,并作为返回值返回
workerStarted = true;
}
}finally {
if (!workerStarted) {
addWorkerFailed(newWorker);
}
}
return workerStarted;
}
addWorker可以分为两部分:判断当前是否满足创建新工作线程的条件、创建并启动新的Worker工作线程。
判断当前是否满足创建新工作线程的条件
入口处开始的retry标识的for循环部分,便是用于判断是否满足创建新工作线程的条件。
- 首先判断当前工作线程数量是否超过了理论的最大值CAPACITY(即2^29-1),超过了则不能创建,返回false,不创建新工作线程
- 根据boolean类型参数core判断是否创建核心工作线程,core=true则判断是否超过了corePoolSize的限制,core=false则判断是否超过了maximumPoolSize的限制。不满足则返回false,不创建新工作线程
- 满足上述限制条件后,则说明可以创建新线程了,compareAndIncrementWorkerCount方法进行cas的增加当前工作线程数。
如果cas失败,则说明存在并发的更新了,则再一次的循环重试,并再次的进行上述检查。
需要注意的是:这里面有两个for循环的原因在于v1版本省略了优雅停止的逻辑(所以实际上v1版本能去掉内层循环的)。如果线程池处于停止状态则不能再创建新工作线程了,因此也需要判断线程池当前的状态,
不满足条件则也需要返回false,不创建工作线程。
而且compareAndIncrementWorkerCount中cas更新ctl时,如果并发的线程池被停止而导致线程池状态发生了变化,也会导致cas失败重新检查。
这也是jdk的实现中为什么把线程池状态和工作线程数量绑定在一起的原因之一,这样在cas更新时可以原子性的同时检查两个字段的并发争抢。(更具体的细节会在下一篇博客的v2版本中介绍)
创建并启动新的Worker工作线程
在通过retry那部分的层层条件检查后,紧接着便是实际创建新工作线程的逻辑。
- 首先通过Worker的构造方法创建一个新的Worker对象,并将用户提交的任务作为firstTask参数传入。
- 判断Worker在构造时线程工厂是否正确的生成了一个Thread(判空),如果thread == null的话直接返回false,标识创建新工作线程失败。
- 在mainLock的保护下,将新创建的worker线程加入workers集合中
- 启动Worker中的线程(myWorkerThread.start()),启动后会执行Worker类中的run方法,新的工作线程会执行runWorker方法(下文会展开分析runWorker)
- 如果Worker中的线程不是alive状态等原因导致工作线程启动失败,则在finally中通过addWorkerFailed进行一系列的回滚操作
虽然在前面线程池工作流程的分析中提到了核心线程与非核心线程的概念,但Worker类中实际上并没有核心/非核心的标识。
经过了工作线程启动前的条件判断后,新创建的工作线程实际上并没有真正的核心与非核心的差别。
addWorkerFailed(addWorker的逆向回滚操作)
addWorker中工作线程可能会启动失败,所以要对addWorker中对workers集合以及workerCount等数据的操作进行回滚。
/**
* 当创建worker出现异常失败时,对之前的操作进行回滚
* 1 如果新创建的worker加入了workers集合,将其移除
* 2 减少记录存活的worker个数(cas更新)
* 3 检查线程池是否满足中止的状态,防止这个存活的worker线程阻止线程池的中止(v1版本不考虑,省略了tryTerminate)
*/
private void addWorkerFailed(MyWorker myWorker) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (myWorker != null) {
// 如果新创建的worker加入了workers集合,将其移除
workers.remove(myWorker);
}
// 减少存活的worker个数
decrementWorkerCount();
// 尝试着将当前worker线程终止(addWorkerFailed由工作线程自己调用)
// tryTerminate();
} finally {
mainLock.unlock();
}
}
runWorker(工作线程核心执行逻辑)
前面介绍了用户如何向线程池提交任务,以及如何创建新工作线程Worker,下面介绍工作线程在线程池中是如何运行的。
- runWorker方法内部本质上是一个无限循环,在进入主循环之前通过unlock方法,将内部AQS父类中的state标识为0,允许被外部中断(可以被interruptIfStarted选中而打断)
- 之后便是主循环,如果firstTask不为空(说明第一次启动),则直接调用task.run方法。否则通过getTask方法尝试从工作队列中捞取一个任务来执行
- 在实际的任务执行前和执行后都调用对应的钩子方法(beforeExecute、afterExecute)
- 在任务执行前通过lock方法将AQS的state方法设置为1代表当前Worker正在执行任务,并在执行完一个任务后在finally中进行unlock解锁,令当前工作线程进入idle状态。
同时清空firstTask的值(清空后下一次循环就会通过getTask获取任务了)并令Worker中的completedTasks统计指标也自增1 - 如果任务执行过程中出现了异常,则catch住并最终向上抛出跳出主循环,finally中执行processWorkerExit(认为任务一旦执行出现了异常,则很可能工作线程内部的一些状态已经损坏,需要重新创建一个新的工作线程来代替出异常的老工作线程)
- 有两种情况会导致执行processWorkerExit,一种是上面说的任务执行时出现了异常,此时completedAbruptly=true;还有一种可能时getTask因为一些原因返回了null,此时completedAbruptly=false。
completedAbruptly会作为processWorkerExit的参数传递。
/**
* worker工作线程主循环执行逻辑
* */
private void runWorker(MyWorker myWorker) {
// 时worker线程的run方法调用的,此时的current线程的是worker线程
Thread workerThread = Thread.currentThread();
Runnable task = myWorker.firstTask;
// 已经暂存了firstTask,将其清空(有地方根据firstTask是否存在来判断工作线程中负责的任务是否是新提交的)
myWorker.firstTask = null;
// 将state由初始化时的-1设置为0
// 标识着此时当前工作线程开始工作了,这样可以被interruptIfStarted选中
myWorker.unlock();
// 默认线程是由于中断退出的
boolean completedAbruptly = true;
try {
// worker线程处理主循环,核心逻辑
while (task != null || (task = getTask()) != null) {
// 将state由0标识为1,代表着其由idle状态变成了正在工作的状态
// 这样interruptIdleWorkers中的tryLock会失败,这样工作状态的线程就不会被该方法中断任务的正常执行
myWorker.lock();
// v1版本此处省略优雅停止相关的核心逻辑
try {
// 任务执行前的钩子函数
beforeExecute(workerThread, task);
Throwable thrown = null;
try {
// 拿到的任务开始执行
task.run();
} catch (RuntimeException | Error x) {
// 使用thrown收集抛出的异常,传递给afterExecute
thrown = x;
// 同时抛出错误,从而中止主循环
throw x;
} catch (Throwable x) {
// 使用thrown收集抛出的异常,传递给afterExecute
thrown = x;
// 同时抛出错误,从而中止主循环
throw new Error(x);
} finally {
// 任务执行后的钩子函数,如果任务执行时抛出了错误/异常,thrown不为null
afterExecute(task, thrown);
}
} finally {
// 将task设置为null,令下一次while循环通过getTask获得新任务
task = null;
// 无论执行时是否存在异常,已完成的任务数加1
myWorker.completedTasks++;
// 无论如何将myWorker解锁,标识为idle状态
myWorker.unlock();
}
}
// getTask返回了null,说明没有可执行的任务或者因为idle超时、线程数超过配置等原因需要回收当前线程。
// 线程正常的退出,completedAbruptly为false
completedAbruptly = false;
}finally {
// getTask返回null,线程正常的退出,completedAbruptly值为false
// task.run()执行时抛出了异常/错误,直接跳出了主循环,此时completedAbruptly为初始化时的默认值true
processWorkerExit(myWorker, completedAbruptly);
// processWorkerExit执行完成后,worker线程对应的run方法(run->runWorker)也会执行完毕
// 此时线程对象会进入终止态,等待操作系统回收
// 而且processWorkerExit方法内将传入的Worker从workers集合中移除,jvm中的对象也会因为不再被引用而被GC回收
// 此时,当前工作线程所占用的所有资源都已释放完毕
}
}
getTask尝试获取任务执行
runWorker中是通过getTask获取任务的,getTask中包含着工作线程是如何从工作队列中获取任务的关键逻辑。
- 在获取任务前,需要通过getTask检查当前线程池的线程数量是否超过了参数配置(启动后被动态调整了),因此需要先获得当前线程池工作线程总数workCount。
如果当前工作线程数量超过了指定的最大线程个数maximumPoolSize限制,则说明当前线程需要退出了 - timed标识用于决定当前线程如何从工作队列(阻塞队列)中获取新任务,如果timed为true则通过poll方法获取同时指定相应的超时时间(配置参数keepAliveTime),如果timed为false则通过take方法无限期的等待。
如果工作队列并不为空,则poll和take方法都会立即返回一个任务对象。而当工作队列为空时,工作线程则会阻塞在工作队列上以让出CPU(idle状态)直到有新的任务到来而被唤醒(或者超时唤醒)。
这也是存储任务的workQueue不能是普通的队列,而必须是阻塞队列的原因。(对阻塞队列工作原理不太清楚的读者可以参考我以前的博客:自己动手实现一个阻塞队列) - timed的值由两方面共同决定。一是配置参数allowCoreThreadTimeOut是否为true,为true的话说明不管是核心线程还是非核心线程都需要在idle等待keepAliveTime后销毁退出。所以allowCoreThreadTimeOut=true,则timed一定为true
二是如果allowCoreThreadTimeOut为false,说明核心线程不需要退出,而非核心线程在idle等待keepAliveTime后需要销毁退出。则判断当前workCount是否大于配置的corePoolSize,是的话则timed为true否则为false。
如果当前线程数超过了指定的最大核心线程数corePoolSize,则需要让工作队列为空时(说明线程池负载较低)部分idle线程退出,使得最终活跃的线程数减少到和corePoolSize一致。
从这里可以看到,核心与非核心线程的概念在ThreadPoolExecutor里是很弱的,不关心工作线程最初是以什么原因创建的都一视同仁,谁都可能被当作非核心线程而销毁退出。 - timedOut标识当前工作线程是否因为poll拉取任务时出现了超时。take永远不会返回null,因此只有poll在超时时会返回null,当poll返回值为null时,表明是等待了keepAliveTime时间后超时了,所以timedOut标识为true。
同时如果拉取任务时线程被中断了,则捕获InterruptedException异常,将timeOut标识为false(被中断的就不认为是超时)。 - 当(workCount > maximumPoolSize)或者 (timed && timedOut)两者满足一个时,就说明当前线程应该要退出了。
此时将当前的workCount用cas的方式减去1,返回null代表获取任务失败即可;如果cas失败,则在for循环中重试。
但有一种情况是例外的(workCount <= 1 && !workQueue.isEmpty()),即当前工作线程数量恰好为1,且工作队列不为空(那么还需要当前线程继续工作把工作队列里的任务都消费掉,无论如何不能退出)
/**
* 尝试着从阻塞队列里获得待执行的任务
* @return 返回null代表工作队列为空,没有需要执行的任务; 或者当前worker线程满足了需要退出的一些条件
* 返回对应的任务
* */
private Runnable getTask() {
boolean timedOut = false;
for(;;) {
int currentCtl = ctl.get();
// 获得当前工作线程个数
int workCount = workerCountOf(currentCtl);
// 有两种情况需要指定超时时间的方式从阻塞队列workQueue中获取任务(即timed为true)
// 1.线程池配置参数allowCoreThreadTimeOut为true,即允许核心线程在idle一定时间后被销毁
// 所以allowCoreThreadTimeOut为true时,需要令timed为true,这样可以让核心线程也在一定时间内获取不到任务(idle状态)而被销毁
// 2.线程池配置参数allowCoreThreadTimeOut为false,但当前线程池中的线程数量workCount大于了指定的核心线程数量corePoolSize
// 说明当前有一些非核心的线程正在工作,而非核心的线程在idle状态一段时间后需要被销毁
// 所以此时也令timed为true,让这些线程在keepAliveTime时间内由于队列为空拉取不到任务而返回null,将其销毁
boolean timed = allowCoreThreadTimeOut || workCount > corePoolSize;
// 有共四种情况不需要往下执行,代表
// 1 (workCount > maximumPoolSize && workCount > 1)
// 当前工作线程个数大于了指定的maximumPoolSize(可能是由于启动后通过setMaximumPoolSize调小了maximumPoolSize的值)
// 已经不符合线程池的配置参数约束了,要将多余的工作线程回收掉
// 且当前workCount > 1说明存在不止一个工作线程,意味着即使将当前工作线程回收后也还有其它工作线程能继续处理工作队列里的任务,直接返回null表示自己需要被回收
// 2 (workCount > maximumPoolSize && workCount <= 1 && workQueue.isEmpty())
// 当前工作线程个数大于了指定的maximumPoolSize(maximumPoolSize被设置为0了)
// 已经不符合线程池的配置参数约束了,要将多余的工作线程回收掉
// 但此时workCount<=1,说明将自己这个工作线程回收掉后就没有其它工作线程能处理工作队列里剩余的任务了
// 所以即使maximumPoolSize设置为0,也需要等待任务被处理完,工作队列为空之后才能回收当前线程,否则还会继续拉取剩余任务
// 3 (workCount <= maximumPoolSize && (timed && timedOut) && workCount > 1)
// workCount <= maximumPoolSize符合要求
// 但是timed && timedOut,说明timed判定命中,需要以poll的方式指定超时时间,并且最近一次拉取任务超时了timedOut=true
// 进入新的一次循环后timed && timedOut成立,说明当前worker线程处于idle状态等待任务超过了规定的keepAliveTime时间,需要回收当前线程
// 且当前workCount > 1说明存在不止一个工作线程,意味着即使将当前工作线程回收后也还有其它工作线程能继续处理工作队列里的任务,直接返回null表示自己需要被回收
// 4 (workCount <= maximumPoolSize && (timed && timedOut) && workQueue.isEmpty())
// workCount <= maximumPoolSize符合要求
// 但是timed && timedOut,说明timed判定命中,需要以poll的方式指定超时时间,并且最近一次拉取任务超时了timedOut=true
// 进入新的一次循环后timed && timedOut成立,说明当前worker线程处于idle状态等待任务超过了规定的keepAliveTime时间,需要回收当前线程
// 但此时workCount<=1,说明将自己这个工作线程回收掉后就没有其它工作线程能处理工作队列里剩余的任务了
// 所以即使timed && timedOut超时逻辑匹配,也需要等待任务被处理完,工作队列为空之后才能回收当前线程,否则还会继续拉取剩余任务
if ((workCount > maximumPoolSize || (timed && timedOut))
&& (workCount > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(currentCtl)) {
// 满足上述条件,说明当前线程需要被销毁了,返回null
return null;
}
// compareAndDecrementWorkerCount方法由于并发的原因cas执行失败,continue循环重试
continue;
}
try {
// 根据上面的逻辑的timed标识,决定以什么方式从阻塞队列中获取任务
Runnable r = timed ?
// timed为true,通过poll方法指定获取任务的超时时间(如果指定时间内没有队列依然为空,则返回)
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// timed为false,通过take方法无限期的等待阻塞队列中加入新的任务
workQueue.take();
if (r != null) {
// 获得了新的任务,getWork正常返回对应的任务对象
return r;
}else{
// 否则说明timed=true,且poll拉取任务时超时了
timedOut = true;
}
} catch (InterruptedException retry) {
// poll or take任务等待时worker线程被中断了,捕获中断异常
// timeout = false,标识拉取任务时没有超时
timedOut = false;
}
}
}
processWorkerExit(处理工作线程退出)
在runWorker中,如果getTask方法没有拿到任务返回了null或者任务在执行时抛出了异常就会在最终的finally块中调用processWorkerExit方法,令当前工作线程销毁退出。
- processWorkerExit方法内会将当前线程占用的一些资源做清理,比如从workers中移除掉当前线程(利于Worker对象的GC),并令当前线程workerCount减一(completedAbruptly=true,说明是中断导致的退出,getTask中没来得及减workerCount,在这里补正)
- completedAbruptly=true,说明是runWorker中任务异常导致的线程退出,无条件的通过addWorker重新创建一个新的工作线程代替当前退出的工作线程。
- completedAbruptly=false,在退出当前工作线程后,需要判断一下退出后当前所存活的工作线程数量是否满足要求。
比如allowCoreThreadTimeOut=false时,当前工作线程个数是否不低于corePoolSize等,如果不满足要求则通过addWorker重新创建一个新的线程。
工作线程退出时所占用资源的回收
- processWorkerExit方法执行完毕后,当前工作线程就完整的从当前线程池中退出了(workers中没有了引用,workerCount减1了),GC便会将内存中的Worker对象所占用的内存给回收掉。
- 同时runWorker中最后执行完processWorkerExit后,工作线程的run方法也return了,标识着整个线程正常退出了,操作系统层面上也会将线程转为终止态并最终回收。至此,线程占用的所有资源就被彻底的回收干净了。
/**
* 处理worker线程退出
* @param myWorker 需要退出的工作线程对象
* @param completedAbruptly 是否是因为中断异常的原因,而需要回收
* */
private void processWorkerExit(MyWorker myWorker, boolean completedAbruptly) {
if (completedAbruptly) {
// 如果completedAbruptly=true,说明是任务在run方法执行时出错导致的线程退出
// 而正常退出时completedAbruptly=false,在getTask中已经将workerCount的值减少了
decrementWorkerCount();
}
ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 线程池全局总完成任务数累加上要退出的工作线程已完成的任务数
this.completedTaskCount += myWorker.completedTasks;
// workers集合中将当前工作线程剔除
workers.remove(myWorker);
// completedTaskCount是long类型的,workers是HashSet,
// 都是非线程安全的,所以在mainLock的保护进行修改
} finally {
mainLock.unlock();
}
int currentCtl = this.ctl.get();
if (!completedAbruptly) {
// completedAbruptly=false,说明不是因为中断异常而退出的
// min标识当前线程池允许的最小线程数量
// 1 如果allowCoreThreadTimeOut为true,则核心线程也可以被销毁,min=0
// 2 如果allowCoreThreadTimeOut为false,则min应该为所允许的核心线程个数,min=corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty()) {
// 如果min为0了,但工作队列不为空,则修正min=1,因为至少需要一个工作线程来将工作队列中的任务消费、处理掉
min = 1;
}
if (workerCountOf(currentCtl) >= min) {
// 如果当前工作线程数大于了min,当前线程数量是足够的,直接返回(否则要执行下面的addWorker恢复)
return;
}
}
// 两种场景会走到这里进行addWorker操作
// 1 completedAbruptly=true,说明线程是因为中断异常而退出的,需要重新创建一个新的工作线程
// 2 completedAbruptly=false,且上面的workerCount<min,则说明当前工作线程数不够,需要创建一个
// 为什么参数core传的是false呢?
// 因为completedAbruptly=true而中断退出的线程,无论当前工作线程数是否大于核心线程,都需要创建一个新的线程来代替原有的被退出的线程
addWorker(null, false);
}
动态修改配置参数
ThreadPoolExecutor除了支持启动前通过构造函数设置配置参数外,也允许在线程池运行的过程中动态的更改配置。而要实现动态的修改配置,麻烦程度要比启动前静态的指定大得多。
举个例子,在线程池的运行过程中如果当前corePoolSize=20,且已经创建了20个核心线程时(workerCount=20),现在将corePoolSize减少为10或者增大为30时应该如何实时的生效呢?
下面通过内嵌于代码中的注释,详细的说明了allowCoreThreadTimeOut、corePoolSize、maximumPoolSize这三个关键配置参数实现动态修改的原理。
/**
* 设置是否允许核心线程idle超时后退出
* */
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0) {
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
}
// 判断一下新旧值是否相等,避免无意义的volatile变量更新,导致不必要的cpu cache同步
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value) {
// 参数值value为true,说明之前不允许核心线程由于idle超时而退出
// 而此时更新为true说明现在允许了,则通过interruptIdleWorkers唤醒所有的idle线程
// 令其走一遍runWorker中的逻辑,尝试着让idle超时的核心线程及时销毁
interruptIdleWorkers();
}
}
}
/**
* 动态更新核心线程最大值corePoolSize
* */
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0) {
throw new IllegalArgumentException();
}
// 计算差异
int delta = corePoolSize - this.corePoolSize;
// 赋值
this.corePoolSize = corePoolSize;
if (workerCountOf(this.ctl.get()) > corePoolSize) {
// 更新完毕后,发现当前工作线程数超过了指定的值
// 唤醒所有idle线程,让目前空闲的idle超时的线程在workerCount大于maximumPoolSize时及时销毁
interruptIdleWorkers();
} else if (delta > 0) {
// 差异大于0,代表着新值大于旧值
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
// 我们无法确切的知道有多少新的线程是所需要的。
// 启发式的预先启动足够的新工作线程用于处理工作队列中的任务
// 但当执行此操作时工作队列为空了,则立即停止此操作(队列为空了说明当前负载较低,再创建更多的工作线程是浪费资源)
// 取差异和当前工作队列中的最小值为k
int k = Math.min(delta, workQueue.size());
// 尝试着一直增加新的工作线程,直到和k相同
// 这样设计的目的在于控制增加的核心线程数量,不要一下子创建过多核心线程
// 举个例子:原来的corePoolSize是10,且工作线程数也是10,现在新值设置为了30,新值比旧值大20,理论上应该直接创建20个核心工作线程
// 而工作队列中的任务数只有10,那么这个时候直接创建20个新工作线程是没必要的,只需要一个一个创建,在创建的过程中新的线程会尽量的消费工作队列中的任务
// 这样就可以以一种启发性的方式创建合适的新工作线程,一定程度上节约资源。后面再有新的任务提交时,再从runWorker方法中去单独创建核心线程(类似惰性创建)
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty()) {
// 其它工作线程在循环的过程中也在消费工作线程,且用户也可能不断地提交任务
// 这是一个动态的过程,但一旦发现当前工作队列为空则立即结束
break;
}
}
}
}
/**
* 动态更新最大线程数maximumPoolSize
* */
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
throw new IllegalArgumentException();
}
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(this.ctl.get()) > maximumPoolSize) {
// 更新完毕后,发现当前工作线程数超过了指定的值
// 唤醒所有idle线程,让目前空闲的idle超时的线程在workerCount大于maximumPoolSize时及时销毁
interruptIdleWorkers();
}
}
目前为止,通过v1版本的MyThreadPoolExecutor源码,已经将jdk线程池ThreadPoolExecutor在RUNNING状态下提交任务,启动工作线程执行任务相关的核心逻辑讲解完毕了(不考虑优雅停止)。
jdk线程池默认支持的四种拒绝策略
jdk线程池支持用户传入自定义的拒绝策略处理器,只需要传入实现了RejectedExecutionHandler接口的对象就行。
而jdk在ThreadPoolExecutor中提供了默认的四种拒绝策略方便用户使用。
- AbortPolicy
拒绝接受任务时会抛出RejectedExecutionException,能让提交任务的一方感知到异常的策略。适用于大多数场景,也是jdk默认的拒绝策略。 - DiscardPolicy
直接丢弃任务的拒绝策略。简单的直接丢弃任务,适用于对任务执行成功率要求不高的场合 - DiscardOldestPolicy
丢弃当前工作队列中最早入队的任务,然后将当前任务重新提交。适用于后出现的任务能够完全代替之前任务的场合(追求最终一致性) - CallerRunsPolicy
令调用者线程自己执行所提交任务的拒绝策略。在线程池压力过大时,让提交任务的线程自己执行该任务(异步变同步),能有效地降低线程池的压力,也不会丢失任务,但可能导致整体业务吞吐量大幅降低。
上面介绍的四种jdk默认拒绝策略分别适应不同的业务场景,需要用户仔细考虑最适合的拒绝策略。同时灵活的、基于接口的设计也开放的支持用户去自己实现更贴合自己业务的拒绝策略处理器。
/**
* 默认的拒绝策略:AbortPolicy
* */
private static final MyRejectedExecutionHandler defaultHandler = new MyAbortPolicy();
/**
* 抛出RejectedExecutionException的拒绝策略
* 评价:能让提交任务的一方感知到异常的策略,比较通用,也是jdk默认的拒绝策略
* */
public static class MyAbortPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
// 直接抛出异常
throw new RejectedExecutionException("Task " + command.toString() +
" rejected from " + executor.toString());
}
}
/**
* 令调用者线程自己执行command任务的拒绝策略
* 评价:在线程池压力过大时,让提交任务的线程自己执行该任务(异步变同步),
* 能够有效地降低线程池的压力,也不会丢失任务,但可能导致整体业务吞吐量大幅降低
* */
public static class MyCallerRunsPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
// 如果当前线程池不是shutdown状态,则令调用者线程自己执行command任务
command.run();
}else{
// 如果已经是shutdown状态了,就什么也不做直接丢弃任务
}
}
}
/**
* 直接丢弃任务的拒绝策略
* 评价:简单的直接丢弃任务,适用于对任务执行成功率要求不高的场合
* */
public static class MyDiscardPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
// 什么也不做的,直接返回
// 效果就是command任务被无声无息的丢弃了,没有异常
}
}
/**
* 丢弃当前工作队列中最早入队的任务,然后将当前任务重新提交
* 评价:适用于后出现的任务能够完全代替之前任务的场合(追求最终一致性)
* */
public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
// 如果当前线程池不是shutdown状态,则丢弃当前工作队列中最早入队的任务,然后将当前任务重新提交
executor.getQueue().poll();
executor.execute(command);
}else{
// 如果已经是shutdown状态了,就什么也不做直接丢弃任务
}
}
}
jdk默认的四种线程池实现
jdk中除了提供了默认的拒绝策略,还在Executors类中提供了四种基于ThreadPoolExecutor的、比较常用的线程池,以简化用户对线程池的使用。
这四种线程池可以通过Executors提供的public方法来分别创建:
newFixedThreadPool
newFixedThreadPool方法创建一个工作线程数量固定的线程池,其创建ThreadPoolExecutor时传入的核心线程数corePoolSize和最大线程数maximumPoolSize是相等的。
因此其工作队列传入是一个无界的LinkedBlockingQueue,无界的工作队列意味着永远都不会创建新的非核心线程。
在默认allowCoreThreadTimeOut为false的情况下,线程池中的所有线程都是不会因为idle超时而销毁的核心线程。
适用场景:由于工作线程数量固定,“fixedThreadPool”适用于任务流量较为稳定的场景
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newCachedThreadPool
newCachedThreadPool方法创建一个工作线程数量有巨大弹性的线程池,其核心线程数corePoolSize=0而最大线程数maximumPoolSize为Integer.MAX_VALUE,60s的保活时间。
同时其工作队列是SynchronousQueue,是一种队列容量为0、无法缓存任何任务的阻塞队列(任何时候插入数据(offer)时必须有消费者线程消费,否则生产者线程将会被阻塞)。
这也意味着“cachedThreadPool”中没有核心线程,所有工作线程在任务负载较低时都会在60s的idle后被销毁;同时当负载较高,新任务到来时由于所有的工作线程都在执行其它任务,将会立即创建一个新的非核心线程来处理任务。
适用场景:由于可以无限制的创建新线程来做到及时响应任务,“cachedThreadPool”适用于任务流量较大且不稳定,对任务延迟容忍度较低的场景
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newSingleThreadExecutor
newSingleThreadExecutor方法创建一个单线程的线程池,其核心线程数corePoolSize=1且最大线程数maximumPoolSize也为1,其工作队列是无界队列。
这意味着“singleThreadExecutor”中任何提交的任务都将严格按照先入先出的顺序被执行。
适用场景:“singleThreadExecutor”适用于任务量较小、对任务延迟容忍度较高、并要求任务顺序执行的场景。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newScheduledThreadPool
newScheduledThreadPool方法创建一个支持定时任务、延迟任务执行的线程池(关于jdk定时任务线程池ScheduledThreadPoolExecutor的工作原理会在未来的博客中展开)
适用场景:“scheduledThreadPool”适用于需要任务定时或者延迟执行的场景。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
jdk默认提供的线程池的缺陷
- 无论是newCachedThreadPool还是newFixedThreadPool、newSingleThreadExecutor,其设置的最大线程数量(Integer.MAX_VALUE)和无界的工作队列(new LinkedBlockingQueue
())都缺乏必要的限制。
在生产环境中很容易因为任务流量过大导致创建过多的工作线程或令无界的工作队列堆积大量的任务对象而耗尽CPU和内存等系统资源,最终导致程序崩溃。
这也是为什么阿里巴巴的开发规范中推荐使用更基础的ThreadPoolExecutor构造函数来创建所需要的线程池。 - 只有在了解ThreadPoolExecutor工作原理以及各项配置参数的具体作用后,才能根据具体的业务和硬件配置来设置最合适的参数值。
总结
- 这篇博客中我们首先介绍了线程池的基本概念,随后在源码层面解析了jdk默认的线程池ThreadPoolExecutor在执行所提交任务的整体工作原理(RUNNING状态),
并在最后简单的分析了jdk默认提供的四种拒绝策略和四种线程池的适用场景。 - 希望通过这篇博客能让读者更好的理解线程池的工作原理,并在工作中更好的使用线程池。关于ThreadPoolExecutor优雅停止的原理会在下一篇博客中进行详细的分析,尽请期待。
- 本篇博客的完整代码在我的github上://github.com/1399852153/Reinventing-the-wheel-for-learning(ThreadPool模块) 内容如有错误,还请多多指教。