jdk執行緒池ThreadPoolExecutor工作原理解析(自己動手實現執行緒池)(一)
jdk執行緒池ThreadPoolExecutor工作原理解析(自己動手實現執行緒池)(一)
執行緒池介紹
在日常開發中經常會遇到需要使用其它執行緒將大量任務非同步處理的場景(非同步化以及提升系統的吞吐量),而在使用執行緒的過程中卻存在著兩個痛點。
- 在java等很多主流語言中每個邏輯上的執行緒底層都對應著一個系統執行緒(不考慮虛擬執行緒的情況)。作業系統創建一個新執行緒是存在一定開銷的,
在需要執行大量的非同步任務時,如果處理每個任務時都直接向系統申請創建一個執行緒來執行,並在任務執行完畢後再回收執行緒,則創建/銷毀大量執行緒的開銷將無法忍受。 - 每個系統執行緒都會佔用一定的記憶體空間,且系統在調度不同執行緒上下文切換時存在一定的cpu開銷。因此在一定的硬體條件下,作業系統能同時維護的系統執行緒個數相對而言是比較有限的。
在使用執行緒的過程中如果沒有控制好流量,會很容易創建過多的執行緒而耗盡系統資源,令系統變得不可用。
而執行緒池正是為解決上述痛點而生的,其通過兩個手段來解決上述痛點。
池化執行緒資源
池化執行緒資源,顧名思義就是維護一個存活執行緒的集合(池子)。提交任務的用戶程式不直接控制執行緒的創建和銷毀,不用每次執行任務時都申請創建一個新執行緒,而是通過執行緒池間接的獲得執行緒去處理非同步任務。
執行緒池中的執行緒在執行完任務後通常也不會被系統回收掉,而是繼續待在池子中用於執行其它的任務(執行堆積的待執行任務或是等待新任務)。
執行緒池通過池化執行緒資源,避免了系統反覆創建/銷毀執行緒的開銷,大幅提高了處理大規模非同步任務時的性能。
對執行緒資源的申請進行收口,限制系統資源的使用
如果程式都統一使用執行緒池來處理非同步任務,則執行緒池內部便可以對系統資源的使用施加一定限制。
例如用戶可以指定一個執行緒池最大可維護的執行緒數量,以避免耗盡系統資源。
當用戶提交任務的速率過大,導致執行緒池中的執行緒數到達指定的最大值時依然無法滿足需求時,執行緒池可以通過丟棄部分任務或限制提交任務的流量的方式來處理這一問題。
執行緒池通過對執行緒資源的使用進行統一收口,用戶可以通過設置執行緒池的參數來控制系統資源的使用,從而避免系統資源耗盡。
jdk執行緒池ThreadPoolExecutor簡單介紹
前面介紹了執行緒池的概念,而要深入理解執行緒池的工作原理最好的辦法便是找到一個優秀的執行緒池實現來加以研究。
而自jdk1.5中引入的通用執行緒池框架ThreadPoolExecutor便是一個很好的學習對象。其內部實現不算複雜,卻在高效實現核心功能的同時還提供了較豐富的拓展能力。
下面從整體上介紹一下jdk通用執行緒池ThreadPoolExecutor的工作原理(基於jdk8)。
ThreadPoolExecutor運行時工作流程
首先ThreadPoolExecutor允許用戶從兩個不同維度來控制執行緒資源的使用,即最大核心執行緒數(corePoolSize)和最大執行緒數(maximumPoolSize)。
最大核心執行緒數:核心執行緒指的是通常常駐執行緒池的執行緒。常駐執行緒在執行緒池沒有任務空閑時也不會被銷毀,而是處於idle狀態,這樣在新任務到來時就能很快的進行響應。
最大執行緒數:和第一節中提到的一樣,即執行緒池中所能允許的活躍執行緒的最大數量。
在向ThreadPoolExecutor提交任務時(execute方法),會執行一系列的判斷來決定任務應該如何被執行(源碼在下一節中具體分析)。
- 首先判斷當前活躍的執行緒數是否小於指定的最大核心執行緒數corePoolSize。
如果為真,則說明當前執行緒池還未完成預熱,核心執行緒數不飽和,創建一個新執行緒來執行該任務。
如果為假,則說明當前執行緒池已完成預熱,進行下一步判斷。 - 嘗試將當前任務放入工作隊列workQueue(阻塞隊列BlockingQueue),工作隊列中的任務會被執行緒池中的活躍執行緒按入隊順序逐個消費。
如果入隊成功,則說明當前工作隊列未滿,入隊的任務將會被執行緒池中的某個活躍執行緒所消費並執行。
如果入隊失敗,則說明當前工作隊列已飽和,執行緒池消費任務的速度可能太慢了,可能需要創建更多新執行緒來加速消費,進行下一步判斷。 - 判斷當前活躍的執行緒數是否小於指定的最大執行緒數maximumPoolSize。
如果為真,則說明當前執行緒池所承載的執行緒數還未達到參數指定的上限,還有餘量來創建新的執行緒加速消費,創建一個新執行緒來執行該任務。
如果為假,則說明當前執行緒池所承載的執行緒數達到了上限,但處理任務的速度依然不夠快,需要觸發拒絕策略。
ThreadPoolExecutor優雅停止
執行緒池的優雅停止一般要能做到以下幾點:
- 執行緒池在中止後不能再受理新的任務
- 執行緒池中止的過程中,已經提交的現存任務不能丟失(等待剩餘任務執行完再關閉或者能夠把剩餘的任務吐出來還給用戶)
- 執行緒池最終關閉前,確保創建的所有工作執行緒都已退出,不會出現資源的泄露
執行緒池自啟動後便會有大量的工作執行緒在內部持續不斷並發的執行提交的各種任務,而要想做到優雅停止並不是一件容易的事情。
因此ThreadPoolExecutor中最複雜、細節最多的部分並不在於上文中的正常工作流程,而在於分散在各個地方但又緊密協作的,控制優雅停止的邏輯。
ThreadPoolExecutor的其它功能
除了正常的工作流程以及優雅停止的功能外,ThreadPoolExecutor還提供了一些比較好用的功能
- 提供了很多protected修飾的鉤子函數,便於用戶繼承並實現自己的執行緒池時進行一定的拓展
- 在運行時統計了總共執行的任務數等關鍵指標,並提供了對應的api便於用戶在運行時觀察運行狀態
- 允許在執行緒池運行過程中動態修改關鍵的配置參數(比如corePoolSize等),並實時的生效。
jdk執行緒池ThreadPoolExecutor源碼解析(自己動手實現執行緒池v1版本)
如費曼所說:What I can not create I do not understand(我不能理解我創造不了的東西)。
通過模仿jdk的ThreadPoolExecutor實現,從零開始實現一個執行緒池,可以迫使自己去仔細的捋清楚jdk執行緒池中設計的各種細節,加深理解而達到更好的學習效果。
前面提到ThreadPoolExecutor的核心邏輯主要分為兩部分,一是正常運行時處理提交的任務的邏輯,二是實現優雅停止的邏輯。
因此我們實現的執行緒池MyThreadPoolExecutor(以My開頭用於區分)也會分為兩個版本,v1版本只實現前一部分即正常運行時執行任務的邏輯,將有關執行緒池優雅停止的邏輯全部去除。
相比直接啃jdk最終實現的源碼,v1版本的實現會更簡單更易理解,讓正常執行任務時的邏輯更加清晰而不會耦合太多關於優雅停止的邏輯。
執行緒池關鍵成員變數介紹
ThreadPoolExecutor中有許多的成員變數,大致可以分為三類。
可由用戶自定義的、用於控制執行緒池運行的配置參數
- volatile int corePoolSize(最大核心執行緒數量)
- volatile int maximumPoolSize(最大執行緒數量)
- volatile long keepAliveTime(idle執行緒保活時間)
- final BlockingQueue workQueue(工作隊列(阻塞隊列))
- volatile ThreadFactory threadFactory(工作執行緒工廠)
- volatile RejectedExecutionHandler handler(拒絕異常處理器)
- volatile boolean allowCoreThreadTimeOut(是否允許核心執行緒在idle超時後退出)
其中前6個配置參數都可以在ThreadPoolExecutor的構造函數中指定,而allowCoreThreadTimeOut則可以通過暴露的public方法allowCoreThreadTimeOut來動態的設置。
其中大部分屬性都是volatile修飾的,目的是讓運行過程中可以用過提供的public方法動態修改這些值後,執行緒池中的工作執行緒或提交任務的用戶執行緒能及時的感知到變化(執行緒間的可見性),並進行響應(比如令核心執行緒自動的idle退出)
這些配置屬性具體如何控制執行緒池行為的原理都會在下面的源碼解析中展開介紹。理解這些參數的工作原理後才能在實際的業務中使用執行緒池時為其設置合適的值。
僅供執行緒池內部工作時使用的屬性
- ReentrantLock mainLock(用於控制各種臨界區邏輯的並發)
- HashSet
workers(當前活躍工作執行緒Worker的集合,工作執行緒的工作原理會在下文介紹) - AtomicInteger ctl(執行緒池控制狀態,control的簡寫)
這裡重點介紹一下ctl屬性。ctl雖然是一個32位的整型欄位(AtomicInteger),但實際上卻用於標識兩個業務屬性,即當前執行緒池的運行狀態和worker執行緒的總數量。
在執行緒池初始化時狀態位RUNNING,worker執行緒數量位0(private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));)。
ctl的32位中的高3位用於標識執行緒池當前的狀態,剩餘的29位用於標識執行緒池中worker執行緒的數量(因此理論上ThreadPoolExecutor最大可容納的執行緒數並不是231-1(32位中符號要佔一位),而是229-1)
由於聚合之後單獨的讀寫某一個屬性不是很方便,所以ThreadPoolExecutor中提供了很多基於位運算的輔助函數來簡化這些邏輯。
ctl這樣聚合的設計比起拆分成兩個獨立的欄位有什麼好處?
在ThreadPoolExecutor中關於優雅停止的邏輯中有很多地方是需要同時判斷當前工作執行緒數量與執行緒池狀態後,再對執行緒池狀態工作執行緒數量進行更新的(具體邏輯在下一篇v2版本的部落格中展開)。
且為了執行效率,不使用互斥鎖而是通過cas重試的方法來解決並發更新的問題。而對一個AtomicInteger屬性做cas重試的更新,要比同時控制兩個屬性進行cas的更新要簡單很多,執行效率也高很多。
ThreadPoolExecutor共有五種狀態,但有四種都和優雅停止有關(除了RUNNING)。
但由於v1版本的MyThreadPoolExecutorV1不支援優雅停止,所以不在本篇部落格中講解這些狀態具體的含義以及其是如何變化的(下一篇v2版本的部落格中展開)
記錄執行緒池運行過程中的一些關鍵指標
- completedTaskCount(執行緒池自啟動後已完成的總任務數)
- largestPoolSize(執行緒池自啟動後工作執行緒個數的最大值)
在運行過程中,ThreadPoolExecutor會在對應的地方進行埋點,統計一些指標並提供相應的api給用戶實時的查詢,以提高執行緒池工作時的可觀測性。
public class MyThreadPoolExecutorV1 implements MyThreadPoolExecutor{
/**
* 指定的最大核心執行緒數量
* */
private volatile int corePoolSize;
/**
* 指定的最大執行緒數量
* */
private volatile int maximumPoolSize;
/**
* 執行緒保活時間(單位:納秒 nanos)
* */
private volatile long keepAliveTime;
/**
* 存放任務的工作隊列(阻塞隊列)
* */
private final BlockingQueue<Runnable> workQueue;
/**
* 執行緒工廠
* */
private volatile ThreadFactory threadFactory;
/**
* 拒絕策略
* */
private volatile MyRejectedExecutionHandler handler;
/**
* 是否允許核心執行緒在idle一定時間後被銷毀(和非核心執行緒一樣)
* */
private volatile boolean allowCoreThreadTimeOut;
/**
* 主控鎖
* */
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 當前執行緒池已完成的任務數量
* */
private long completedTaskCount;
/**
* 維護當前存活的worker執行緒集合
* */
private final HashSet<MyWorker> workers = new HashSet<>();
/**
* 當前執行緒池中存在的worker執行緒數量 + 狀態的一個聚合(通過一個原子int進行cas,來避免對兩個業務屬性欄位加鎖來保證一致性)
* v1版本只關心前者,即worker執行緒數量
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 32位的有符號整數,有3位是用來存放執行緒池狀態的,所以用來維護當前工作執行緒個數的部分就只能用29位了
* 被佔去的3位中,有1位原來的符號位,2位是原來的數值位。
* */
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 執行緒池狀態poolStatus常量(狀態值只會由小到大,單調遞增)
* 執行緒池狀態遷移圖:
* ↗ SHUTDOWN ↘
* RUNNING ↓ TIDYING → TERMINATED
* ↘ STOP ↗
* 1 RUNNING狀態,代表著執行緒池處於正常運行的狀態。能正常的接收並處理提交的任務
* 執行緒池對象初始化時,狀態為RUNNING
* 對應邏輯:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
*
* 2 SHUTDOWN狀態,代表執行緒池處於停止對外服務的狀態。不再接收新提交的任務,但依然會將workQueue工作隊列中積壓的任務處理完
* 調用了shutdown方法時,狀態由RUNNING -> SHUTDOWN
* 對應邏輯:shutdown方法中的advanceRunState(SHUTDOWN);
*
* 3 STOP狀態,代表執行緒池處於停止狀態。不再接受新提交的任務,同時也不再處理workQueue工作隊列中積壓的任務,當前還在處理任務的工作執行緒將收到interrupt中斷通知
* 之前未調用shutdown方法,直接調用了shutdownNow方法,狀態由RUNNING -> STOP
* 之前先調用了shutdown方法,後調用了shutdownNow方法,狀態由SHUTDOWN -> STOP
* 對應邏輯:shutdownNow方法中的advanceRunState(STOP);
*
* 4 TIDYING狀態,代表著執行緒池即將完全終止,正在做最後的收尾工作
* 當前執行緒池狀態為SHUTDOWN,任務被消費完工作隊列workQueue為空,且工作執行緒全部退出完成工作執行緒集合workers為空時,tryTerminate方法中將狀態由SHUTDOWN->TIDYING
* 當前執行緒池狀態為STOP,工作執行緒全部退出完成工作執行緒集合workers為空時,tryTerminate方法中將狀態由STOP->TIDYING
* 對應邏輯:tryTerminate方法中的ctl.compareAndSet(c, ctlOf(TIDYING, 0)
*
* 5 TERMINATED狀態,代表著執行緒池完全的關閉。之前執行緒池已經處於TIDYING狀態,且調用的鉤子函數terminated已返回
* 當前執行緒池狀態為TIDYING,調用的鉤子函數terminated已返回
* 對應邏輯:tryTerminate方法中的ctl.set(ctlOf(TERMINATED, 0));
* */
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/**
* 跟蹤執行緒池曾經有過的最大執行緒數量(只能在mainLock的並發保護下更新)
*/
private int largestPoolSize;
private boolean compareAndIncrementWorkerCount(int expect) {
return this.ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
do {
// cas更新,workerCount自減1
} while (!compareAndDecrementWorkerCount(ctl.get()));
}
public MyThreadPoolExecutorV1(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
MyRejectedExecutionHandler handler) {
// 基本的參數校驗
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) {
throw new IllegalArgumentException();
}
if (unit == null || workQueue == null || threadFactory == null || handler == null) {
throw new NullPointerException();
}
// 設置成員變數
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
public ThreadFactory getThreadFactory() {
return threadFactory;
}
}
Worker工作執行緒
ThreadPoolExecutor中的工作執行緒並不是裸的Thread,而是被封裝在了一個Worker的內部類中。
Worker實現了Runnable所以可以作為一個普通的執行緒來啟動,在run方法中只是簡單的調用了一下runWorker(runWorker後面再展開)。
Worker類有三個成員屬性:
- Thread thread(被封裝的工作執行緒對象)
- Runnable firstTask(提交任務時,創建新Worker對象時指定的第一次要執行的任務(後續執行緒就會去拉取工作隊列里的任務執行了))
- volatile long completedTasks(統計用,計算當前工作執行緒總共完成了多少個任務)
Worker內封裝的實際的工作執行緒對象thread,其在構造函數中由執行緒池的執行緒工廠threadFactory生成,傳入this,所以thread在start後,便會調用run方法進而執行runWorker。
執行緒工廠可以由用戶在創建執行緒池時通過參數指定,因此用戶在自由控制所生成的工作執行緒的同時,也需要保證newThread能正確的返回一個可用的執行緒對象。
除此之外,Worker對象還繼承了AbstractQueuedSynchronizer(AQS)類,簡單的實現了一個不可重入的互斥鎖。
對AQS互斥模式不太了解的讀者可以參考一下我之前關於AQS互斥模式的部落格:AQS互斥模式與ReentrantLock可重入鎖原理解析
AQS中維護了一個volatile修飾的int類型的成員變數state,其具體的含義可以由使用者自己定義。
在Worker中,state的值有三種狀態:
- state=-1,標識工作執行緒還未啟動(不會被interruptIfStarted打斷)
- state=0,標識工作執行緒已經啟動,但沒有開始處理任務(可能是在等待任務,idle狀態)
- state=1,標識worker執行緒正在執行任務(runWorker方法中,成功獲得任務後,通過lock方法將state設置為1)
具體這三種情況分別在什麼時候出現會在下面解析提交任務源碼的那部分里詳細介紹。
/**
* jdk的實現中令Worker繼承AbstractQueuedSynchronizer並實現了一個不可重入的鎖
* AQS中的state屬性含義
* -1:標識工作執行緒還未啟動
* 0:標識工作執行緒已經啟動,但沒有開始處理任務(可能是在等待任務,idle狀態)
* 1:標識worker執行緒正在執行任務(runWorker中,成功獲得任務後,通過lock方法將state設置為1)
* */
private final class MyWorker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
public MyWorker(Runnable firstTask) {
this.firstTask = firstTask;
// newThread可能是null
this.thread = getThreadFactory().newThread(this);
}
@Override
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock(){
acquire(1);
}
public boolean tryLock(){
return tryAcquire(1);
}
public void unlock(){
release(1);
}
public boolean isLocked(){
return isHeldExclusively();
}
void interruptIfStarted() {
Thread t;
// 三個條件同時滿足,才去中斷Worker對應的thread
// getState() >= 0,用於過濾還未執行runWorker的,剛入隊初始化的Worker
// thread != null,用於過濾掉構造方法中ThreadFactory.newThread返回null的Worker
// !t.isInterrupted(),用於過濾掉那些已經被其它方式中斷的Worker執行緒(比如用戶自己去觸發中斷,提前終止執行緒池中的任務)
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
execute執行提交的任務
下面介紹本篇部落格的重點,即執行緒池是如何執行用戶所提交的任務的。
用戶提交任務的入口是public的execute方法,Runnable類型的參數command就是提交的要執行的任務。
MyThreadPoolExecutorV1的execute方法(相比jdk的實現v1版本去掉了關於優雅停止的邏輯)
/**
* 提交任務,並執行
* */
public void execute(Runnable command) {
if (command == null){
throw new NullPointerException("command參數不能為空");
}
int currentCtl = this.ctl.get();
if (workerCountOf(currentCtl) < this.corePoolSize) {
// 如果當前存在的worker執行緒數量低於指定的核心執行緒數量,則創建新的核心執行緒
boolean addCoreWorkerSuccess = addWorker(command,true);
if(addCoreWorkerSuccess){
// addWorker添加成功,直接返回即可
return;
}
}
// 走到這裡有兩種情況
// 1 因為核心執行緒超過限制(workerCountOf(currentCtl) < corePoolSize == false),需要嘗試嘗試將任務放入阻塞隊列
// 2 addWorker返回false,創建核心工作執行緒失敗
if(this.workQueue.offer(command)){
// workQueue.offer入隊成功
if(workerCountOf(currentCtl) == 0){
// 在corePoolSize為0的情況下,當前不存在存活的核心執行緒
// 一個任務在入隊之後,如果當前執行緒池中一個執行緒都沒有,則需要兜底的創建一個非核心執行緒來處理入隊的任務
// 因此firstTask為null,目的是先讓任務先入隊後創建執行緒去拉取任務並執行
addWorker(null,false);
}else{
// 加入隊列成功,且當前存在worker執行緒,成功返回
return;
}
}else{
// 阻塞隊列已滿,嘗試創建一個新的非核心執行緒處理
boolean addNonCoreWorkerSuccess = addWorker(command,false);
if(!addNonCoreWorkerSuccess){
// 創建非核心執行緒失敗,執行拒絕策略(失敗的原因和前面創建核心執行緒addWorker的原因類似)
reject(command);
}else{
// 創建非核心執行緒成功,成功返回
return;
}
}
}
/**
* 根據指定的拒絕處理器,執行拒絕策略
* */
private void reject(Runnable command) {
this.handler.rejectedExecution(command, this);
}
可以看到,execute方法源碼中對於任務處理的邏輯很清晰,也能與ThreadPoolExecutor運行時工作流程中所介紹的流程所匹配。
addWorker方法(創建新的工作執行緒)
在execute方法中當需要創建核心執行緒或普通執行緒時,便需要通過addWorker方法嘗試創建一個新的工作執行緒。
/**
* 向執行緒池中加入worker
* */
private boolean addWorker(Runnable firstTask, boolean core) {
// retry標識外層循環
retry:
for (;;) {
int currentCtl = ctl.get();
// 用於cas更新workerCount的內層循環(注意這裡面與jdk的寫法不同,改寫成了邏輯一致但更可讀的形式)
for (;;) {
// 判斷當前worker數量是否超過了限制
int workerCount = workerCountOf(currentCtl);
if (workerCount >= CAPACITY) {
// 當前worker數量超過了設計上允許的最大限制
return false;
}
if (core) {
// 創建的是核心執行緒,判斷當前執行緒數是否已經超過了指定的核心執行緒數
if (workerCount >= this.corePoolSize) {
// 超過了核心執行緒數,創建核心worker執行緒失敗
return false;
}
} else {
// 創建的是非核心執行緒,判斷當前執行緒數是否已經超過了指定的最大執行緒數
if (workerCount >= this.maximumPoolSize) {
// 超過了最大執行緒數,創建非核心worker執行緒失敗
return false;
}
}
// cas更新workerCount的值
boolean casSuccess = compareAndIncrementWorkerCount(currentCtl);
if (casSuccess) {
// cas成功,跳出外層循環
break retry;
}
// compareAndIncrementWorkerCount方法cas爭搶失敗,重新執行內層循環
}
}
boolean workerStarted = false;
MyWorker newWorker = null;
try {
// 創建一個新的worker
newWorker = new MyWorker(firstTask);
final Thread myWorkerThread = newWorker.thread;
if (myWorkerThread != null) {
// MyWorker初始化時內部執行緒創建成功
// 加鎖,防止並發更新
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (myWorkerThread.isAlive()) {
// 預檢查執行緒的狀態,剛初始化的worker執行緒必須是未喚醒的狀態
throw new IllegalThreadStateException();
}
// 加入worker集合
this.workers.add(newWorker);
int workerSize = workers.size();
if (workerSize > largestPoolSize) {
// 如果當前worker個數超過了之前記錄的最大存活執行緒數,將其更新
largestPoolSize = workerSize;
}
// 創建成功
} finally {
// 無論是否發生異常,都先將主控鎖解鎖
mainLock.unlock();
}
// 加入成功,啟動worker執行緒
myWorkerThread.start();
// 標識為worker執行緒啟動成功,並作為返回值返回
workerStarted = true;
}
}finally {
if (!workerStarted) {
addWorkerFailed(newWorker);
}
}
return workerStarted;
}
addWorker可以分為兩部分:判斷當前是否滿足創建新工作執行緒的條件、創建並啟動新的Worker工作執行緒。
判斷當前是否滿足創建新工作執行緒的條件
入口處開始的retry標識的for循環部分,便是用於判斷是否滿足創建新工作執行緒的條件。
- 首先判斷當前工作執行緒數量是否超過了理論的最大值CAPACITY(即2^29-1),超過了則不能創建,返回false,不創建新工作執行緒
- 根據boolean類型參數core判斷是否創建核心工作執行緒,core=true則判斷是否超過了corePoolSize的限制,core=false則判斷是否超過了maximumPoolSize的限制。不滿足則返回false,不創建新工作執行緒
- 滿足上述限制條件後,則說明可以創建新執行緒了,compareAndIncrementWorkerCount方法進行cas的增加當前工作執行緒數。
如果cas失敗,則說明存在並發的更新了,則再一次的循環重試,並再次的進行上述檢查。
需要注意的是:這裡面有兩個for循環的原因在於v1版本省略了優雅停止的邏輯(所以實際上v1版本能去掉內層循環的)。如果執行緒池處於停止狀態則不能再創建新工作執行緒了,因此也需要判斷執行緒池當前的狀態,
不滿足條件則也需要返回false,不創建工作執行緒。
而且compareAndIncrementWorkerCount中cas更新ctl時,如果並發的執行緒池被停止而導致執行緒池狀態發生了變化,也會導致cas失敗重新檢查。
這也是jdk的實現中為什麼把執行緒池狀態和工作執行緒數量綁定在一起的原因之一,這樣在cas更新時可以原子性的同時檢查兩個欄位的並發爭搶。(更具體的細節會在下一篇部落格的v2版本中介紹)
創建並啟動新的Worker工作執行緒
在通過retry那部分的層層條件檢查後,緊接著便是實際創建新工作執行緒的邏輯。
- 首先通過Worker的構造方法創建一個新的Worker對象,並將用戶提交的任務作為firstTask參數傳入。
- 判斷Worker在構造時執行緒工廠是否正確的生成了一個Thread(判空),如果thread == null的話直接返回false,標識創建新工作執行緒失敗。
- 在mainLock的保護下,將新創建的worker執行緒加入workers集合中
- 啟動Worker中的執行緒(myWorkerThread.start()),啟動後會執行Worker類中的run方法,新的工作執行緒會執行runWorker方法(下文會展開分析runWorker)
- 如果Worker中的執行緒不是alive狀態等原因導致工作執行緒啟動失敗,則在finally中通過addWorkerFailed進行一系列的回滾操作
雖然在前面執行緒池工作流程的分析中提到了核心執行緒與非核心執行緒的概念,但Worker類中實際上並沒有核心/非核心的標識。
經過了工作執行緒啟動前的條件判斷後,新創建的工作執行緒實際上並沒有真正的核心與非核心的差別。
addWorkerFailed(addWorker的逆向回滾操作)
addWorker中工作執行緒可能會啟動失敗,所以要對addWorker中對workers集合以及workerCount等數據的操作進行回滾。
/**
* 當創建worker出現異常失敗時,對之前的操作進行回滾
* 1 如果新創建的worker加入了workers集合,將其移除
* 2 減少記錄存活的worker個數(cas更新)
* 3 檢查執行緒池是否滿足中止的狀態,防止這個存活的worker執行緒阻止執行緒池的中止(v1版本不考慮,省略了tryTerminate)
*/
private void addWorkerFailed(MyWorker myWorker) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (myWorker != null) {
// 如果新創建的worker加入了workers集合,將其移除
workers.remove(myWorker);
}
// 減少存活的worker個數
decrementWorkerCount();
// 嘗試著將當前worker執行緒終止(addWorkerFailed由工作執行緒自己調用)
// tryTerminate();
} finally {
mainLock.unlock();
}
}
runWorker(工作執行緒核心執行邏輯)
前面介紹了用戶如何向執行緒池提交任務,以及如何創建新工作執行緒Worker,下面介紹工作執行緒在執行緒池中是如何運行的。
- runWorker方法內部本質上是一個無限循環,在進入主循環之前通過unlock方法,將內部AQS父類中的state標識為0,允許被外部中斷(可以被interruptIfStarted選中而打斷)
- 之後便是主循環,如果firstTask不為空(說明第一次啟動),則直接調用task.run方法。否則通過getTask方法嘗試從工作隊列中撈取一個任務來執行
- 在實際的任務執行前和執行後都調用對應的鉤子方法(beforeExecute、afterExecute)
- 在任務執行前通過lock方法將AQS的state方法設置為1代表當前Worker正在執行任務,並在執行完一個任務後在finally中進行unlock解鎖,令當前工作執行緒進入idle狀態。
同時清空firstTask的值(清空後下一次循環就會通過getTask獲取任務了)並令Worker中的completedTasks統計指標也自增1 - 如果任務執行過程中出現了異常,則catch住並最終向上拋出跳出主循環,finally中執行processWorkerExit(認為任務一旦執行出現了異常,則很可能工作執行緒內部的一些狀態已經損壞,需要重新創建一個新的工作執行緒來代替出異常的老工作執行緒)
- 有兩種情況會導致執行processWorkerExit,一種是上面說的任務執行時出現了異常,此時completedAbruptly=true;還有一種可能時getTask因為一些原因返回了null,此時completedAbruptly=false。
completedAbruptly會作為processWorkerExit的參數傳遞。
/**
* worker工作執行緒主循環執行邏輯
* */
private void runWorker(MyWorker myWorker) {
// 時worker執行緒的run方法調用的,此時的current執行緒的是worker執行緒
Thread workerThread = Thread.currentThread();
Runnable task = myWorker.firstTask;
// 已經暫存了firstTask,將其清空(有地方根據firstTask是否存在來判斷工作執行緒中負責的任務是否是新提交的)
myWorker.firstTask = null;
// 將state由初始化時的-1設置為0
// 標識著此時當前工作執行緒開始工作了,這樣可以被interruptIfStarted選中
myWorker.unlock();
// 默認執行緒是由於中斷退出的
boolean completedAbruptly = true;
try {
// worker執行緒處理主循環,核心邏輯
while (task != null || (task = getTask()) != null) {
// 將state由0標識為1,代表著其由idle狀態變成了正在工作的狀態
// 這樣interruptIdleWorkers中的tryLock會失敗,這樣工作狀態的執行緒就不會被該方法中斷任務的正常執行
myWorker.lock();
// v1版本此處省略優雅停止相關的核心邏輯
try {
// 任務執行前的鉤子函數
beforeExecute(workerThread, task);
Throwable thrown = null;
try {
// 拿到的任務開始執行
task.run();
} catch (RuntimeException | Error x) {
// 使用thrown收集拋出的異常,傳遞給afterExecute
thrown = x;
// 同時拋出錯誤,從而中止主循環
throw x;
} catch (Throwable x) {
// 使用thrown收集拋出的異常,傳遞給afterExecute
thrown = x;
// 同時拋出錯誤,從而中止主循環
throw new Error(x);
} finally {
// 任務執行後的鉤子函數,如果任務執行時拋出了錯誤/異常,thrown不為null
afterExecute(task, thrown);
}
} finally {
// 將task設置為null,令下一次while循環通過getTask獲得新任務
task = null;
// 無論執行時是否存在異常,已完成的任務數加1
myWorker.completedTasks++;
// 無論如何將myWorker解鎖,標識為idle狀態
myWorker.unlock();
}
}
// getTask返回了null,說明沒有可執行的任務或者因為idle超時、執行緒數超過配置等原因需要回收當前執行緒。
// 執行緒正常的退出,completedAbruptly為false
completedAbruptly = false;
}finally {
// getTask返回null,執行緒正常的退出,completedAbruptly值為false
// task.run()執行時拋出了異常/錯誤,直接跳出了主循環,此時completedAbruptly為初始化時的默認值true
processWorkerExit(myWorker, completedAbruptly);
// processWorkerExit執行完成後,worker執行緒對應的run方法(run->runWorker)也會執行完畢
// 此時執行緒對象會進入終止態,等待作業系統回收
// 而且processWorkerExit方法內將傳入的Worker從workers集合中移除,jvm中的對象也會因為不再被引用而被GC回收
// 此時,當前工作執行緒所佔用的所有資源都已釋放完畢
}
}
getTask嘗試獲取任務執行
runWorker中是通過getTask獲取任務的,getTask中包含著工作執行緒是如何從工作隊列中獲取任務的關鍵邏輯。
- 在獲取任務前,需要通過getTask檢查當前執行緒池的執行緒數量是否超過了參數配置(啟動後被動態調整了),因此需要先獲得當前執行緒池工作執行緒總數workCount。
如果當前工作執行緒數量超過了指定的最大執行緒個數maximumPoolSize限制,則說明當前執行緒需要退出了 - timed標識用於決定當前執行緒如何從工作隊列(阻塞隊列)中獲取新任務,如果timed為true則通過poll方法獲取同時指定相應的超時時間(配置參數keepAliveTime),如果timed為false則通過take方法無限期的等待。
如果工作隊列並不為空,則poll和take方法都會立即返回一個任務對象。而當工作隊列為空時,工作執行緒則會阻塞在工作隊列上以讓出CPU(idle狀態)直到有新的任務到來而被喚醒(或者超時喚醒)。
這也是存儲任務的workQueue不能是普通的隊列,而必須是阻塞隊列的原因。(對阻塞隊列工作原理不太清楚的讀者可以參考我以前的部落格:自己動手實現一個阻塞隊列) - timed的值由兩方面共同決定。一是配置參數allowCoreThreadTimeOut是否為true,為true的話說明不管是核心執行緒還是非核心執行緒都需要在idle等待keepAliveTime後銷毀退出。所以allowCoreThreadTimeOut=true,則timed一定為true
二是如果allowCoreThreadTimeOut為false,說明核心執行緒不需要退出,而非核心執行緒在idle等待keepAliveTime後需要銷毀退出。則判斷當前workCount是否大於配置的corePoolSize,是的話則timed為true否則為false。
如果當前執行緒數超過了指定的最大核心執行緒數corePoolSize,則需要讓工作隊列為空時(說明執行緒池負載較低)部分idle執行緒退出,使得最終活躍的執行緒數減少到和corePoolSize一致。
從這裡可以看到,核心與非核心執行緒的概念在ThreadPoolExecutor里是很弱的,不關心工作執行緒最初是以什麼原因創建的都一視同仁,誰都可能被當作非核心執行緒而銷毀退出。 - timedOut標識當前工作執行緒是否因為poll拉取任務時出現了超時。take永遠不會返回null,因此只有poll在超時時會返回null,當poll返回值為null時,表明是等待了keepAliveTime時間後超時了,所以timedOut標識為true。
同時如果拉取任務時執行緒被中斷了,則捕獲InterruptedException異常,將timeOut標識為false(被中斷的就不認為是超時)。 - 當(workCount > maximumPoolSize)或者 (timed && timedOut)兩者滿足一個時,就說明當前執行緒應該要退出了。
此時將當前的workCount用cas的方式減去1,返回null代表獲取任務失敗即可;如果cas失敗,則在for循環中重試。
但有一種情況是例外的(workCount <= 1 && !workQueue.isEmpty()),即當前工作執行緒數量恰好為1,且工作隊列不為空(那麼還需要當前執行緒繼續工作把工作隊列里的任務都消費掉,無論如何不能退出)
/**
* 嘗試著從阻塞隊列里獲得待執行的任務
* @return 返回null代表工作隊列為空,沒有需要執行的任務; 或者當前worker執行緒滿足了需要退出的一些條件
* 返回對應的任務
* */
private Runnable getTask() {
boolean timedOut = false;
for(;;) {
int currentCtl = ctl.get();
// 獲得當前工作執行緒個數
int workCount = workerCountOf(currentCtl);
// 有兩種情況需要指定超時時間的方式從阻塞隊列workQueue中獲取任務(即timed為true)
// 1.執行緒池配置參數allowCoreThreadTimeOut為true,即允許核心執行緒在idle一定時間後被銷毀
// 所以allowCoreThreadTimeOut為true時,需要令timed為true,這樣可以讓核心執行緒也在一定時間內獲取不到任務(idle狀態)而被銷毀
// 2.執行緒池配置參數allowCoreThreadTimeOut為false,但當前執行緒池中的執行緒數量workCount大於了指定的核心執行緒數量corePoolSize
// 說明當前有一些非核心的執行緒正在工作,而非核心的執行緒在idle狀態一段時間後需要被銷毀
// 所以此時也令timed為true,讓這些執行緒在keepAliveTime時間內由於隊列為空拉取不到任務而返回null,將其銷毀
boolean timed = allowCoreThreadTimeOut || workCount > corePoolSize;
// 有共四種情況不需要往下執行,代表
// 1 (workCount > maximumPoolSize && workCount > 1)
// 當前工作執行緒個數大於了指定的maximumPoolSize(可能是由於啟動後通過setMaximumPoolSize調小了maximumPoolSize的值)
// 已經不符合執行緒池的配置參數約束了,要將多餘的工作執行緒回收掉
// 且當前workCount > 1說明存在不止一個工作執行緒,意味著即使將當前工作執行緒回收後也還有其它工作執行緒能繼續處理工作隊列里的任務,直接返回null表示自己需要被回收
// 2 (workCount > maximumPoolSize && workCount <= 1 && workQueue.isEmpty())
// 當前工作執行緒個數大於了指定的maximumPoolSize(maximumPoolSize被設置為0了)
// 已經不符合執行緒池的配置參數約束了,要將多餘的工作執行緒回收掉
// 但此時workCount<=1,說明將自己這個工作執行緒回收掉後就沒有其它工作執行緒能處理工作隊列里剩餘的任務了
// 所以即使maximumPoolSize設置為0,也需要等待任務被處理完,工作隊列為空之後才能回收當前執行緒,否則還會繼續拉取剩餘任務
// 3 (workCount <= maximumPoolSize && (timed && timedOut) && workCount > 1)
// workCount <= maximumPoolSize符合要求
// 但是timed && timedOut,說明timed判定命中,需要以poll的方式指定超時時間,並且最近一次拉取任務超時了timedOut=true
// 進入新的一次循環後timed && timedOut成立,說明當前worker執行緒處於idle狀態等待任務超過了規定的keepAliveTime時間,需要回收當前執行緒
// 且當前workCount > 1說明存在不止一個工作執行緒,意味著即使將當前工作執行緒回收後也還有其它工作執行緒能繼續處理工作隊列里的任務,直接返回null表示自己需要被回收
// 4 (workCount <= maximumPoolSize && (timed && timedOut) && workQueue.isEmpty())
// workCount <= maximumPoolSize符合要求
// 但是timed && timedOut,說明timed判定命中,需要以poll的方式指定超時時間,並且最近一次拉取任務超時了timedOut=true
// 進入新的一次循環後timed && timedOut成立,說明當前worker執行緒處於idle狀態等待任務超過了規定的keepAliveTime時間,需要回收當前執行緒
// 但此時workCount<=1,說明將自己這個工作執行緒回收掉後就沒有其它工作執行緒能處理工作隊列里剩餘的任務了
// 所以即使timed && timedOut超時邏輯匹配,也需要等待任務被處理完,工作隊列為空之後才能回收當前執行緒,否則還會繼續拉取剩餘任務
if ((workCount > maximumPoolSize || (timed && timedOut))
&& (workCount > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(currentCtl)) {
// 滿足上述條件,說明當前執行緒需要被銷毀了,返回null
return null;
}
// compareAndDecrementWorkerCount方法由於並發的原因cas執行失敗,continue循環重試
continue;
}
try {
// 根據上面的邏輯的timed標識,決定以什麼方式從阻塞隊列中獲取任務
Runnable r = timed ?
// timed為true,通過poll方法指定獲取任務的超時時間(如果指定時間內沒有隊列依然為空,則返回)
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// timed為false,通過take方法無限期的等待阻塞隊列中加入新的任務
workQueue.take();
if (r != null) {
// 獲得了新的任務,getWork正常返回對應的任務對象
return r;
}else{
// 否則說明timed=true,且poll拉取任務時超時了
timedOut = true;
}
} catch (InterruptedException retry) {
// poll or take任務等待時worker執行緒被中斷了,捕獲中斷異常
// timeout = false,標識拉取任務時沒有超時
timedOut = false;
}
}
}
processWorkerExit(處理工作執行緒退出)
在runWorker中,如果getTask方法沒有拿到任務返回了null或者任務在執行時拋出了異常就會在最終的finally塊中調用processWorkerExit方法,令當前工作執行緒銷毀退出。
- processWorkerExit方法內會將當前執行緒佔用的一些資源做清理,比如從workers中移除掉當前執行緒(利於Worker對象的GC),並令當前執行緒workerCount減一(completedAbruptly=true,說明是中斷導致的退出,getTask中沒來得及減workerCount,在這裡補正)
- completedAbruptly=true,說明是runWorker中任務異常導致的執行緒退出,無條件的通過addWorker重新創建一個新的工作執行緒代替當前退出的工作執行緒。
- completedAbruptly=false,在退出當前工作執行緒後,需要判斷一下退出後當前所存活的工作執行緒數量是否滿足要求。
比如allowCoreThreadTimeOut=false時,當前工作執行緒個數是否不低於corePoolSize等,如果不滿足要求則通過addWorker重新創建一個新的執行緒。
工作執行緒退出時所佔用資源的回收
- processWorkerExit方法執行完畢後,當前工作執行緒就完整的從當前執行緒池中退出了(workers中沒有了引用,workerCount減1了),GC便會將記憶體中的Worker對象所佔用的記憶體給回收掉。
- 同時runWorker中最後執行完processWorkerExit後,工作執行緒的run方法也return了,標識著整個執行緒正常退出了,作業系統層面上也會將執行緒轉為終止態並最終回收。至此,執行緒佔用的所有資源就被徹底的回收乾淨了。
/**
* 處理worker執行緒退出
* @param myWorker 需要退出的工作執行緒對象
* @param completedAbruptly 是否是因為中斷異常的原因,而需要回收
* */
private void processWorkerExit(MyWorker myWorker, boolean completedAbruptly) {
if (completedAbruptly) {
// 如果completedAbruptly=true,說明是任務在run方法執行時出錯導致的執行緒退出
// 而正常退出時completedAbruptly=false,在getTask中已經將workerCount的值減少了
decrementWorkerCount();
}
ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 執行緒池全局總完成任務數累加上要退出的工作執行緒已完成的任務數
this.completedTaskCount += myWorker.completedTasks;
// workers集合中將當前工作執行緒剔除
workers.remove(myWorker);
// completedTaskCount是long類型的,workers是HashSet,
// 都是非執行緒安全的,所以在mainLock的保護進行修改
} finally {
mainLock.unlock();
}
int currentCtl = this.ctl.get();
if (!completedAbruptly) {
// completedAbruptly=false,說明不是因為中斷異常而退出的
// min標識當前執行緒池允許的最小執行緒數量
// 1 如果allowCoreThreadTimeOut為true,則核心執行緒也可以被銷毀,min=0
// 2 如果allowCoreThreadTimeOut為false,則min應該為所允許的核心執行緒個數,min=corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty()) {
// 如果min為0了,但工作隊列不為空,則修正min=1,因為至少需要一個工作執行緒來將工作隊列中的任務消費、處理掉
min = 1;
}
if (workerCountOf(currentCtl) >= min) {
// 如果當前工作執行緒數大於了min,當前執行緒數量是足夠的,直接返回(否則要執行下面的addWorker恢復)
return;
}
}
// 兩種場景會走到這裡進行addWorker操作
// 1 completedAbruptly=true,說明執行緒是因為中斷異常而退出的,需要重新創建一個新的工作執行緒
// 2 completedAbruptly=false,且上面的workerCount<min,則說明當前工作執行緒數不夠,需要創建一個
// 為什麼參數core傳的是false呢?
// 因為completedAbruptly=true而中斷退出的執行緒,無論當前工作執行緒數是否大於核心執行緒,都需要創建一個新的執行緒來代替原有的被退出的執行緒
addWorker(null, false);
}
動態修改配置參數
ThreadPoolExecutor除了支援啟動前通過構造函數設置配置參數外,也允許在執行緒池運行的過程中動態的更改配置。而要實現動態的修改配置,麻煩程度要比啟動前靜態的指定大得多。
舉個例子,在執行緒池的運行過程中如果當前corePoolSize=20,且已經創建了20個核心執行緒時(workerCount=20),現在將corePoolSize減少為10或者增大為30時應該如何實時的生效呢?
下面通過內嵌於程式碼中的注釋,詳細的說明了allowCoreThreadTimeOut、corePoolSize、maximumPoolSize這三個關鍵配置參數實現動態修改的原理。
/**
* 設置是否允許核心執行緒idle超時後退出
* */
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0) {
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
}
// 判斷一下新舊值是否相等,避免無意義的volatile變數更新,導致不必要的cpu cache同步
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value) {
// 參數值value為true,說明之前不允許核心執行緒由於idle超時而退出
// 而此時更新為true說明現在允許了,則通過interruptIdleWorkers喚醒所有的idle執行緒
// 令其走一遍runWorker中的邏輯,嘗試著讓idle超時的核心執行緒及時銷毀
interruptIdleWorkers();
}
}
}
/**
* 動態更新核心執行緒最大值corePoolSize
* */
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0) {
throw new IllegalArgumentException();
}
// 計算差異
int delta = corePoolSize - this.corePoolSize;
// 賦值
this.corePoolSize = corePoolSize;
if (workerCountOf(this.ctl.get()) > corePoolSize) {
// 更新完畢後,發現當前工作執行緒數超過了指定的值
// 喚醒所有idle執行緒,讓目前空閑的idle超時的執行緒在workerCount大於maximumPoolSize時及時銷毀
interruptIdleWorkers();
} else if (delta > 0) {
// 差異大於0,代表著新值大於舊值
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
// 我們無法確切的知道有多少新的執行緒是所需要的。
// 啟發式的預先啟動足夠的新工作執行緒用於處理工作隊列中的任務
// 但當執行此操作時工作隊列為空了,則立即停止此操作(隊列為空了說明當前負載較低,再創建更多的工作執行緒是浪費資源)
// 取差異和當前工作隊列中的最小值為k
int k = Math.min(delta, workQueue.size());
// 嘗試著一直增加新的工作執行緒,直到和k相同
// 這樣設計的目的在於控制增加的核心執行緒數量,不要一下子創建過多核心執行緒
// 舉個例子:原來的corePoolSize是10,且工作執行緒數也是10,現在新值設置為了30,新值比舊值大20,理論上應該直接創建20個核心工作執行緒
// 而工作隊列中的任務數只有10,那麼這個時候直接創建20個新工作執行緒是沒必要的,只需要一個一個創建,在創建的過程中新的執行緒會盡量的消費工作隊列中的任務
// 這樣就可以以一種啟發性的方式創建合適的新工作執行緒,一定程度上節約資源。後面再有新的任務提交時,再從runWorker方法中去單獨創建核心執行緒(類似惰性創建)
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty()) {
// 其它工作執行緒在循環的過程中也在消費工作執行緒,且用戶也可能不斷地提交任務
// 這是一個動態的過程,但一旦發現當前工作隊列為空則立即結束
break;
}
}
}
}
/**
* 動態更新最大執行緒數maximumPoolSize
* */
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
throw new IllegalArgumentException();
}
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(this.ctl.get()) > maximumPoolSize) {
// 更新完畢後,發現當前工作執行緒數超過了指定的值
// 喚醒所有idle執行緒,讓目前空閑的idle超時的執行緒在workerCount大於maximumPoolSize時及時銷毀
interruptIdleWorkers();
}
}
目前為止,通過v1版本的MyThreadPoolExecutor源碼,已經將jdk執行緒池ThreadPoolExecutor在RUNNING狀態下提交任務,啟動工作執行緒執行任務相關的核心邏輯講解完畢了(不考慮優雅停止)。
jdk執行緒池默認支援的四種拒絕策略
jdk執行緒池支援用戶傳入自定義的拒絕策略處理器,只需要傳入實現了RejectedExecutionHandler介面的對象就行。
而jdk在ThreadPoolExecutor中提供了默認的四種拒絕策略方便用戶使用。
- AbortPolicy
拒絕接受任務時會拋出RejectedExecutionException,能讓提交任務的一方感知到異常的策略。適用於大多數場景,也是jdk默認的拒絕策略。 - DiscardPolicy
直接丟棄任務的拒絕策略。簡單的直接丟棄任務,適用於對任務執行成功率要求不高的場合 - DiscardOldestPolicy
丟棄當前工作隊列中最早入隊的任務,然後將當前任務重新提交。適用於後出現的任務能夠完全代替之前任務的場合(追求最終一致性) - CallerRunsPolicy
令調用者執行緒自己執行所提交任務的拒絕策略。在執行緒池壓力過大時,讓提交任務的執行緒自己執行該任務(非同步變同步),能有效地降低執行緒池的壓力,也不會丟失任務,但可能導致整體業務吞吐量大幅降低。
上面介紹的四種jdk默認拒絕策略分別適應不同的業務場景,需要用戶仔細考慮最適合的拒絕策略。同時靈活的、基於介面的設計也開放的支援用戶去自己實現更貼合自己業務的拒絕策略處理器。
/**
* 默認的拒絕策略:AbortPolicy
* */
private static final MyRejectedExecutionHandler defaultHandler = new MyAbortPolicy();
/**
* 拋出RejectedExecutionException的拒絕策略
* 評價:能讓提交任務的一方感知到異常的策略,比較通用,也是jdk默認的拒絕策略
* */
public static class MyAbortPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
// 直接拋出異常
throw new RejectedExecutionException("Task " + command.toString() +
" rejected from " + executor.toString());
}
}
/**
* 令調用者執行緒自己執行command任務的拒絕策略
* 評價:在執行緒池壓力過大時,讓提交任務的執行緒自己執行該任務(非同步變同步),
* 能夠有效地降低執行緒池的壓力,也不會丟失任務,但可能導致整體業務吞吐量大幅降低
* */
public static class MyCallerRunsPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
// 如果當前執行緒池不是shutdown狀態,則令調用者執行緒自己執行command任務
command.run();
}else{
// 如果已經是shutdown狀態了,就什麼也不做直接丟棄任務
}
}
}
/**
* 直接丟棄任務的拒絕策略
* 評價:簡單的直接丟棄任務,適用於對任務執行成功率要求不高的場合
* */
public static class MyDiscardPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
// 什麼也不做的,直接返回
// 效果就是command任務被無聲無息的丟棄了,沒有異常
}
}
/**
* 丟棄當前工作隊列中最早入隊的任務,然後將當前任務重新提交
* 評價:適用於後出現的任務能夠完全代替之前任務的場合(追求最終一致性)
* */
public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
// 如果當前執行緒池不是shutdown狀態,則丟棄當前工作隊列中最早入隊的任務,然後將當前任務重新提交
executor.getQueue().poll();
executor.execute(command);
}else{
// 如果已經是shutdown狀態了,就什麼也不做直接丟棄任務
}
}
}
jdk默認的四種執行緒池實現
jdk中除了提供了默認的拒絕策略,還在Executors類中提供了四種基於ThreadPoolExecutor的、比較常用的執行緒池,以簡化用戶對執行緒池的使用。
這四種執行緒池可以通過Executors提供的public方法來分別創建:
newFixedThreadPool
newFixedThreadPool方法創建一個工作執行緒數量固定的執行緒池,其創建ThreadPoolExecutor時傳入的核心執行緒數corePoolSize和最大執行緒數maximumPoolSize是相等的。
因此其工作隊列傳入是一個無界的LinkedBlockingQueue,無界的工作隊列意味著永遠都不會創建新的非核心執行緒。
在默認allowCoreThreadTimeOut為false的情況下,執行緒池中的所有執行緒都是不會因為idle超時而銷毀的核心執行緒。
適用場景:由於工作執行緒數量固定,「fixedThreadPool」適用於任務流量較為穩定的場景
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newCachedThreadPool
newCachedThreadPool方法創建一個工作執行緒數量有巨大彈性的執行緒池,其核心執行緒數corePoolSize=0而最大執行緒數maximumPoolSize為Integer.MAX_VALUE,60s的保活時間。
同時其工作隊列是SynchronousQueue,是一種隊列容量為0、無法快取任何任務的阻塞隊列(任何時候插入數據(offer)時必須有消費者執行緒消費,否則生產者執行緒將會被阻塞)。
這也意味著「cachedThreadPool」中沒有核心執行緒,所有工作執行緒在任務負載較低時都會在60s的idle後被銷毀;同時當負載較高,新任務到來時由於所有的工作執行緒都在執行其它任務,將會立即創建一個新的非核心執行緒來處理任務。
適用場景:由於可以無限制的創建新執行緒來做到及時響應任務,「cachedThreadPool」適用於任務流量較大且不穩定,對任務延遲容忍度較低的場景
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newSingleThreadExecutor
newSingleThreadExecutor方法創建一個單執行緒的執行緒池,其核心執行緒數corePoolSize=1且最大執行緒數maximumPoolSize也為1,其工作隊列是無界隊列。
這意味著「singleThreadExecutor」中任何提交的任務都將嚴格按照先入先出的順序被執行。
適用場景:「singleThreadExecutor」適用於任務量較小、對任務延遲容忍度較高、並要求任務順序執行的場景。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newScheduledThreadPool
newScheduledThreadPool方法創建一個支援定時任務、延遲任務執行的執行緒池(關於jdk定時任務執行緒池ScheduledThreadPoolExecutor的工作原理會在未來的部落格中展開)
適用場景:「scheduledThreadPool」適用於需要任務定時或者延遲執行的場景。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
jdk默認提供的執行緒池的缺陷
- 無論是newCachedThreadPool還是newFixedThreadPool、newSingleThreadExecutor,其設置的最大執行緒數量(Integer.MAX_VALUE)和無界的工作隊列(new LinkedBlockingQueue
())都缺乏必要的限制。
在生產環境中很容易因為任務流量過大導致創建過多的工作執行緒或令無界的工作隊列堆積大量的任務對象而耗盡CPU和記憶體等系統資源,最終導致程式崩潰。
這也是為什麼阿里巴巴的開發規範中推薦使用更基礎的ThreadPoolExecutor構造函數來創建所需要的執行緒池。 - 只有在了解ThreadPoolExecutor工作原理以及各項配置參數的具體作用後,才能根據具體的業務和硬體配置來設置最合適的參數值。
總結
- 這篇部落格中我們首先介紹了執行緒池的基本概念,隨後在源碼層面解析了jdk默認的執行緒池ThreadPoolExecutor在執行所提交任務的整體工作原理(RUNNING狀態),
並在最後簡單的分析了jdk默認提供的四種拒絕策略和四種執行緒池的適用場景。 - 希望通過這篇部落格能讓讀者更好的理解執行緒池的工作原理,並在工作中更好的使用執行緒池。關於ThreadPoolExecutor優雅停止的原理會在下一篇部落格中進行詳細的分析,盡請期待。
- 本篇部落格的完整程式碼在我的github上://github.com/1399852153/Reinventing-the-wheel-for-learning(ThreadPool模組) 內容如有錯誤,還請多多指教。