拜託,不要再問我執行緒池啦!
Java提供了幾種便捷的方法創建執行緒池,通過這些內置的api就能夠很輕鬆的創建執行緒池。在java.util.concurrent
包中的Executors
類,其中的靜態方法就是用來創建執行緒池的:
- newFixedThreadPool():創建一個固定執行緒數量的執行緒池,而且執行緒池中的任務全部執行完成後,空閑的執行緒也不會被關閉。
- newSingleThreadExecutor():創建一個只有一個執行緒的執行緒池,空閑時也不會被關閉。
- newCachedThreadPool():創建一個可快取的執行緒池,執行緒的數量為
Integer.MAX_VALUE
,空閑執行緒會臨時快取下來,執行緒會等待60s
還是沒有任務加入的話就會被關閉。
Executors
類中還有一些創建執行緒池的方法(jdk8新加的),但是現在這個觸極到我的知識盲區了~~
上面那幾個方法,其實都是創建了一個ThreadPoolExecutor
對象作為返回值,要搞清楚執行緒池的原理主要還是要分析ThreadPoolExecutor
這個類。
ThreadPoolExecutor
的構造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
ThreadPoolExecutor
的構造方法包含以下幾個參數:
- corePoolSize: 核心執行緒數量,常駐執行緒池中的執行緒,即時執行緒池中沒有任務可執行,也不會被關閉。
- maximumPoolSize:最大執行緒數量
- keepAliveTime:空閑執行緒存活時間
- unit: 空閑執行緒存活時間的單位
- workQueue:工作隊列,執行緒池一下忙不過來,那新來的任務就需要排隊,排除中的任務就會放在workQueue中
- threadFactory:執行緒工廠,創建執行緒用的
- handler:
RejectedExecutionHandler
實例用於在執行緒池中沒有空閑執行緒能夠執行任務,並且workQueue
中也容不下任務時拒絕任務時的策略。
ThreadPoolExecutor
中的執行緒統稱為工作執行緒,但有一個小概念是核心執行緒
,核心執行緒由參數corePoolSize
指定,如corePoolSize
設置5,那執行緒池中就會有5條執行緒常駐執行緒池中,不會被回收掉,但是也會有例外,如果allowCoreThreadTimeOut
為true
空閑一段時間後,也會被關閉。
執行緒的狀態和工作執行緒數量
執行緒中的狀態和工作執行緒和數量都是由ctl
表示,是一個AtomicInteger
類型的屬性:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl的高四位為執行緒的狀態,其他位數為工作執行緒的數量,所以執行緒中最大的工作執行緒數量為(2^29)-1
。
執行緒池中的狀態有五種:
- RUNNING:接收新的任務和處理隊列中的任務
- SHUTDOWN:不能新增任務,但是會繼續處理已經添加的任務
- STOP:不能新增任務,不會繼續處理已經添加任務
- TIDYING:所有的任務已經被終止,工作執行緒為0
- TERMINATED:terminated()方法執行完成
狀態碼的定義如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
創建執行緒池
如果有面試官問:如何正確的創建執行緒池?千萬不要說使用Executors
創建執行緒,雖然Executors
能很方便的創建執行緒池,但是他提供的靜態創建方法會有一些坑。
主要的原因是:maximumPoolSize
和workQueue
這兩個參數
Executors
靜態方法在創建執行緒池時,如果maximumPoolSize
設置為Integer.MAX_VALUE
,這樣會導致執行緒池可以一直要以接收運行任務,可能導致cpu負載過高。
workQueue
是一個阻塞隊列的實例,用於放置正在等待執行的任務。如果在創建執行緒種時workQueue
實例沒有指定任務的容量,那麼等待隊列中可以一直添加任務,極有可能導致oom
。
所以創建執行緒,最好是根據執行緒池的用途,然後自己創建執行緒。
添加任務
調用執行緒池的execute
並不是立即執行任務,執行緒池內部用經過一頓操作,如:判斷核心執行緒數、是否需要添加到等待隊列中。
下來的程式碼是execute
的源碼,程式碼很簡潔只有2個if
語句:
public void execute(Runnable command) {
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- 第一個if,如果當前執行緒池中的工作執行緒數量小於
corePoolSize
,直接創建一個工作執行緒執行任務 - 第二個if,當執行緒池處於運行狀態,調用
workQueue.offer(command)
方法將任務添加到workQueue
,否則調用addWorker(command, false)
嘗試去添加一個工作執行緒。
整理了一張圖,把執行緒池分為三部分Core Worker
、Worker
、workQueue
:
換一種說法,在調用execute
方法時,任務首先會放在Core Worker
內,然後才是workQueue
,最後才會考慮Worker
。
這樣做的原因可以保證Core Worker
中的任務執行完成後,能立即從workQueue
獲取下一個任務,而不需要啟動別的工作執行緒,用最少的工作執行緒辦更多的事。
創建工作執行緒
在execute
方法中,有三個地方調用了addWorker
。addWorker
方法可以分為二部分:
- 增加工作執行緒數量
- 啟動工作執行緒
addWorker
的方法簽名如下:
private boolean addWorker(Runnable firstTask, boolean core)
- firstTask:第一個運行的任務,可以為空。如果為空任務會從
workQueue
中獲取。 - core: 是否是核心工作執行緒
增加工作執行緒數量
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
....
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
上面程式碼省略了一部分程式碼,主要程式碼都在for
循環中,利用CAS
鎖,安全的完成執行緒池狀態的檢查與增加工作執行緒的數量。其中的compareAndIncrementWorkerCount(c)
調用就是將工作執行緒數量+1。
啟動工作執行緒
增加工作執行緒的數量後,緊接著就會啟動工作執行緒:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
啟動工作執行緒的流程:
- 創建一個
Worker
實例,Worker
構造方法會使用ThreadFactory
創建一個執行緒
w = new Worker(firstTask);
final Thread t = w.thread;
就不說Worker
類的實現了,直接給出構造方法來細品:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
- 如果執行緒池狀態是在運行中,或者已經關閉,但工作執行緒要從
workQueue
中獲取任務,才能添加工作執行緒
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
注意::當執行緒池處於SHUTDOWN
狀態時,它不能接收新的任務,但是可以繼續執行未完成的任務。任務是否從workQueue
中獲取,是根據firstTask
判斷,每個Worker
實例都有一個firstTask
屬性,如果這個值為null
,工作執行緒啟動的時候就會從workQueue
中獲取任務,否則會執行firstTask
。
- 啟動執行緒
調用執行緒的start
方法,啟動執行緒。
if (workerAdded) {
t.start();
workerStarted = true;
}
執行任務
回過頭來看一個Worker
類的定義:
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable{
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
...
}
Worker
類實現了Runnable
介面,同時在構造方法中會將this
傳遞給執行緒,到這裡你就知道了Worker
實例中有run
方法,它會在執行緒啟動後執行:
public void run() {
runWorker(this);
}
run
方法內部接著調用runWorker
方法運行任務,在這裡才是真正的開始運行任務了:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- 獲取任務
首先將firstTask
傳遞給task
臨時變數:
Runnable task = w.firstTask;
然後循環檢查task
或者從workQueue
中獲取任務:
while (task != null || (task = getTask()) != null) {
...
}
getTask()
稍後再做分析。
- 運行任務
去掉一些狀態檢查、異常捕獲、和勾子方法調用後,保留最重要的調用task.run()
:
while (task != null || (task = getTask()) != null) {
...
task.run();
...
}
task
其實就是通過調用execute
方法傳遞進來的Runnable
實例,也就是你的任務。只不過它可能保存在Worker.firstTask
中,或者在workQueue
中,保存在哪裡在前面的任務添加順序
中已經說明。
從workQueue中獲取任務
試想一下如果每個任務執行完成,就關閉掉一個執行緒那有多浪費資源,這樣使用執行緒池也沒有多大的意義。所以執行緒的主要的功能就是執行緒復用,一旦任務執行完成直接去獲取下一個任務,或者掛起執行緒等待下一個提交的任務,然後等待一段時間後還是沒有任務提交,然後才考慮是否關閉部分空閑的執行緒。
runWorker
中會循環的獲取任務:
while (task != null || (task = getTask()) != null) {
...
task.run();
...
}
上面的程式碼getTask()
就是從workQueue
中獲取任務:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
...
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
...
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
獲取任務的時候會有兩種方式:
- 超時等待獲取任務
- 一直等待任務,直到有新任務
如果allowCoreThreadTimeOut
為true
,corePoolSize
指定的核心執行緒數量會被忽略,直接使用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
獲取任務,否則的話會根據當前工作執行緒的數量,如果wc > corePoolSize
為false
則當前會被認為是核心執行緒,調用workQueue.take()
一直等待任務。
工作執行緒的關閉
還是在runWorker
方法中:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
task.run();
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- completedAbruptly變數:標記當前工作執行緒是正常執行完成,還是異常完成的。completedAbruptly為
false
可以確定執行緒池中沒有可執行的任務了。
上面程式碼是簡潔後的程式碼,一個while
循環保證不間斷的獲取任務,沒有任務可以執行(task為null)退出循環,最後再才會調用processWorkerExit
方法:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
processWorkerExit
接收一個Worker
實例與completedAbruptly
變數。processWorkerExit的大致工作流程:
- 判斷當前工作執行緒是否異常完成,如果是直接減少工作執行緒的數量,簡單的說就是校正一下工作執行緒的數量。
- 增加完成的任務數量,將
Worker
從workers
中移除 - tryTerminate() 檢查執行緒池狀態,因為執行緒池可以延遲關閉,如果你調用
shutdown
方法後不會立即關閉,要等待所有的任務執行完成,所以這裡調用tryTerminate()方法,嘗試去調用terminated
方法。
工作執行緒完成策略
如果某個工作執行緒完成,執行緒池內部會判斷是否需要重新啟動一個:
//判斷執行緒池狀態
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
//獲取最小工作執行緒數量
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果最小工作執行緒數量為0,但是workQueue中還有任務,那重置最小工作執行緒數量1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果當前工作執行緒數數量大於或等於最小工作執行緒數量,則不需要啟動新的工作執行緒
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//啟動一個新的工作執行緒
addWorker(null, false);
}
工作執行緒完成後有兩種處理策略:
- 對於異常完成的工作執行緒,直接啟動一個新的替換
- 對於正常完成的工作執行緒,判斷當前工作執行緒是否足夠,如果足夠則不需要新啟動工作執行緒
注意:這裡的完成,表示工作執行緒的任務執行完成,workQueue
中也沒有任務可以獲取了。
執行緒池的關閉
關閉執行緒池有可以通過shutdown
方法:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdown
方法,第一步就是先改變執行緒池的狀態,調用advanceRunState(SHUTDOWN)
方法,將執行緒池當前狀態更改為SHUTDOWN
,advanceRunState程式碼如下:
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
然後立即調用interruptIdleWorkers()
方法,interruptIdleWorkers()
內部會調用它的重載方法interruptIdleWorkers(boolean onlyOne)
同時onlyOne參數傳遞的false
來關閉空閑的執行緒:
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
以上程式碼會遍歷workers
中的Worker
實例,然後調用執行緒的interrupt()
方法。
什麼樣的執行緒才是空閑工作執行緒?
前面提到過在getTask()
中,執行緒從workQueue
中獲取任務時會阻塞,被阻塞的執行緒就是空閑的。
再次回到getTask()
的程式碼中:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
...
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
...
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
再次分析getTask()
中的程式碼中有一段捕獲InterruptedException
的程式碼塊,interruptIdleWorkers方法中斷執行緒後,getTask()
會捕獲中斷異常,因為外面是一個for
循環,隨後程式碼走到判斷執行緒池狀態的地方:
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
上面的程式碼的會判斷當前執行緒池狀態,如果狀態大於STOP
或者狀態等於SHUTDOWN
並且workQueue
為空時則返回null
,getTask()
返回空那麼在runWorker
中循環就會退出,當前工作執行緒的任務就完成了,可以退出了:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
task.run();
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
shutdownNow
除了shutdown方法能關閉執行緒池,還有shutdownNow
也可以關閉執行緒池。它兩的區別在於:
shutdownNow
會清空workQueue
中的任務shutdownNow
還會中止當前正在運行的任務shutdownNow
會使執行緒進入STOP
狀態,而shutdown()
是SHUTDOWN
狀態
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
上面程式碼基本流程:
- advanceRunState(STOP): 使執行緒池進行
STOP
狀態,與shutdown()
中的一致 ,只是使用的狀態碼是STOP
- interruptWorkers(): 與
shutdown()
中的一致 - drainQueue(): 清空隊列
任務是中止執行還是繼續執行?
調用shutdownNow()後執行緒池處於STOP
狀態,緊接著所有的工作執行緒都會被調用interrupt
方法,如果此時runWorker
還在運行會發生什麼?
在runWorker
有一段程式碼,就是工作執行緒中止的重要程式碼:
final void runWorker(Worker w) {
...
while (task != null || (task = getTask()) != null) {
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
task.run();
}
...
}
重點關註:
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
這個if看起來有點難理解,理解下來大致意思是:如果執行緒池狀態大於等於STOP
,立即中斷執行緒,否則清除執行緒的中斷標記,也就是說當執行緒池狀態為RUNNING
和SHUTDOWN
時,執行緒的中斷標記會被清除(執行緒的中斷程式碼在interruptWorkers
方法中),可以繼續執行任務。
以上程式碼執行完成後,緊接著就會調用task.run()
方法,這裡面我們自己就可以根據執行緒的中斷標記來判斷任務是否被中斷。
總結
個人水平有限,文中如有錯誤,謝謝大家指正。
本文從執行緒池的源碼入手,分析執行緒池的創建、添加任務、運行任務等流程,整個分析下來基本上大多數公司關於執行緒池面試的問題都可以回答得上來,當然還有一些小細節如:Worker
類是繼承AQS
的,為什麼這麼做其實源碼中都有一些苗頭,Worker
在運行時會鎖住運行的程式碼塊,而shutdown
在關閉空閑的Worker
時,首先就要去獲取Worker
的同步鎖才能繼續操作,這樣才能安全的關閉工作執行緒。
歡迎關注我的公眾號:架構文摘,獲得獨家整理120G的免費學習資源助力你的架構師學習之路!
公眾號後台回復
arch028
獲取資料: