java線程池實踐
- 2021 年 5 月 9 日
- 筆記
- JAVA, threadpool
線程池大家都很熟悉,無論是平時的業務開發還是框架中間件都會用到,大部分都是基於JDK線程池ThreadPoolExecutor做的封裝,
都會牽涉到這幾個核心參數的設置:核心線程數,等待(任務)隊列,最大線程數,拒絕策略等。
但如果線程池設置不當就會引起一系列問題, 下面就說下我最近碰到的問題。
案件還原
比如你有一個項目中有個接口部分功能使用了線程池,這個功能會去調用多個第三方接口,都有一定的耗時,為了不影響主流程的性能,不增加整體響應時間,所以放在線程池裡和主線程並行執行,等線程池裡的任務執行完通過future.get的方式獲取線程池裡的線程執行結果,然後合併到主流程的結果里返回,大致流程如下:
線程池參數為:
- coresize:50
- max:200
- queuesize:1
- keepalivetime:60s
- 拒絕策略為reject
假設每次請求提交5個task到線程池,平均每個task是耗時50ms
沒過一會就收到了線程池滿了走了拒絕策略的報錯
結合你對線程池的了解,先思考下為什麼
線程池的工作流程如下:
根據這個我們來列一個時間線
1. 項目剛啟動 第1次請求(每次5個task提交到線程池),創建5個核心線程
2. 第2次請求 繼續創建5個(共10個核心線程了)
3. 直到第10次 核心線程數會達滿50個
4. 核心線程處理完之後核心線程會幹嘛呢
根據 jdk1.8的線程池的源碼:
線程池的線程處理處理了交給它的task之後,它會去getTask()
源碼如下:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//注意這段
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
請注意上面代碼中的bool類型的timed的賦值邏輯,
由於allowCoreThreadTimeOut默認為false,也就是說:
只要創建的線程數量超過了核心線程數,那麼幹完手上活後的線程(不管是核心線程,還是超過隊列後新開的線程)就會走進
//線程狀態為 timedwaiting
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
由於我們上面步驟裏面還沒有超過coresize所以會走進
//線程狀態為 waiting
workQueue.take()
所以答案是:上面步驟幹活的核心線程處理完之後核心線程會進入waiting狀態,
只要隊列一有活就會被喚醒去幹活。
5. 到第11次的時候
好傢夥,到這步驟的時候 ,核心線程數已滿,那麼就往隊列裏面塞,但是設置的queuesize=1,
每次有5個task,那就是說往隊列裏面塞1個,剩下4個(別較真我懂你意思)要創建新的max線程了。
結果:
- 核心線程數:50
- 隊列:1
- max線程:4個
因為50個核心線程在waiting中,所以隊列只要一add,就會立馬被消費,假設消費的這個核心線程名字是小A。
這裡要細品一下:
這裡已經總線程數大於核心線程數了,那麼getTask()裏面
// timed=true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
那麼小A幹完活就會走進
//線程狀態為 timedwaiting
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
此處核心線程小A就會變成timedwaiting的狀態(keepalive設置的是60s)
6. 到第12次的時候
繼續往隊列塞1個,創建4個max線程,max線程已經有8個了
這裡 又會有一個新的核心線程小B ,會變成timedwaiting狀態了
max線程們幹完手上的活後,也會去調用getTask() 也會進入timedwaiting狀態
因為queuesize=1,狼多肉少
7. 繼續下去,那麼最終會變成
max滿了,線程們都在timedwaiting(keepalive設置的是60s)
新的提交就會走拒絕策略了
問題總結
其實核心與非核心對於線程池來說都是一樣的,只要一旦線程數超過了核心線程數,那麼線程就會走進timewaiting
把queuesize調大就好了?
這裡又有一個新的注意點:
上面舉例的是I/O密集型業務,queuesize不是越大越好的,
因為:
線程池新創建的線程會優先處理新請求進來的任務,而不是去處理隊列里的任務,隊列里的任務只能等核心線程數忙完了才能被執行,這樣可能造成隊列里的任務長時間等待,導致隊列積壓,尤其是I/O密集場景
慎用CallRunnerPolicy這個拒絕策略
一定得理解這個策略會帶來什麼影響,
先看下這個拒絕策略的源碼
如果你提交線程池的任務即時失敗也沒有關係的話,用這個拒絕策略是致命的,
因為一旦超過線程池的負載後開始吞噬tomcat線程。
用future.get的方式慎用DiscardPolicy這個拒絕策略
如果需要得到線程池裡的線程執行結果,使用future的方式,拒絕策略不建議使用DiscardPolicy,這種丟棄策略雖然不執行子線程的任務,
但是還是會返回future對象(其實在這種情況下我們已經不需要線程池返回的結果了),然後後續代碼即使判斷了future!=null也沒用,
這樣的話還是會走到future.get()方法,如果get方法沒有設置超時時間會導致一直阻塞下去
類似下面的偽代碼:
// 如果線程池已滿,新的請求會被直接執行拒絕策略,此時如果拒絕策略設置的是DiscardPolicy丟棄任務,
// 則還是會返回future對象, 這樣的話後續流程還是可能會走到get獲取結果的邏輯
Future<String> future = executor.submit(() -> {
// 業務邏輯,比如調用第三方接口等操作
return result;
});
// 主流程調用邏輯
if(future != null) // 如果拒絕策略是DiscardPolicy還是會走到下面代碼
future.get(超時時間); // 調用方阻塞等待結果返回,直到超時
推薦解決方案
1. 用動態線程池,可以動態修改coresize,maxsize,queuesize,keepalivetime
- 對線程池的核心指標進行埋點監控,可以通過繼承 ThreadPoolExecutor 然後Override掉beforeExecute,afterExecute,shutdown,shutdownNow方法,進行埋點記錄到es
- 可以埋點的數據有:
包括線程池運行狀態、核心線程數、最大線程數、任務等待數、已完成任務數、線程池異常關閉等信息
名稱 | 含義 |
---|---|
core_pool_size | 定義的核心線程總數 |
max_pool_size | 定義的maxpoolsize |
keep_alive_time | 定義的keepalivetime |
current_pool_size | 當前線程池總線程數 |
queue_wait_size | 當前隊列中等待處理的個數 |
active_count | 當前run狀態的線程數 |
completed_count | 當前線程池中的每個線程處理的task數的疊加值 |
task_count | 等於completed_count加上queue_wait_size |
shutdown | 當前線程池的狀態是否關閉 |
useRate | 當前線程池利用率:((active_count * 1.0 / max_pool_size) * 100) |
基於以上數據,我們可以實時監控和排查定位問題
參考代碼:
/**
* 自定義線程池<p>
* 1.監控線程池狀態及異常關閉等情況<p>
* 2.監控線程池運行時的各項指標, 比如:任務執行時間、任務等待數、已完成任務數、任務異常信息、核心線程數、最大線程數等<p>
* author: maoyingxu
*/
public class ThreadPoolExt extends ThreadPoolExecutor{
private TimeUnit timeUnit;
public ThreadPoolExt(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.timeUnit = unit;
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
monitor("ThreadPool monitor data:"); // 監控線程池運行時的各項指標
}
@Override
protected void afterExecute(Runnable r, Throwable ex) {
// 記錄線程池執行任務的時間
ELKLogUtils.addAppendedValue(StoredLogTag.RUNNING_DETAIL, MessageFormat.format("ThreadPool task executeTime:{0}", executeTime));
if (ex != null) { // 監控線程池中的線程執行是否異常
LogUtils.warn("unknown exception caught in ThreadPool afterExecute:", ex);
}
}
@Override
public void shutdown() {
monitor("ThreadPool will be shutdown:"); // 線程池將要關閉事件,此方法會等待線程池中正在執行的任務和隊列中等待的任務執行完畢再關閉
super.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
monitor("ThreadPool going to immediately be shutdown:"); // 線程池立即關閉事件,此方法會立即關閉線程池,但是會返回隊列中等待的任務
// 記錄被丟棄的任務, 目前只記錄日誌, 後續可根據業務場景做進一步處理
List<Runnable> dropTasks = null;
try {
dropTasks = super.shutdownNow();
ELKLogUtils.addAppendedValue(StoredLogTag.RUNNING_DETAIL, MessageFormat.format("{0}ThreadPool discard task count:{1}{2}",
System.lineSeparator(), dropTasks!=null ? dropTasks.size() : 0, System.lineSeparator()));
} catch (Exception e) {
LogUtils.addClogException("ThreadPool shutdownNow error", e);
}
return dropTasks;
}
/**
* 監控線程池運行時的各項指標, 比如:任務等待數、任務異常信息、已完成任務數、核心線程數、最大線程數等
* @param title
*/
private void monitor(String title){
try {
// 線程池監控信息記錄, 這裡需要注意寫ES的時機,尤其是多個子線程的日誌合併到主流程的記錄方式
String threadPoolMonitor = MessageFormat.format(
"{0}{1}core pool size:{2}, current pool size:{3}, queue wait size:{4}, active count:{5}, completed task count:{6}, " +
"task count:{7}, largest pool size:{8}, max pool size:{9}, keep alive time:{10}, is shutdown:{11}, is terminated:{12}, " +
"thread name:{13}{14}",
System.lineSeparator(), title, this.getCorePoolSize(), this.getPoolSize(),
this.getQueue().size(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), this.getLargestPoolSize(),
this.getMaximumPoolSize(), this.getKeepAliveTime(timeUnit != null ? timeUnit : TimeUnit.SECONDS), this.isShutdown(),
this.isTerminated(), Thread.currentThread().getName(), System.lineSeparator());
ELKLogUtils.addAppendedValue(StoredLogTag.RUNNING_DETAIL, threadPoolMonitor);
LogUtils.info(title, threadPoolMonitor);
ELKLogUtils.addFieldValue(APPIndexedLogTag.THREAD_POOL_USE_RATE, useRate); // ES埋點線程池使用率, useRate = (getActiveCount()/getMaximumPoolSize())*100
Cat.logEvent(key, String.valueOf(useRate)); // 報警設置
} catch (Exception e) {
LogUtils.addClogException("ThreadPool monitor error", e);
}
}
}
2. 重寫線程池拒絕策略, 拒絕策略主要參考了 Dubbo的線程池拒絕策略
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
// 省略部分代碼
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: "
+ "%d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg); // 記錄最大負載情況下線程池的核心線程數,活躍數,最大線程數等參數
dumpJStack(); // 記錄線程堆棧信息包括鎖爭用信息
throw new RejectedExecutionException(msg);
}
private void dumpJStack() {
long now = System.currentTimeMillis();
//dump every 10 minutes 每隔10分鐘記錄一次
if (now - lastPrintTime < TEN_MINUTES_MILLS) {
return;
}
if (!guard.tryAcquire()) { // 加鎖訪問
return;
}
ExecutorService pool = Executors.newSingleThreadExecutor(); // 這裡單獨開啟一個新的線程去執行(阿里的Java開發規範不允許直接調用Executors.newSingleThreadExecutor, 估計dubbo那時候還沒出開發規範...)
pool.execute(() -> {
String dumpPath = url.getParameter(DUMP_DIRECTORY, System.getProperty("user.home"));
SimpleDateFormat sdf;
String os = System.getProperty(OS_NAME_KEY).toLowerCase();
// window system don't support ":" in file name
if (os.contains(OS_WIN_PREFIX)) {
sdf = new SimpleDateFormat(WIN_DATETIME_FORMAT);
} else {
sdf = new SimpleDateFormat(DEFAULT_DATETIME_FORMAT);
}
String dateStr = sdf.format(new Date());
//try-with-resources
try (FileOutputStream jStackStream = new FileOutputStream(
new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
JVMUtil.jstack(jStackStream);
} catch (Throwable t) {
logger.error("dump jStack error", t);
} finally {
guard.release();
}
lastPrintTime = System.currentTimeMillis();
});
//must shutdown thread pool ,if not will lead to OOM
pool.shutdown();
}
}
以上理解如果有誤,歡迎大佬指正。
參考資料:
- Dubbo線程池拒絕策略: org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport.java
- 《Java並發編程實戰》