面渣逆襲:執行緒池奪命連環十八問
大家好,我是老三,很高興又和大家見面。執行緒池是面試必問的知識點,這節我們來對線面試官,搞透執行緒池。
1. 什麼是執行緒池?
執行緒池: 簡單理解,它就是一個管理執行緒的池子。
- 它幫我們管理執行緒,避免增加創建執行緒和銷毀執行緒的資源損耗。因為執行緒其實也是一個對象,創建一個對象,需要經過類載入過程,銷毀一個對象,需要走GC垃圾回收流程,都是需要資源開銷的。
- 提高響應速度。 如果任務到達了,相對於從執行緒池拿執行緒,重新去創建一條執行緒執行,速度肯定慢很多。
- 重複利用。 執行緒用完,再放回池子,可以達到重複利用的效果,節省資源。
2. 能說說工作中執行緒池的應用嗎?
之前我們有一個和第三方對接的需求,需要向第三方推送數據,引入了多執行緒來提升數據推送的效率,其中用到了執行緒池來管理執行緒。
主要程式碼如下:
完整可運行程式碼地址://gitee.com/fighter3/thread-demo.git
執行緒池的參數如下:
-
corePoolSize:執行緒核心參數選擇了CPU數×2
-
maximumPoolSize:最大執行緒數選擇了和核心執行緒數相同
-
keepAliveTime:非核心閑置執行緒存活時間直接置為0
-
unit:非核心執行緒保持存活的時間選擇了 TimeUnit.SECONDS 秒
-
workQueue:執行緒池等待隊列,使用 LinkedBlockingQueue阻塞隊列
同時還用了synchronized 來加鎖,保證數據不會被重複推送:
synchronized (PushProcessServiceImpl.class) {}
ps:這個例子只是簡單地進行了數據推送,實際上還可以結合其他的業務,像什麼數據清洗啊、數據統計啊,都可以套用。
3.能簡單說一下執行緒池的工作流程嗎?
用一個通俗的比喻:
有一個營業廳,總共有六個窗口,現在開放了三個窗口,現在有三個窗口坐著三個營業員小姐姐在營業。
老三去辦業務,可能會遇到什麼情況呢?
- 老三發現有空間的在營業的窗口,直接去找xjj辦理業務。
- 老三發現沒有空閑的窗口,就在排隊區排隊等。
- 老三發現沒有空閑的窗口,等待區也滿了,蚌埠住了,經理一看,就讓休息的小姐姐趕緊回來上班,等待區號靠前的趕緊去新窗口辦,老三去排隊區排隊。小姐姐比較辛苦,假如一段時間發現他們可以不用接著營業,經理就讓她們接著休息。
- 老三一看,六個窗口都滿了,等待區也沒位置了。老三急了,要鬧,經理趕緊出來了,經理該怎麼辦呢?
我們銀行系統已經癱瘓
誰叫你來辦的你找誰去
看你比較急,去隊里加個塞
今天沒辦法,不行你看改一天
上面的這個流程幾乎就跟 JDK 執行緒池的大致流程類似,
- 營業中的 3個窗口對應核心執行緒池數:corePoolSize
- 總的營業窗口數6對應:maximumPoolSize
- 打開的臨時窗口在多少時間內無人辦理則關閉對應:unit
- 排隊區就是等待隊列:workQueue
- 無法辦理的時候銀行給出的解決方法對應:RejectedExecutionHandler
- threadFactory 該參數在 JDK 中是 執行緒工廠,用來創建執行緒對象,一般不會動。
所以我們執行緒池的工作流程也比較好理解了:
- 執行緒池剛創建時,裡面沒有一個執行緒。任務隊列是作為參數傳進來的。不過,就算隊列裡面有任務,執行緒池也不會馬上執行它們。
- 當調用 execute() 方法添加一個任務時,執行緒池會做如下判斷:
- 如果正在運行的執行緒數量小於 corePoolSize,那麼馬上創建執行緒運行這個任務;
- 如果正在運行的執行緒數量大於或等於 corePoolSize,那麼將這個任務放入隊列;
- 如果這時候隊列滿了,而且正在運行的執行緒數量小於 maximumPoolSize,那麼還是要創建非核心執行緒立刻運行這個任務;
- 如果隊列滿了,而且正在運行的執行緒數量大於或等於 maximumPoolSize,那麼執行緒池會根據拒絕策略來對應處理。
-
當一個執行緒完成任務時,它會從隊列中取下一個任務來執行。
-
當一個執行緒無事可做,超過一定的時間(keepAliveTime)時,執行緒池會判斷,如果當前運行的執行緒數大於 corePoolSize,那麼這個執行緒就被停掉。所以執行緒池的所有任務完成後,它最終會收縮到 corePoolSize 的大小。
4.執行緒池主要參數有哪些?
- corePoolSize
此值是用來初始化執行緒池中核心執行緒數,當執行緒池中執行緒池數< corePoolSize
時,系統默認是添加一個任務才創建一個執行緒池。當執行緒數 = corePoolSize時,新任務會追加到workQueue中。
- maximumPoolSize
maximumPoolSize
表示允許的最大執行緒數 = (非核心執行緒數+核心執行緒數),當BlockingQueue
也滿了,但執行緒池中匯流排程數 < maximumPoolSize
時候就會再次創建新的執行緒。
- keepAliveTime
非核心執行緒 =(maximumPoolSize – corePoolSize ) ,非核心執行緒閑置下來不幹活最多存活時間。
- unit
執行緒池中非核心執行緒保持存活的時間的單位
- TimeUnit.DAYS; 天
- TimeUnit.HOURS; 小時
- TimeUnit.MINUTES; 分鐘
- TimeUnit.SECONDS; 秒
- TimeUnit.MILLISECONDS; 毫秒
- TimeUnit.MICROSECONDS; 微秒
- TimeUnit.NANOSECONDS; 納秒
- workQueue
執行緒池等待隊列,維護著等待執行的Runnable
對象。當運行當執行緒數= corePoolSize時,新的任務會被添加到workQueue
中,如果workQueue
也滿了則嘗試用非核心執行緒執行任務,等待隊列應該盡量用有界的。
- threadFactory
創建一個新執行緒時使用的工廠,可以用來設定執行緒名、是否為daemon執行緒等等。
- handler
corePoolSize
、workQueue
、maximumPoolSize
都不可用的時候執行的飽和策略。
5.執行緒池的拒絕策略有哪些?
類比前面的例子,無法辦理業務時的處理方式,幫助記憶:
- AbortPolicy :直接拋出異常,默認使用此策略
- CallerRunsPolicy:用調用者所在的執行緒來執行任務
- DiscardOldestPolicy:丟棄阻塞隊列里最老的任務,也就是隊列里靠前的任務
- DiscardPolicy :當前任務直接丟棄
想實現自己的拒絕策略,實現RejectedExecutionHandler介面即可。
6.執行緒池有哪幾種工作隊列?
常用的阻塞隊列主要有以下幾種:
- ArrayBlockingQueue:ArrayBlockingQueue(有界隊列)是一個用數組實現的有界阻塞隊列,按FIFO排序量。
- LinkedBlockingQueue:LinkedBlockingQueue(可設置容量隊列)是基於鏈表結構的阻塞隊列,按FIFO排序任務,容量可以選擇進行設置,不設置的話,將是一個無邊界的阻塞隊列,最大長度為Integer.MAX_VALUE,吞吐量通常要高於ArrayBlockingQuene;newFixedThreadPool執行緒池使用了這個隊列
- DelayQueue:DelayQueue(延遲隊列)是一個任務定時周期的延遲執行的隊列。根據指定的執行時間從小到大排序,否則根據插入到隊列的先後排序。newScheduledThreadPool執行緒池使用了這個隊列。
- PriorityBlockingQueue:PriorityBlockingQueue(優先順序隊列)是具有優先順序的無界阻塞隊列
- SynchronousQueue:SynchronousQueue(同步隊列)是一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個執行緒調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQuene,newCachedThreadPool執行緒池使用了這個隊列。
7.執行緒池提交execute和submit有什麼區別?
- execute 用於提交不需要返回值的任務
threadsPool.execute(new Runnable() {
@Override public void run() {
// TODO Auto-generated method stub }
});
- submit()方法用於提交需要返回值的任務。執行緒池會返回一個future類型的對象,通過這個 future對象可以判斷任務是否執行成功,並且可以通過future的get()方法來獲取返回值
Future<Object> future = executor.submit(harReturnValuetask);
try { Object s = future.get(); } catch (InterruptedException e) {
// 處理中斷異常
} catch (ExecutionException e) {
// 處理無法執行任務異常
} finally {
// 關閉執行緒池 executor.shutdown();
}
8.執行緒池怎麼關閉知道嗎?
可以通過調用執行緒池的shutdown
或shutdownNow
方法來關閉執行緒池。它們的原理是遍歷執行緒池中的工作執行緒,然後逐個調用執行緒的interrupt方法來中斷執行緒,所以無法響應中斷的任務可能永遠無法終止。
- shutdown() 將執行緒池狀態置為shutdown,並不會立即停止:
- 停止接收外部submit的任務
- 內部正在跑的任務和隊列里等待的任務,會執行完
- 等到第二步完成後,才真正停止
- shutdownNow() 將執行緒池狀態置為stop。一般會立即停止,事實上不一定:
- 和shutdown()一樣,先停止接收外部提交的任務
- 忽略隊列里等待的任務
- 嘗試將正在跑的任務interrupt中斷
- 返回未執行的任務列表
shutdown 和shutdownnow簡單來說區別如下:
shutdownNow()能立即停止執行緒池,正在跑的和正在等待的任務都停下了。這樣做立即生效,但是風險也比較大。shutdown()只是關閉了提交通道,用submit()是無效的;而內部的任務該怎麼跑還是怎麼跑,跑完再徹底停止執行緒池。
9.執行緒池的執行緒數應該怎麼配置?
執行緒在Java中屬於稀缺資源,執行緒池不是越大越好也不是越小越好。任務分為計算密集型、IO密集型、混合型。
- 計算密集型:大部分都在用CPU跟記憶體,加密,邏輯操作業務處理等。
- IO密集型:資料庫鏈接,網路通訊傳輸等。
- 計算密集型一般推薦執行緒池不要過大,一般是CPU數 + 1,+1是因為可能存在頁缺失(就是可能存在有些數據在硬碟中需要多來一個執行緒將數據讀入記憶體)。如果執行緒池數太大,可能會頻繁的 進行執行緒上下文切換跟任務調度。獲得當前CPU核心數程式碼如下:
Runtime.getRuntime().availableProcessors();
- IO密集型:執行緒數適當大一點,機器的Cpu核心數*2。
- 混合型:可以考慮根絕情況將它拆分成CPU密集型和IO密集型任務,如果執行時間相差不大,拆分可以提升吞吐量,反之沒有必要。
當然,實際應用中沒有固定的公式,需要結合測試和監控來進行調整。
10.有哪幾種常見的執行緒池?
主要有四種,都是通過工具類Excutors創建出來的,阿里巴巴《Java開發手冊》里禁止使用這種方式來創建執行緒池。
-
newFixedThreadPool (固定數目執行緒的執行緒池)
-
newCachedThreadPool (可快取執行緒的執行緒池)
-
newSingleThreadExecutor (單執行緒的執行緒池)
-
newScheduledThreadPool (定時及周期執行的執行緒池)
11.能說一下四種常見執行緒池的原理嗎?
前三種執行緒池的構造直接調用ThreadPoolExecutor的構造方法。
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
執行緒池特點
- 核心執行緒數為1
- 最大執行緒數也為1
- 阻塞隊列是無界隊列LinkedBlockingQueue,可能會導致OOM
- keepAliveTime為0
工作流程:
- 提交任務
- 執行緒池是否有一條執行緒在,如果沒有,新建執行緒執行任務
- 如果有,將任務加到阻塞隊列
- 當前的唯一執行緒,從隊列取任務,執行完一個,再繼續取,一個執行緒執行任務。
適用場景
適用於串列執行任務的場景,一個任務一個任務地執行。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
執行緒池特點:
- 核心執行緒數和最大執行緒數大小一樣
- 沒有所謂的非空閑時間,即keepAliveTime為0
- 阻塞隊列為無界隊列LinkedBlockingQueue,可能會導致OOM
工作流程:
- 提交任務
- 如果執行緒數少於核心執行緒,創建核心執行緒執行任務
- 如果執行緒數等於核心執行緒,把任務添加到LinkedBlockingQueue阻塞隊列
- 如果執行緒執行完任務,去阻塞隊列取任務,繼續執行。
使用場景
FixedThreadPool 適用於處理CPU密集型的任務,確保CPU在長期被工作執行緒使用的情況下,儘可能的少的分配執行緒,即適用執行長期的任務。
newCachedThreadPool
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
執行緒池特點:
- 核心執行緒數為0
- 最大執行緒數為Integer.MAX_VALUE,即無限大,可能會因為無限創建執行緒,導致OOM
- 阻塞隊列是SynchronousQueue
- 非核心執行緒空閑存活時間為60秒
當提交任務的速度大於處理任務的速度時,每次提交一個任務,就必然會創建一個執行緒。極端情況下會創建過多的執行緒,耗盡 CPU 和記憶體資源。由於空閑 60 秒的執行緒會被終止,長時間保持空閑的 CachedThreadPool 不會佔用任何資源。
工作流程:
- 提交任務
- 因為沒有核心執行緒,所以任務直接加到SynchronousQueue隊列。
- 判斷是否有空閑執行緒,如果有,就去取出任務執行。
- 如果沒有空閑執行緒,就新建一個執行緒執行。
- 執行完任務的執行緒,還可以存活60秒,如果在這期間,接到任務,可以繼續活下去;否則,被銷毀。
適用場景
用於並發執行大量短期的小任務。
newScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
執行緒池特點
- 最大執行緒數為Integer.MAX_VALUE,也有OOM的風險
- 阻塞隊列是DelayedWorkQueue
- keepAliveTime為0
- scheduleAtFixedRate() :按某種速率周期執行
- scheduleWithFixedDelay():在某個延遲後執行
工作機制
- 執行緒從DelayQueue中獲取已到期的ScheduledFutureTask(DelayQueue.take())。到期任務是指ScheduledFutureTask的time大於等於當前時間。
- 執行緒執行這個ScheduledFutureTask。
- 執行緒修改ScheduledFutureTask的time變數為下次將要被執行的時間。
- 執行緒把這個修改time之後的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。
使用場景
周期性執行任務的場景,需要限制執行緒數量的場景
12.使用無界隊列的執行緒池會導致什麼問題嗎?
例如newFixedThreadPool使用了無界的阻塞隊列LinkedBlockingQueue,如果執行緒獲取一個任務後,任務的執行時間比較長,會導致隊列的任務越積越多,導致機器記憶體使用不停飆升,最終導致OOM。
13.執行緒池異常怎麼處理知道嗎?
在使用執行緒池處理任務的時候,任務程式碼可能拋出RuntimeException,拋出異常後,執行緒池可能捕獲它,也可能創建一個新的執行緒來代替異常的執行緒,我們可能無法感知任務出現了異常,因此我們需要考慮執行緒池異常情況。
常見的異常處理方式:
14.能說一下執行緒池有幾種狀態嗎?
執行緒池有這幾個狀態:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。
//執行緒池狀態
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
執行緒池各個狀態切換圖:
RUNNING
- 該狀態的執行緒池會接收新任務,並處理阻塞隊列中的任務;
- 調用執行緒池的shutdown()方法,可以切換到SHUTDOWN狀態;
- 調用執行緒池的shutdownNow()方法,可以切換到STOP狀態;
SHUTDOWN
- 該狀態的執行緒池不會接收新任務,但會處理阻塞隊列中的任務;
- 隊列為空,並且執行緒池中執行的任務也為空,進入TIDYING狀態;
STOP
- 該狀態的執行緒不會接收新任務,也不會處理阻塞隊列中的任務,而且會中斷正在運行的任務;
- 執行緒池中執行的任務為空,進入TIDYING狀態;
TIDYING
- 該狀態表明所有的任務已經運行終止,記錄的任務數量為0。
- terminated()執行完畢,進入TERMINATED狀態
TERMINATED
- 該狀態表示執行緒池徹底終止
15.執行緒池如何實現參數的動態修改?
執行緒池提供了幾個 setter方法來設置執行緒池的參數。
這裡主要有兩個思路:
-
在我們微服務的架構下,可以利用配置中心如Nacos、Apollo等等,也可以自己開發配置中心。業務服務讀取執行緒池配置,獲取相應的執行緒池實例來修改執行緒池的參數。
-
如果限制了配置中心的使用,也可以自己去擴展ThreadPoolExecutor,重寫方法,監聽執行緒池參數變化,來動態修改執行緒池參數。
16.執行緒池調優了解嗎?
執行緒池配置沒有固定的公式,通常事前會對執行緒池進行一定評估,常見的評估方案如下:
上線之前也要進行充分的測試,上線之後要建立完善的執行緒池監控機制。
事中結合監控告警機制,分析執行緒池的問題,或者可優化點,結合執行緒池動態參數配置機制來調整配置。
事後要注意仔細觀察,隨時調整。
具體的調優案例可以查看參考[7]美團技術部落格。
17.你能設計實現一個執行緒池嗎?
⭐這道題在阿里的面試中出現頻率比較高
執行緒池實現原理可以查看 要是以前有人這麼講執行緒池,我早就該明白了! ,當然,我們自己實現, 只需要抓住執行緒池的核心流程-參考[6]:
我們自己的實現就是完成這個核心流程:
- 執行緒池中有N個工作執行緒
- 把任務提交給執行緒池運行
- 如果執行緒池已滿,把任務放入隊列
- 最後當有空閑時,獲取隊列中任務來執行
實現程式碼[6]:
public class MyThreadPoolExecutor implements Executor {
//記錄執行緒池中執行緒數量
private final AtomicInteger ctl = new AtomicInteger(0);
//核心執行緒數
private volatile int corePoolSize;
//最大執行緒數
private volatile int maximumPoolSize;
//阻塞隊列
private final BlockingQueue<Runnable> workQueue;
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
}
/**
* 執行
*
* @param command
*/
@Override
public void execute(Runnable command) {
//工作執行緒數
int c = ctl.get();
//小於核心執行緒數
if (c < corePoolSize) {
//添加任務失敗
if (!addWorker(command)) {
//執行拒絕策略
reject();
}
return;
}
//任務隊列添加任務
if (!workQueue.offer(command)) {
//任務隊列滿,嘗試啟動執行緒添加任務
if (!addWorker(command)) {
reject();
}
}
}
/**
* 飽和拒絕
*/
private void reject() {
//直接拋出異常
throw new RuntimeException("Can not execute!ctl.count:"
+ ctl.get() + "workQueue size:" + workQueue.size());
}
/**
* 添加任務
*
* @param firstTask
* @return
*/
private boolean addWorker(Runnable firstTask) {
if (ctl.get() >= maximumPoolSize) return false;
Worker worker = new Worker(firstTask);
//啟動執行緒
worker.thread.start();
ctl.incrementAndGet();
return true;
}
/**
* 執行緒池工作執行緒包裝類
*/
private final class Worker implements Runnable {
final Thread thread;
Runnable firstTask;
public Worker(Runnable firstTask) {
this.thread = new Thread(this);
this.firstTask = firstTask;
}
@Override
public void run() {
Runnable task = firstTask;
try {
//執行任務
while (task != null || (task = getTask()) != null) {
task.run();
//執行緒池已滿,跳出循環
if (ctl.get() > maximumPoolSize) {
break;
}
task = null;
}
} finally {
//工作執行緒數增加
ctl.decrementAndGet();
}
}
/**
* 從隊列中獲取任務
*
* @return
*/
private Runnable getTask() {
for (; ; ) {
try {
System.out.println("workQueue size:" + workQueue.size());
return workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//測試
public static void main(String[] args) {
MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor(2, 2,
new ArrayBlockingQueue<Runnable>(10));
for (int i = 0; i < 10; i++) {
int taskNum = i;
myThreadPoolExecutor.execute(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任務編號:" + taskNum);
});
}
}
}
這樣,一個實現了執行緒池主要流程的類就完成了。
18.單機執行緒池執行斷電了應該怎麼處理?
我們可以對正在處理和阻塞隊列的任務做事務管理或者對阻塞隊列中的任務持久化處理,並且當斷電或者系統崩潰,操作無法繼續下去的時候,可以通過回溯日誌的方式來撤銷正在處理
的已經執行成功的操作。然後重新執行整個阻塞隊列。
也就是:阻塞隊列持久化;正在處理任務事務控制;斷電之後正在處理任務的回滾,通過日誌恢復該次操作;伺服器重啟後阻塞隊列中的數據再載入。
參考:
[1]. 《Java並發編程的藝術》
[2]. 《Java發編程實戰》
[3]. 講真 這次絕對讓你輕鬆學習執行緒池
[4]. 面試必備:Java執行緒池解析
[5]. 面試官問:「在項目中用過多執行緒嗎?」你就把這個案例講給他聽!
[6]. 小傅哥 《Java面經手冊》