從源碼角度來分析執行緒池-ThreadPoolExecutor實現原理
作為一名Java開發工程師,想必性能問題是不可避免的。通常,在遇到性能瓶頸時第一時間肯定會想到利用快取來解決問題,然而快取雖好用,但也並非萬能,某些場景依然無法覆蓋。比如:需要實時、多次調用第三方API時,該場景快取則無法適用。
然多執行緒並發的方式則很好的解決了上述問題。
但若每次都在任務開始時創建新的執行緒,在任務結束後銷毀執行緒,這樣增加了資源的消耗,也降到了響應速度。針對此,執行緒池通過維護多個執行緒的方式來降低創建執行緒的次數,並且對執行緒的創建、銷毀以及數量加以很好的控制,保證對內核的充分利用。
總結起來:
- 降低了資源的消耗:通過池化技術,重複利用已創建好的執行緒。
- 增加了響應的速度:若有空閑執行緒時,直接執行任務,無需等待創建新的執行緒。
- 增加了系統的穩定性和可管理性:對系統而言,過多的執行緒會導致資源調度失衡,降低了系統的穩定性。執行緒池進行統一的管理(調優,監控和分配等),減少碎片化資訊。
I. 執行緒池的類關係圖
【JDK1.8】

從執行緒池類圖關係可見,Java中執行緒池核心實現類是ThreadPoolExecutor, ThreadPoolExecutor實現的頂層介面是Executor,從圖中可以發現:
- Executor介面只有一個execute()方法,將任務的提交和運行進行解耦。
- ExecutorService介面在Executor的基礎上,增加了生命周期的控制(執行緒池狀態轉換)和submit()方法生成Future結果。
- AbstractExecutorService是一個抽象類,實現了ExecutorService介面中的submit()方法,並實現了任務的執行流程。
- ThreadPoolExecutor主要實現兩個功能:執行緒池生命周期管理,任務的執行execute()方法。
作為整個架構中最核心的東西-ThreadPoolExecutor,接下來便從源碼的角度來分析一二。
II. 生命周期管理
執行緒池的資訊變數ctl是一個原子整型變數,封裝了兩個概念行欄位:
- workerCount: 表示執行緒池中有效的執行緒數量
- runState: 表示執行緒池當前的運行狀態
ctl資訊
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor

- COUNT_BITS=29(十進位)
- CAPACITY=0000 1111 1111 1111 1111 1111 1111 1111(二進位)
資訊獲取:
- int c = ctl.get();
- workerCount = workerCountOf(c); //ctl的低28位表示執行緒數量
- runState = runStateOf(c); //ctl的高四位表示狀態
運行狀態
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor

從源碼中可見,執行緒池主要有5種狀態: RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED,狀態之間的關係如下圖:

- RUNNING: 可以接受新的任務請求,也可處理阻塞隊列中的任務
- SHUTDOWN: 不接受新的任務請求,但會處理阻塞隊列中的任務
- STOP: 不接受新的任務請求,阻塞隊列也會直接清空,正在處理的任務也會被直接中斷
- TIDYING: 所有的任務都已經終止,執行緒池中不存在有效執行緒
- TERMINATED: 執行緒池終止
從上述狀態轉換圖,我們發現狀態的切換主要由shutdown(), shutdownNow(), tryTerminate()以及terminated()幾個方法實現。
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.shutdown()

源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.interruptIdleWorkers(): 終止空閑執行緒。

從程式碼中可以發現,執行shutdown()函數會將執行緒池狀態置為SHUTDOWN狀態,並且會終止所有的空閑執行緒。這裡通過java.util.concurrent.ThreadPoolExecutor.Worker.tryLock()方法嘗試獲取當前執行緒的AQS獨佔模式鎖,如果目標執行緒處於空閑狀態,則可成功獲取鎖;反之,若執行緒正在執行task,則獲取鎖失敗,以此來判斷執行緒是否處於空閑狀態。具體的流程圖如下:

注意: shutdown()函數僅僅是終止了執行緒池中空閑的執行緒,正在執行任務的執行緒依舊可以正常工作,所以處於SHUTDOWN狀態下的執行緒池依舊可以從阻塞隊列中獲取任務並執行,但不會接受新的task請求(詳情見添加worker部分)。
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.shutdownNow()

源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.interruptWorkers(): 終止執行緒池中所有的執行緒。

源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.Worker.interruptIfStarted(): 中斷當前的執行緒。

從程式碼中可見,執行shutdownNow()函數會將執行緒池置為STOP狀態,終止執行緒池中所有的執行緒,由於執行緒池無法執行task,所以這裡會將阻塞隊列中所有的task取出並返回。具體的流程圖如下:

注意:由於所有的執行緒都被終止了,所以處於STOP狀態下的執行緒池會中斷當前正在執行的任務,無法從阻塞隊列中獲取任務並執行,也無法接受新任務的請求。
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.tryTerminate()

從程式碼中可見,tryTerminate()函數僅在執行緒池處於STOP狀態或者SHUTDOWN狀態下阻塞隊列為空的場景下,才會進一步執行,否則執行結束。需要注意的是,處於SHUTDOWN狀態下,雖然阻塞隊列為空,但是依然存在執行緒正在執行任務的可能,所以需要進行二次檢查。在將執行緒池狀態置為TIDYING狀態前,如果執行緒池資訊發生任何變化,都需要重新檢查,避免一些突發異常的發生。具體流程圖如下:

注意:執行緒池處於TIDYING狀態下會直接執行terminated()方法,默認該方法為空(當然用戶可進行自定義重寫),之後將執行緒池狀態置為TERMINATED,理論上處於該狀態下的執行緒池已經結束,所以喚醒所有等待執行緒池結束的執行緒,執行其任務。
III. 運行機制

從圖中可見整個運行機制主要分為以下幾個模組:
- 任務請求
- 任務分配
- 添加worker
- 運行worker
- 任務拒絕
接下來,我們看看對應的源碼是如何處理的。
任務請求
ThreadPoolExecutor實現了任務請求和執行的解耦,用戶無需關心是如何執行的任務,只需要提供一個可執行的任務,然後調用execute()方法即可。但在實際編碼中,我們常常需要獲取任務執行的結果,因此,ExecutorService介面在Executor的基礎上增加submit()方法,將任務封裝成RunnableFuture對象,將執行結果保存為Future對象。綜上所言,任務請求有兩種方法,分別如下:
- void execute(Runnable command):不需要獲取任務結果。
- <T> Future<T> submit(Callable<T> task):需要獲取任務結果。
execute(Runnable command)見任務分配模組。
submit(Callalbe<T> task)見獲取任務結果模組。
任務分配
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.execute(Runnable command)

從程式碼中可見,執行execute(Runnable command)函數分配任務時主要有以下四個選擇:
- 執行緒池執行緒數<核心執行緒數:創建一條新的執行緒,並在該執行緒上直接執行任務;
- 執行緒池執行緒數>=核心執行緒數 && 阻塞隊列未滿: 將任務推入阻塞隊列中,並創建一條新的執行緒(若此時執行緒池存在有效執行緒則不創建),該執行緒獲取阻塞隊列頭部的任務並執行;
- 執行緒池執行緒數>=核心執行緒數 && 阻塞隊列已滿 && 執行緒池執行緒數<最大執行緒數:創建一條新的執行緒,並在該執行緒直接執行任務;
- 執行緒池執行緒數>最大執行緒數 && 阻塞隊列已滿:任務拒絕。
詳細流程圖如下:

注意:執行緒池執行緒數>=核心執行緒數 && 阻塞隊列未滿場景下僅當執行緒池無有效執行緒時才會創建新的執行緒,因為當執行緒池中執行緒執行完任務後會再次嘗試去阻塞隊列獲取任務並執行,直到阻塞隊列為空才會處於空閑狀態。所以,多數場景下此時執行緒池的執行緒數=核心執行緒數,無需創建新的執行緒。當然,也存在臨界場景,比如:此時正好所有的執行緒都恰好執行完任務並被銷毀,為了避免運行異常,則需創建一個新的執行緒,從阻塞隊列中獲取任務並執行。
添加worker
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.addWorker(Runnable firstTask, boolean core)

從程式碼可見,addWorker(Runnable firstTask, boolean core)函數擁有兩個入參:
- firstTask:新創建執行緒執行的第一個任務,這裡特指新提交的任務
- core:ture表示核心執行緒數,false表示最大執行緒數
此外,只有執行緒池處於(RUNNING狀態)或者處於(SHUTDOWN狀態+阻塞隊列不為空)時,才可以創建worker對象,並且在SHUTDOWN狀態下firstTask(新提交的任務)必須為空,才會將新創建的worker對象加入到workers列表中,否則添加worker任務失敗,即SHUTDOWN狀態下不接受新提交的任務請求。

注意:當使用java.util.concurrent.ThreadPoolExecutor.compareAndIncrementWorkerCount()方法嘗試登記執行緒失敗時,需要判斷下是否已經影響了執行緒池的狀態,如果有,則重新獲取執行緒池狀態進行相關校驗,若沒有,則重新獲取執行緒池執行緒數量,並進行執行緒池執行緒數量的檢測。
運行執行緒任務通過thread.start()方法,即調用了worker對象的run()方法,所以執行worker執行緒任務會調用worker.run()方法。
運行worker
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.Worker.run()

源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.runWorker(Worker w)

從程式碼可見,
1)runWorker(Worker w)函數會先執行firstTask(如果不為空),等第一個任務執行完會繼續嘗試從阻塞隊列中獲取任務,然後繼續執行;直到阻塞隊列為空。
2)這裡並沒有使用前面常見的可重入鎖ReentranLock,而是使用了自身繼承的AQS(AbstractQueuedSynchronizer)鎖。當獲取任務成功後,目標執行緒獲取AQS獨佔模式鎖,表示該執行緒處於忙碌狀態,直到任務執行完畢才會釋放。如果上個任務執行完畢後,目標執行緒一直無法獲取新的任務(阻塞隊列為空),則不會獲取AQS獨佔模式鎖,表示目標執行緒處於空閑狀態。
3)任務的最終執行還是調用任務的run()方法,所以請求的任務是可執行命令Runnable task.
具體的流程圖如下:

注意:在實際執行任務前,需要獲取執行緒池的狀態,確保此時調用shutdownNow()方法可以及時中斷任務。此外,前置任務和後置任務默認為空,用戶可自定義重寫。
任務拒絕
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.reject(Runnable command)

從程式碼可見,拒絕任務時調用了handler對象的rejectedExecution()方法。
ThreadPoolExecutor實現了四種不同的策略類,分別為:
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy:調用主執行緒(調用ThreadPoolExecuotr的執行緒)直接執行任務。

源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.AbortPolicy: 直接放棄任務,拋出java.util.concurrent.RejectedExecutionException異常。

源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.DiscardPolicy: 直接放棄任務,不做任何處理。

源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy: 獲取阻塞隊列頭部的任務,並直接將其拋棄;然後,重新請求任務。

獲取任務結果
在實際編碼中,我們常常需要獲取任務執行的結果,因此,ExecutorService介面在Executor的基礎上增加submit()方法,將任務封裝成RunnableFuture對象,將執行結果保存為Future對象。
源碼分析(JDK1.8)java.util.concurrent.AbstractExecutorService.submit(Callalbe<T> task)

注意:AbstractExecutorrService同樣實現submit(Runnable<T> task)方法,這裡僅以submit(Callalbe<T> task)方法進行分析。
源碼分析(JDK1.8)java.util.concurrent.AbstractExecutorService.newTaskFor(Callable<T> callable): 將任務封裝成RunableFuture對象。

從上述運行worker模組可知,執行緒上執行任務時,會調用任務的run()方法,這裡封裝成java.util.concurrent.FutureTask對象,在執行緒執行任務時,會調用FutureTask.run()方法
源碼分析(JDK1.8)java.util.concurrent.FutureTask.run()

源碼分析(JDK1.8)java.util.concurrent.FutureTask.set():將結果保存到outcome屬性中,以便後面獲取結果。

源碼分析(JDK1.8)java.util.concurrent.FutureTask.get():取出運行結果。

注意:這裡調用get()方法會一直等待任務執行結束或拋出異常,然後返回結果。
源碼分析(JDK1.8)java.util.concurrent.FutureTask.get(long timeout, TimeUnit unit):取出運行結果,超時拋出java.util.concurrent.TimeoutException異常。

注意:get(long timeout, TimeUnit unit)中unit表示時間單位(例如:SECONDS),timeout表示時間值。調用該函數獲取運行結果時,如果等待時間超時,會直接拋出java.util.concurrent.TimeoutException異常。
具體流程圖如下:

IV.實際案例
submit(Callable task)
public class MultiThreadPool { private static List<String> hello = Arrays.asList("h", "e", "l", "l", "o"); private static String task(String args) { System.out.println(String.format("submit - thread name: %s, args: %s", Thread.currentThread().getName(), args)); return args; } private static void submitTask() throws InterruptedException, ExecutionException, TimeoutException { List<Future> futures = new ArrayList<>(); final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 6, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy()); try { for (String str : hello) { Thread.sleep(1); Future f = executor.submit(() -> task(str)); futures.add(f); } for (Future f: futures) { String result = (String) f.get(60, TimeUnit.SECONDS); System.out.println(String.format("submit - thread name: %s, result: %s", Thread.currentThread().getName(), result)); } } finally { executor.shutdown(); } } public static void main(String[] args) throws Exception { submitTask(); } }
輸出結果:
submit - thread name: pool-1-thread-3, args: l submit - thread name: pool-1-thread-1, args: h submit - thread name: pool-1-thread-4, args: l submit - thread name: pool-1-thread-2, args: e submit - thread name: pool-1-thread-3, args: o submit - thread name: main, result: h submit - thread name: main, result: e submit - thread name: main, result: l submit - thread name: main, result: l submit - thread name: main, result: o
execute(Runnable task)
public class MultiThreadPool { private static List<String> hello = Arrays.asList("h", "e", "l", "l", "o"); private static class Task implements Runnable { private String arg; Task(String arg) { this.arg = arg; } public void run() { System.out.println(String.format("execute - thread name: %s, args: %s", Thread.currentThread().getName(), arg)); } } private static void executeTask() throws InterruptedException {final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 6, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy()); try { for (String str : hello) { Thread.sleep(1); executor.execute(new Task(str)); } } finally { executor.shutdown(); } } public static void main(String[] args) throws Exception { executeTask(); } }
輸出結果:
execute - thread name: pool-1-thread-3, args: l
execute - thread name: pool-1-thread-1, args: h
execute - thread name: pool-1-thread-4, args: l
execute - thread name: pool-1-thread-2, args: e
execute - thread name: pool-1-thread-3, args: o