執行緒池:ThreadPoolExecutor源碼解讀
- 2022 年 9 月 3 日
- 筆記
1 帶著問題去閱讀
1.1 執行緒池的執行緒復用原理
用戶每次調用execute()來提交一個任務,然後任務包裝成Worker對象,並且啟動一個worker執行緒來執行任務(任務可能會被先加入隊列),只要任務隊列不為空且worker執行緒沒有被中斷,執行緒的run()方法通過一個while循環,不斷去隊列獲取任務並執行,而不會進入到run()方法底部。while循環是執行緒復用的關鍵
1.2 執行緒池如何管理執行緒
首先定義兩個說明:
-
關於獲取任務超時,會依賴以下條件:
–1、開啟核心執行緒超時設置 或 執行緒池執行緒數大於核心執行緒數
–2、符合1,且從workqueue獲取任務超時。(如果不符合1,則以阻塞方式獲取任務,不會超時) -
執行緒池最小保留執行緒數:
–1、如果沒有開啟核心執行緒超時配置,則至少保留corePoolSize個執行緒
–2、如果開啟核心執行緒超時並且當前隊列裡面還有任務,只需保留1個執行緒
將執行緒池的生命周期分為三個階段:創建階段、運行期間、終止階段。
一、創建階段
- 當執行緒池執行緒數(ctl低位)少於核心執行緒數(corePoolSize),創建新執行緒執行任務
- 當執行緒池執行緒數大於等於核心執行緒數,且任務隊列未滿時,將新任務放入到任務隊列中,不創建執行緒
- 當執行緒池執行緒數大於等於核心執行緒數(maximumPoolSize),且任務隊列已滿
–如果工作執行緒數少於最大執行緒數,則創建新執行緒執行任務
–如果工作執行緒數等於最大執行緒數,則拋出異常,拒絕新任務進入
二、運行期間
1、執行緒啟動後,將一直循環獲取任務並執行,只有當獲取任務超時,或者執行緒池被終止,才會結束。
2、如果獲取任務超時,那麼Worker執行緒自然結束。此時執行緒池減少了1個執行緒。
3、在執行緒結束後,執行緒池會檢查:1、執行緒池執行緒數<最少保留執行緒數 2、任務執行異常結束。如果符合,執行緒池會自動補充1個Worker
三、終止階段
調用shutdown()和shutdownNow()都導致執行緒池執行緒數減少。
1、shutdown()方式終止執行緒池:
–停止提交新的任務,已在隊列的任務會繼續執行,並且中斷空閑的Worker執行緒(Work.state從0->1成功),執行緒池狀態變為SHUTDOWN
2、shutdownNow()方式終止執行緒池:
–關閉執行緒池,不再接受新的任務,中斷已經啟動的Worker執行緒(Work.state>0),執行緒池狀態改為STOP
執行緒池創建執行緒及處理任務過程:
梳理一下大概流程:
- 用戶執行緒調用execute()提交Runnable任務
- execute()調用addWork()將任務提交給執行緒池處理:如果有可用的核心執行緒,則提交給核心執行緒處理。反則,將任務先添加到任務隊列(workQueen)中。
- addWorker()方法將啟動一個worker執行緒,調用runWorker()來處理任務。
- runWorker()方法將循環獲取任務,並運行任務的run()方法來執行真正的業務。如果是以核心執行緒提交任務,則優先處理該任務,否則,循環調用getTask()來獲取任務
- getTask()方法,從任務隊列(workQueen)取出任務,並返回。
- getTask()沒有拿到任務,則執行執行緒結束processWorkerExit()
執行緒池創建階段:
1.3 執行緒池配置的重要參數
- ctl:存儲執行緒池狀態以及執行緒數
- corePoolSize、maximumPoolSize、keepAliveTime、workQueue 參照下面的源碼分析說明
- allowCoreThreadTimeOut:是否開啟核心執行緒超時。默認false,不在構造函數設置,需要調用方法設置
- HashSet
workers:執行緒池終止時會從該集合找執行緒來中斷,源碼分析有說明
1.4 shutdown()和shutdownNow()區別
- shutdown() :關閉執行緒池,不再接受新的任務,已提交執行的任務繼續執行;中斷所有空閑執行緒;將執行緒池狀態改為SHUTDOWN
- ShutDownNow():關閉執行緒池,不再接受新的任務,中斷已經啟動的Worker執行緒;將執行緒池狀態改為STOP;返回未完成的任務隊列
1.5 執行緒池中的兩個鎖
- mainLock主鎖是可重入的鎖,用來同步更新的成員變數
- Worker內部實現了一個鎖,它是不可重入的,在shutdown()場景中,通過tryLock確保不會中斷還沒有開始執行或者還在執行中的worker執行緒。
2 源碼分析過程中的困惑及解惑
—什麼情況任務會提交失敗?
同時符合以下條件,任務才會被提交:
- 執行緒池狀態等於RUNNING狀態;
- 如果任務隊列已經滿了,並且執行緒池執行緒數 少於 配置的執行緒池最大執行緒數(maximumPoolSize) 且小於執行緒池的最大支援執行緒數(CAPACITY)時。(如果隊列沒滿,任務將會先加入到隊列中)
特別說明:特殊情況會創建任務為空的Worker執行緒來幫助隊列中的任務跑完
—核心執行緒數的意義?從測試結果看,他決定了工作執行緒最大並發數,但未程式碼驗證
- 核心執行緒數決定提交任務什麼時候會被放入到隊列中:即執行緒池執行緒數>=核心執行緒數時。
- 核心執行緒數大小跟並發執行執行緒(任務)無關。也就是,它不決定工作執行緒最大並發數
- 核心執行緒數可以動態修改。(如果增大了,可能會馬上創建新的Worker執行緒)
—執行緒池狀態不是RUNNING,或者往workQueue添加worker失敗,這是為什麼還要提交任務
以下情況會創建任務為空的Worker執行緒來執行隊列中的任務
- 當前執行緒池狀態為shutdown,但是任務隊列不為空,這時創建Worker執行緒來幫助執行隊列的任務
- 當前執行緒池狀態為running, 任務添加到隊列後,接著執行緒池被關閉,並且從隊列移除該任務失敗,並且執行緒池執行緒數為0,這時創建Worker執行緒來確保剛提交的任務有機會執行。
—為什麼runWorker()方法在執行任務前後加鎖,但是執行緒依然能夠並發?
- worker執行緒是通過創建Worker對象來創建的,在addWorke()的while循環創建了多個Worker對象,每個Worker對象都有自己的鎖,Worker執行緒通過runWorker()訪問的是當前對象的鎖,因此Worker執行緒能夠並發;
- 鎖的意義是限制不能中斷執行中的任務,因為主執行緒調用shutdown()和shutdownNow()方法時,會遍歷WorkerSet的Worker對象,調用tryLock(),這時主執行緒和Worker執行緒競爭同一個鎖。
3 源碼分析
3.1 類繼承關係
- Executo介面:專門提交任務,只有一個execute()方法。Executor 提供了一種將任務的提交和任務的執行兩個操作進行解耦的思路:客戶端無需關注執行任務的執行緒是如何創建、運行和回收的,只需要將任務的執行邏輯包裝為一個 Runnable 對象傳遞進來即可,由 Executor 的實現類自己來完成最複雜的執行邏輯
- ExecutorService介面:繼承了Executor,擴展執行任務的能力。例如:獲取任務的執行結果、取消任務等功能;提供了關閉執行緒池、停止執行緒池,以及阻塞等待執行緒池完全終止的方法,需要ThreadPoolExecutor實現
- AbstractExecutorServic類:實現了 ExecutorService ,是上層的抽象類,負責將任務的執行流程串聯起來,從而使得下層的實現類 ThreadPoolExecutor只需要實現一個執行任務的方法即可
- ThreadPoolExecutor:可以看做是基於生產者-消費者模式的一種服務,內部維護的多個執行緒相當於消費者,提交的任務相當於產品,提交任務的外部就相當於生產者
3.2 類的常量/成員變數
//--------------------------常量部分------------------------
// 常量29。用在移位計算Integer.SIZE=32)
private static final int COUNT_BITS = Integer.SIZE - 3; //29
// 最大支援執行緒數 2^29-1:000 11111111111111111...
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 以下為執行緒池的四個狀態,用32位中的前三位表示
// 011 terminated() 方法執行完成後,執行緒池的狀態會轉為TERMINATED.
private static final int TERMINATED = 3 << COUNT_BITS;
// 010 所有任務都銷毀了,workCount=0的時候,執行緒池的裝填在轉換為TIDYING是,會執行鉤子方法terminated()
private static final int TIDYING = 2 << COUNT_BITS; //翻譯為整理
// 001 拒絕新的任務提交,清空在隊列中的任務
private static final int STOP = 1 << COUNT_BITS;
// 000 拒絕新的任務提交,會將隊列中的任務執行完,正在執行的任務繼續執行.
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 111 00000 00000000 00000000 00000000 執行緒運行中 【running狀態值為負數最小】
private static final int RUNNING = -1 << COUNT_BITS; //執行緒池的默認狀態
//------------------------變數部分------------------------
// ctl存儲執行緒池狀態和執行緒池大小,那麼用前3位表示執行緒池狀態,後29位表示:執行緒池大小,即執行緒池執行緒數
//執行緒池狀態初始值為RUNNING
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//任務隊列
//保存不能馬上執行的Runnable任務。
//執行shutdownNow()時,會返回還在隊列的任務
private final BlockingQueue<Runnable> workQueue;
// 主鎖,對workers、largestPoolSize、completedTaskCount的訪問都必須先獲取該鎖
private final ReentrantLock mainLock = new ReentrantLock();
// 包含池中的所有工作執行緒的集合。持有mainLock訪問
// 創建Worker時,添加到集合
// 執行緒結束時,從集合移除
// 調用shutdown()時,從該集合中找到空閑執行緒並中斷
// 調用shutdownNow()時,從該集合中找到已啟動的執行緒並中斷
private final HashSet<Worker> workers = new HashSet<Worker>();
// 執行緒通訊手段, 用於支援awaitTermination方法:等待所有任務完成,並支援設置超時時間,返回值代表是不是超時.
private final Condition termination = mainLock.newCondition();
// 記錄workers歷史以來的最大值。持有mainLock訪問
// 每次增加worker的時候,都會判斷當前workers.size()是否大於最大值,大於則更新
// 用於執行緒池監控的,作為重要指標
private int largestPoolSize;
// 計數所有已完成任務,持有mainLock訪問
// 每個worker都有一個自己的成員變數 completedTasks 來記錄當前 worker 執行的任務次數, 當前線worker工作執行緒終止的時候, 才會將worker中的completedTasks的數量加入到 completedTaskCount 指標中.
private long completedTaskCount;
// 執行緒工廠
private volatile ThreadFactory threadFactory;
// 拒絕策略,默認四種AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy,建議自己實現,增加監控指標
private volatile RejectedExecutionHandler handler;
// keepAliveTime和allowCoreThreadTimeOut 是關於執行緒空閑是否會被銷毀的配置
// 關於空閑的說明:
// 1、執行緒池在沒有關閉之前,會一直向任務隊列(workqueue)獲取任務執行,如果任務隊列是空的,在新任務提交上來之前,就會產生一個等待時間,期間,執行緒處於空閑狀態
// 2、向任務隊列獲取任務用:workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),表示阻塞式獲取元素,等待超時,則終止等待並返回false。通過判斷poll()方法是true/falle來判定執行緒是否超時
// 獲取任務的等待時間 ,以下兩種情況會使用到該值
//1、如果啟用allowCoreThreadTimeOut,那表示核心執行緒的空閑時間
// 2、當執行緒池內執行緒數超過corePoolSize,表示執行緒獲取任務的等待時間
private volatile long keepAliveTime;
// 核心執行緒是否開啟超時
// false:表示核心執行緒一旦啟動,會一直運行,直至關閉執行緒池。默認該值
// true:表示核心執行緒處於空閑且時間超過keepAliveTime,核心執行緒結束後,將不再創建新執行緒
// (默認的構造函數沒有設置這個屬性,需要手工調用allowCoreThreadTimeOut()方法來設置)
private volatile boolean allowCoreThreadTimeOut;
//核心執行緒數量
//核心執行緒是指:執行緒會一直存活在執行緒池中,不會被主動銷毀【如果核心執行緒開啟超時,有可能被被銷毀】。
private volatile int corePoolSize;
// 配置的執行緒池最大執行緒數
private volatile int maximumPoolSize;
// 默認拒絕策略 AbortPolicy
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
// 安全控制訪問(主要用於shutdown和 shutdownNow方法
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
// 在threadPoolExecutor初始化的時候賦值,acc對象是指當前調用上下文的快照,其中包括當前執行緒繼承的AccessControlContext和任何有限的特權範圍,使得可以在稍後的某個時間點(可能在另一個執行緒中)檢查此上下文。
private final AccessControlContext acc;
3.3 成員變數訪問方法
// 獲取當前執行緒池的狀態(前3位)
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 獲取當前執行緒池中執行緒數(後29位)
private static int workerCountOf(int c){ return c & CAPACITY; }
// 更新狀態和數量
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 小於判斷C是不是小於S,比如runStateLessThan(var,STOP),那var就只有可能是(RUNNING,SHUTDOWN)
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 是不是C >= S
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 判斷狀態是不是RUNNING
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
關於-1<<29說明:
-1 << COUNT_BITS
這裡是-1往左移29位,稍微有點不一樣,-1的話需要我們自己算出補碼來
-1的原碼
10000000 00000000 00000000 00000001
-1的反碼,負數的反碼是將原碼除符號位以外全部取反
11111111 11111111 11111111 11111110
-1的補碼,負數的補碼就是將反碼+1
11111111 11111111 11111111 11111111
關鍵了,往左移29位,所以高3位全是1就是RUNNING狀態
111 00000 00000000 00000000 00000000
3.4 構造函數
//corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue 這五個參數必須指定
//最多參構造函數
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//初始值的合法性校驗
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize || //最大執行緒數必須大於核心執行緒數
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
//成員變數賦初值
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;//默認使用SynchronousQueue<Runnable>
this.keepAliveTime = unit.toNanos(keepAliveTime); //默認60S
this.threadFactory = threadFactory; //默認使用DefaultThreadFactory
this.handler = handler;
}
構造函數總結:
初始化:corePoolSize(核心執行緒池大小)、maximumPoolSize(執行緒池容納最大執行緒數)、workQueue(任務隊列)、threadFactory(執行緒工廠)、keepAliveTime(空閑執行緒存活時長)、handler(拒絕策略)AccessControlContext
3.5 靜態內部類Worker
3.5.1 Worker繼承關係
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
}
- –Worker繼承於AbstractQueuedSynchronizer
Worker繼承於AQS 為的就是自定義實現不可重入的特性(所以沒有使用 synchronized 或者 ReentrantLock)來輔助判斷執行緒是否處於執行任務的狀態:在開始執行任務前進行加鎖,在任務執行結束後解鎖,以便在後續通過判斷 Worker 是否處於鎖定狀態來得知其是否處於執行階段
- — Worker實現Runnable介面
Worker實現Runnable介面,執行緒是通過getThreadFactory().newThread(this) 來創建的,即將 Worker 本身作為構造參數傳給 Thread 進行初始化,所以在 thread 啟動的時候 Worker 的 run() 方法就會被執行。
關於ThreadFactory說明:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
3.5.2 Worker源碼分析
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//執行緒類型的屬性:thread,執行緒池啟動工作執行緒,就是啟動這個thread。
// 1、通過this.thread=getThreadFactory().newThread(this),初始化了屬性thread,this就是指Worker對象
//2、因為Worker類實現了Runnable介面,所以thread啟動後,會運行Worker的run()方法,然後就去執行runWorker(this)方法
final Thread thread;
//執行緒要執行的第1個任務(可能為 null) 它表示這個任務立即執行,不需要放到任務隊列。在工作執行緒數<核心執行緒數時,這種場景會出現
Runnable firstTask;
//保存Worker執行緒池執行過的任務數,在runWorker()的finally中累加更新。任務執行成功與否都會更新
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // AQS父類的state。設為-1
this.firstTask = firstTask; //firstTask賦初值
this.thread = getThreadFactory().newThread(this); //屬性thread賦值
}
//Runnable run方法實現
public void run() {
runWorker(this); //調用runWorkder方法:將Worker對象傳遞給調用者,這樣就可以訪問firstTask、thread等屬性以及lock()相關方法
}
// state 的值說明
// -1:worker初始化; 1 :鎖被獨佔; 0:鎖空閑
//是否持有鎖 AQS父類方法的實現
protected boolean isHeldExclusively() {
return getState() != 0;
}
//以獨佔方式獲取鎖,將state設為1 AQS父類方法的實現
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false; //假如state=1,那麼cas失敗,返回false,執行緒就會進入AQS隊列等待
}
//釋放鎖。state設為0 AQS父類方法的實現
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//提供加鎖和解鎖的方法
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
//向執行緒發起中斷請求
// 符合:1、運行中的;2、沒有處於中斷 才能中斷
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker類總結:
- 所謂的執行緒池,其實就是正在運行的多個Worker執行緒。
- Worker作為執行緒啟動後,它實際執行的是通過execute()提交的Runnable任務(實際業務),worker執行緒通過一個while循環來不斷獲取並任務,從而達到執行緒復用的效果
- firstTask:執行緒要執行的第1個任務(可能為 null) 它表示這個任務立即執行,不需要放到任務隊列。在 1、執行緒數<核心執行緒數 2、隊列已滿且執行緒池不在運行狀態 這兩個場景下。
4 重要方法詳解
4.1 execute()方法
execute()用來提交要運行的任務
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get(); // 計算當前執行緒池的狀態及執行緒數
// 1、執行緒池執行緒數小於配置的核心執行緒數
if (workerCountOf(c) < corePoolSize) {
// 將任務提交給核心執行緒處理
if (addWorker(command, true))
return;
//失敗的情況:1、執行緒池已經被關閉、2、執行緒池執行緒數大於等於核心執行緒數 (不能以true的方式提交了 )
c = ctl.get(); // 重新獲取執行緒池狀態
}
// 2、無空閑核心執行緒,將任務加入隊列
// 再次確認執行緒池為RUNNING狀態,將任務加入隊列【非阻塞式,隊列滿了會立即返回false】
if (isRunning(c) && workQueue.offer(command)) {
//任務加入隊列成功
int recheck = ctl.get() ;//再次獲取當前執行緒池狀態(執行緒池可能被其它執行緒關閉了)
//判斷當前執行緒池狀態是不是RUNNING狀態,不是就從workQueue中刪除command任務
if (! isRunning(recheck) && remove(command))
reject(command);//執行拒絕策略
//如果當前執行緒數是0(那證明還沒有其他工作執行緒去處理這個任務),那麼剛剛的任務肯定在阻塞隊列裡面了,這
else if (workerCountOf(recheck) == 0)
addWorker(null, false);//開啟一個沒有任務的Worker執行緒去執行隊列的任務
}
// 3 workQueue添加worker失敗,即隊列滿了
//創建非核心執行緒並執行任務
else if (!addWorker(command, false)) //如果執行緒創建失敗,說明要麼是執行緒池當前狀態!=RUNNING,或者是任務隊列已滿且執行緒總數達到最大執行緒數了
reject(command);//執行拒絕策略.
}
execute()總結
- 進行三次addWorker的嘗試:
- addWorker(command, true):創建任務並以核心執行緒執行
- 核心執行緒數達到上限, 創建任務添加到任務隊列,不創建執行緒
- addWorker(null, false) :任務添加到隊列後,接著執行緒池被關閉,並且從隊列移除該任務失敗,並且執行緒池執行緒數為0,這時創建任務並以非核心執行緒執行
- addWorker(command, false) :任務隊列已滿,創建非核心執行緒並執行
- 任務提交失敗情況:執行緒池非RUNNING狀態 並且 任務隊列已滿並且執行緒池執行緒數達到最大執行緒數(maximumPoolSize)
4.2 addWorker()方法
//TERMINATED >TIDYING > STOP > SHUTDOWN > RUNNING
//創建新的執行緒執行當前任務
//firstTask: 指定新增執行緒執行的第一個任務或者不執行任務
private boolean addWorker(Runnable firstTask, boolean core) {
//外循環:
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果執行緒池狀態是SHUTDOWN、STOP、TIDYING、TERMINATED就不允許提交。
// && 後面的特殊情況,執行緒池的狀態是SHUTDOWN並且要要執行的任務為Null並且隊列不是空,這種情況下是允許增加一個執行緒來幫助隊列中的任務跑完的,因為shutdown狀態下,允許執行完成阻塞隊里中的任務
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null && //execute()有addWorkder(null,false)的場景
! workQueue.isEmpty()))
return false;
//內循環:cas修改工作執行緒數,同時判斷能否添加work
for (;;) {
int wc = workerCountOf(c);
//添加任務前,執行緒池執行緒數已達到上限,此時不允許添加。上限分這三種情況:
// 1、最大支援執行緒數
// 2、以core=true提交時,配置的核心執行緒數。(返回false後,會以core=false再提交一次)
// 3、以core=false提交時,配置的執行緒池可容納最大執行緒數。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) //使用core則上限為核心執行緒數,否則最大執行緒數
return false;
//沒超過上限,通過CAS的方式增加worker的數量(+1),增加成功就跳出外層循環
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); //獲取最新的執行緒池狀態,與剛開始的狀態比較
// - 變了,就從外層循環重新執行,重新進行狀態的檢查。
// - 沒變,從當前循環重新執行,重新執行CAS操作。
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//創建Worker,並給firstTask賦初值
w = new Worker(firstTask);
final Thread t = w.thread; //拿到屬性thread
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //此處加鎖:因為涉及屬性:workers、largestPoolSize(可能) 更新
try {
int rs = runStateOf(ctl.get()); //獲取執行緒池最新狀態
if (rs < SHUTDOWN || //如果當前狀態是<SHUTDOWN也就是RUNNING狀態
(rs == SHUTDOWN && firstTask == null)) { //或者狀態是SHUTDOWN並且當前任務是空的(比如前面說的場景:阻塞隊里裡面還有,但當前已經是不允許提交的狀態了)
if (t.isAlive()) // 檢查Worker執行緒已經開始跑了。(thread.start()變為alive)
throw new IllegalThreadStateException();
workers.add(w); //增加worker
int s = workers.size(); //獲取最新worker的總數,比較並更新largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true; //表示添加worker成功
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//啟動worker執行緒。該執行緒會一直循環執行getTask(),直至返回null,執行緒才結束
t.start(); //執行runWorker()
workerStarted = true; //表示執行緒已經跑起來了
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);//worker執行緒沒成功啟動,進入失敗處理邏輯
}
return workerStarted;//;返回當前worker是否啟動成功。
}
addWorker()總結:
- 檢查執行緒池狀態以確定能否提交任務
- 校驗能否以核心執行緒的方式提交任務
- 執行緒池的狀態是SHUTDOWN並且任務隊列不是空,允許增加一個執行緒來幫助隊列中的任務跑完,但不會提交任務
- 更新執行緒池執行緒數
- 超過執行緒池執行緒數峰值則更新峰值(largestPoolSize)
- 加鎖(mainLock)來更新
- 啟動worker執行緒
4.3 runWorker()方法
//執行任務
final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); //runWorker()是由Worker.run()調用,因此wt就是worker執行緒
Runnable task = w.firstTask; //拿到firstTask並賦值給局部變數task
w.firstTask = null; //firstTask置空
w.unlock(); // 將state設置為0。因為構造函數設成-1,在執行任務前置為0。
boolean completedAbruptly = true;//標識任務是不是立刻就完成了。
try {
//循環:先執行firstTask(不為空),後續通過getTask()獲取任務。
while (task != null || (task = getTask()) != null) {
//任務執行前加鎖,任務完成後解鎖。
//任何地方可通過判斷鎖狀態來確認worker是否執行中
w.lock(); //加鎖。防止任務在執行過程中被中斷。
//判斷目的:確保執行緒池當狀態值大於等於 STOP 時有向執行緒發起過中斷請求【調用了shutdownNow()】
// 兩種情況:
//1)如果當前執行緒池的狀態是>=Stop的,並且當前執行緒沒有被中斷,那麼就要執行中斷。
//2)或者當前執行緒目前是已中斷的狀態並且執行緒池的狀態也是>=Stop的(注意Thread.interrupted是會擦除中斷標識符的),那麼因為中斷標識符已經被擦除了,那麼!wt.isInterrupted()一定返回true,這個時候還是要將當前執行緒中斷。第二次執行runStateAtLeast(ctl.get(), STOP)相當於一個二次檢查
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();//中斷worker執行緒 。因為執行緒池將要終止了,所以這裡沒有從workerSet移除當前執行緒
try {
beforeExecute(wt, task);//前置操作,空方法,可以業務自己實現
Throwable thrown = null;
try {
//執行任務:就是執行通過execute()提交的Runnable
task.run();//第一個是firstTask,後面的是通過getTask()拿到的任務
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//後置操作,空方法,可以業務自己實現
}
} finally {
task = null;//最後將task置為null,觸發while循環的條件getTask()
w.completedTasks++; //已完成的任務計數器+1
w.unlock();//釋放當前執行緒的獨佔鎖
}
}
completedAbruptly = false; //當第一個try的程式碼塊有異常, completedAbruptly = false 不生效。最後completedAbruptly為true表示發生未知異常了
} finally {
//getTask返回null時,執行任務退出
processWorkerExit(w, completedAbruptly);//completedAbruptly=true表示是突然退出的
}
}
runWorker()總結:
- 執行任務前先判斷執行緒池是否是STOPING狀態,是則中斷worker執行緒。
- 執行任務:先執行firstTask,再從任務隊列獲取執行
- 如果沒有任務,調用processWorkerExit()來執行執行緒退出的工作。
- 只要還有任務,worker執行緒就一直執行任務,並刷新completedTasks
4.4 getTask()方法
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//1、先判斷能否獲取到任務
// 1)如果執行緒池的狀態是>=STOP狀態,這個時候不再處理隊列中的任務,並且減少worker記錄數量,返回的任務為null,這個時候在runRWorker方法中會執行processWorkerExit進行worker的退出操作.
// 2)如果執行緒池的狀態是>=SHUTDOWN並且workQueue為空,就說明處於SHOTdown以上的狀態下,且沒有任務在等待,那麼也屬於獲取不到任務,getTask返回null.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();//扣減執行緒池執行緒數,在processWorkerExit()處理執行緒退出
return null;
}
int wc = workerCountOf(c);//獲取當前wokrer的數量
//以下涉及空閑執行緒是否會被執行緒池銷毀的處理邏輯
// 執行緒超時處理前置條件:開啟核心執行緒超時 或 執行緒池執行緒數大於核心執行緒數
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//執行緒超時處理的進一步判斷:
// 執行緒池執行緒數超過maximumPoolSize 或者 執行緒設置允許超時且當前worker取任務超時
//並且
// 執行緒池大小不是零或阻塞隊列是空的),這種就返回null,並減少執行緒池執行緒計數
// 1、 (wc>maximumPoolSize) && (wc>1) 一般情況,執行緒池執行緒數會少於配置的最大執行緒數,但在addWork中 狀態=shutdown且隊列不為空時,會創建一個Worker,此時可能導致wc>maximumPoolSize,這裡同時限定wc>1。因此執行緒池減少1個執行緒也不影響任務的執行【processWorkerExit()會保證還有任務就至少留有1個worker執行緒】。
// 2、 (wc>maximumPoolSize) && (workQueue.isEmpty()) 沒有任務了,扣減更不影響
// 3 、(timed && timedOut) && (wc > 1) 超時了,先扣減再說
// 4 、(timed && timedOut) && (workQueue.isEmpty()) 超時了&隊列沒有任務,必須要扣減
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//這裡為啥不用decrementWorkerCount()呢,上面使用decrementWorkerCount()是因為確定不管是什麼情況下,數量都要減,多減一次也沒事,因為這個時候就是要關閉執行緒池釋放資源
//這裡不一樣,執行緒池的狀態可能是RUNNING狀態,多減一次,可能導致獲取不到worker去跑
if (compareAndDecrementWorkerCount(c))
return null; //扣減執行緒池執行緒數,在processWorkerExit()處理執行緒退出
continue;//扣減失敗, 跳出本次循環重新檢查
}
//從隊列中獲取任務
//符合【執行緒超時處理前置條件】時用poll設置超時時間,不符合就使用take(阻塞直至有返回)
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r; //task不為空,此處返回task
timedOut = true; // 此處,r == null,肯定是poll操作超時了(注意,不代表隊列空了),繼續for循環,回到if ((wc > maximumPoolSize || (timed && timedOut)) 這個地方退出循環
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
getTask()總結:
- 從workQueue中獲取一個任務並返回
- 沒有獲取到任務就扣減執行緒池執行緒數。獲取不到任務的四種情況:
- 執行緒池的狀態是>=STOP
- 執行緒池的狀態是SHUTDOWN並且任務隊列為空
- 獲取任務超時
- 執行緒池執行緒數大於maximumPoolSize並且隊列為空
4.5 processWorkerExit()方法
//worker執行緒沒有拿到任務,成為空閑執行緒。該方法對空閑執行緒進一步處理
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果completedAbruptly為true,則說明執行緒執行時出現異常,需要將workerCount數量減一
//如果completedAbruptly為false,說明在getTask方法中已經對workerCount進行減一,這裡不用再減
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//更新已完成任務的數量的統計項
completedTaskCount += w.completedTasks;
//從worker集合中移除該worker
workers.remove(w);
} finally {
mainLock.unlock();
}
//嘗試關閉執行緒池,但如果是正常運行狀態,就不會關閉
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {//1、執行緒池是SHUTDOWN或RUNNING(如果不是這兩個狀態,說明執行緒已經停止了,不做任何操作)
if (!completedAbruptly) {//2、執行緒正常結束
// 如果沒有開啟核心執行緒超時配置,則至少保留corePoolSize個執行緒;
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())//如果允許核心執行緒超時並且當前隊列裡面還有任務沒跑,必須留1個執行緒,不能全死掉.
min = 1;
// 如果執行緒池數量>=最少預留執行緒數
if (workerCountOf(c) >= min)
return; // 執行緒自然結束了,不用補充worker
}
// 1、執行任務異常結束的,補充worker
// 2、如果執行緒池數量<最少預留執行緒數,補充worker
addWorker(null, false);//異常結束 增加worker
//注: 別問我為啥上面要刪除worker,還要再加,不刪是不是不用加了. 明確下那個任務已經退出getTask那塊的死循環了,永遠回不去了,只能新增worker.
}
}
processWorkerExit()方法總結!!!!!:
- 當Worker執行緒結束前,完成以下工作:扣減執行緒池執行緒數(ctl)、更新已完成任務數(completedTaskCount)、Worker集合中移除一個Worker(workers)、嘗試終止執行緒池、計算執行緒池的最少保留執行緒數、根據最少保留執行緒數來確定是否補充一個Worker。
- 關於最少保留執行緒數:如果沒有開啟核心執行緒超時配置,則至少保留corePoolSize個執行緒;如果開啟核心執行緒超時並且當前隊列裡面還有任務,只需保留1個執行緒;
- 需要補充worker的兩種情況:1、執行緒池執行緒數<最少保留執行緒數 2、任務執行異常結束
4.6 tryTerminate()方法
//嘗試終止執行緒池
final void tryTerminate() {
for (;;) { //cas自旋 確保更新成功
int c = ctl.get();
//RUNNING狀態,不能終止執行緒池
//執行緒池狀態是TIDYING或TERMINATED說明執行緒池已經處於正在終止的路上,不用再終止了.
//狀態為SHUTDOWN,但是任務隊列不為空,也不能終止執行緒池
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//調用shutdown()或者shutdownNow()方法時,執行以下處理
//工作執行緒數量不等於0,中斷一個空閑的工作執行緒並返回
//這個時候執行緒池一定是 1、STOP的狀態或者 2、SHUTDOW且隊列為空 這兩種情況中斷一個空閑worker
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 設置執行緒池狀態為TIDYING,如果設置成功,則調用terminated()
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated(); //鉤子方法,子類實現。默認什麼都不做
} finally {
// 設置狀態為TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll(); //喚醒阻塞等待的執行緒 (future的場景)
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
tryTerminate()總結
- 嘗試終止執行緒池
- 不能終止執行緒池:
- 狀態是RUNNING,不能直接終止(如果是調用shutdown(),shutdownNow(),會先將狀態改為SHUTDOWN)
- 狀態是TIDYING或者TERMINATED,不能終止(因為已經處於終止過程中)
- 狀態是SHUTDOWN並且任務隊列不為空,不能終止(因為還有任務要處理)
- 可以終止執行緒池:
- 狀態是SHUTDOWN並且任務隊列為空
- 狀態是STOP
- 符合可以終止執行緒池的條件下,如果執行緒池執行緒數不等於0,那就中斷1個Worker執行緒,不修改執行緒池狀態
- 符合可以終止執行緒池的條件下,並且執行緒池執行緒數等於0,那就將執行緒池狀態改為TIDYING,執行完鉤子方法terminated()後狀態再改為TERMINATED
interruptIdleWorkers(ONLY_ONE); 是否好奇為啥這裡只中斷一個worker呢, 這裡就涉及到了執行緒池的優雅退出了.
當執行到 interruptIdleWorkers(ONLY_ONE) 前面的時候, 執行緒池只能處於兩種狀態:
1) STOP 狀態 , 這個時候 workQueue 可能是有值的 , workQueue 在清空的過程中了.
2) SHUTDOWN 狀態並且 workQueue 是空的 .
這兩種狀態都是說明, 執行緒池即將關閉, 或者說空閑的執行緒此時已經沒用了,這個時候隨手關一個, 反正要關,早關晚關而已.
4.7 interruptIdleWorker()方法
//中斷一個或多個執行緒
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍歷worker,根據onlyOne判斷,如果為ture只中斷一個執行緒
for (Worker w : workers) {
Thread t = w.thread;
//執行緒沒有被中斷並且執行緒是空閑狀態
//通過tryLock實現:不能中斷還沒有開始執行或者還在執行中的worker執行緒。
//執行緒未啟動:-1 ,執行緒正在執行:1 ,trylock:0->1 ;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt(); //中斷操作,之後該執行緒就結束了
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
interruptIdleWorker()總結:
- 從worker集合中遍歷並中斷worker執行緒
- 只有worker執行緒狀態是0的,才能夠中斷(不能中斷未啟動或者還在執行中的Worker執行緒)
4.8 shutdown()方法
//初始化一個有序的關閉,之前提交的任務都會被執行,但是新提交的任務則不會被允許放入任務隊列中。如果之前被調用過了的話,那麼再次調用也沒什麼用
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //mainLock是全局變數,加鎖確保不會並發關閉執行緒池
try {
checkShutdownAccess();//安全策略判斷。方法檢查每一個執行緒池的執行緒是否有可以ShutDown的許可權。
advanceRunState(SHUTDOWN); //CAS自旋把ctl中的狀態從RUNNING變為SHUTDOWN
interruptIdleWorkers();//中斷所有空閑執行緒
onShutdown(); // 方法告知子類,執行緒池要處於ShutDown狀態了 ,ScheduledThreadPoolExecutor預留的鉤子
} finally {
mainLock.unlock();
}
tryTerminate();//嘗試終止執行緒池
}
shutdown()方法總結
- 執行shutdown()方法:關閉執行緒池,不再接受新的任務,已提交執行的任務繼續執行。
- 調用interruptIdleWorkers()先中斷所有空閑執行緒
- 調用tryTerminate()嘗試終止執行緒池
- shutdown()將執行緒池狀態改為SHUTDOWN但不是STOP
4.9 shutdownNow()方法
//關閉執行緒池,不再接受新的任務,正在執行的任務嘗試終止
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);//執行緒池的狀態置為STOP
interruptWorkers();
tasks = drainQueue(); //將剩餘任務返回
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) //循環所有的worker
w.interruptIfStarted();//已經啟動的執行緒直接執行中斷
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
//只有剛剛構建的worker的時候,狀態state值是-1(這裡也能體現剛構建的worker無法被中斷),其他情況都是>=0的
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
ShutDownNow()方法總結
- 關閉執行緒池,不再接受新的任務,中斷已經啟動的Worker執行緒
- 將執行緒池狀態改為STOP
- 返回未完成的任務隊列
4.10 isShutdown()方法
確認執行緒池是否關閉。判斷狀態是不是RUNNING.
public boolean isShutdown() {
return ! isRunning(ctl.get());
}
4.11 prestartCoreThread()方法
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
- 啟動一個空閑的執行緒作為核心執行緒
- 如果核心執行緒數已到閾值, 會加入失敗, 返回false, 如果執行緒池處於SHUTDOWN以上的狀態也返回false
- 只有真正這個執行緒調用start方法跑起來, 才會返回true
4.12 prestartAllCoreThreads()方法
啟動所有核心執行緒,使他們等待獲取任務
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))//null代表空閑執行緒,true代表是增加的是核心執行緒
++n;//死循環增加空閑 worker 而已
return n;
}