java線程池拒絕策略使用實踐

前言
線程池是開發過程中使用頻率較高的一個並發組件之一,本篇會結合踩刀哥之前的實踐經驗來分享一下線程池拒絕策略的真實使用場景,至於線程池內部原理只會簡單介紹,有需要的可以自行上網學習。
線程池工作機制
這裡用一個例子來描述下線程池的工作機制,2015年公司boss創立公司,創立初期公司業務比較少,boss一個人(corePoolSize=1)乾的有條不紊,沒過多久,業務量上來了,他一個人干不過來,分身乏力,那怎麼辦呢?其實很簡單,排隊唄,就這樣boss將待辦的任務都添加到需求池(BlockingQueue)裏面,boss又開始愉快的工作,但是客戶的耐心終歸有限,過了幾天發現自己交給我們公司的業務還沒完成,客戶一氣之下打電話給boss「我的活你幹完沒有,沒幹的話就停下來(shutdown/shutdownNow)吧,我找別人了」,這時候boss慌了,流着淚點上一根煙,在網上發了招聘,就這樣幹活的人又多了起來(addWorker),但是員工終歸不是無限的,當活太多的時候,boss還是會拒絕接一些活(RejectedExecutionHandler)。公司在boss的帶領下沉浮五載,本以為2020年可以大幹一場,卻偏偏趕上了新冠,復工日期一拖再拖,客戶需求一少再少,唯獨公司養的員工沒少,這是公司目前最大的開支了。長痛不如短痛,boss們研究了一個政策,如果員工一個月(keepAliveTime=一個月)沒有活干,那麼就會被辭退(空閑線程被清理),一段時間以後不少員工被辭退了,只剩下核心人員。
畫個簡圖幫助理解,如下:
主角登場
之前的鋪墊都是為了引出RejectedExecutionHandler,現在我們來聊聊RejectedExecutionHandler的真實使用場景,先看看RejectedExecutionHandler的定義。
/**
* A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.
*
* @since 1.5
* @author Doug Lea
*/
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
很顯然它是一個接口,只有一個方法rejectedExecution,當線程池拒絕接受任務的時候會調用它,RejectedExecutionHandler一般由構造ThreadPoolExecutor對象的時候傳入,如果沒有傳入會默認使用AbortPolicy。
jdk目前已提供四種RejectedExecutionHandler的實現供開發者使用,大多數情況下已夠用,少數情況下用戶可以選擇自定義,jdk提供的四種RejectedExecutionHandler實現如下:
1.AbortPolicy:中止策略,拋出RejectedExecutionException異常由使用者處理;
2.CallerRunsPolicy:佔用調用者的線程來執行被拒絕的任務;
3.DiscardOldestPolicy:將最早入隊列的任務丟棄,然後重新提交被拒絕的任務(這裡有可能依然不成功);
4.DiscardPolicy:拋棄策略,簡單的拋棄,和AbortPolicy比較相似,區別是前者對於用戶無感知;
實踐場景之-AbortPolicy
在踩刀哥過往的工作中有這麼一個需求,用戶支付以後給用戶push消息,這裡就用到了線程池來處理這塊業務,偽代碼如下:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(...);
try{
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
//1. 調用推送服務push msg
}
});
}catch (Exception RejectedExecutionException){
//2. 記錄日誌
}
前面說過,如果構造ThreadPoolExecutor時沒有傳遞RejectedExecutionHandler,jdk默認會使用AbortPolicy,它內部會拋出RejectedExecutionException,所以調用者需要捕獲這個異常做相應的處理,因為當時1.0的需求比較簡單,所以只是簡單了記錄了日誌,後來產品提出對於這種失敗的情況需要做補償,進而引出下面的第二個使用場景。
實踐場景之-自定義RejectedExecutionHandler
前面提到產品希望對於這種被拒絕的push任務需要做補償,具體的補償邏輯為:如果當時被拒絕了,那就每隔2s重試一次,一共重試2次。我當時的處理措施是,如果execute失敗了那就將任務放到redis中,異步取出重試,代碼怎麼寫呢,第一版是這麼寫的:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(...);
try{
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
//1. 調用推送服務push msg
}
});
}catch (Exception RejectedExecutionException){
//2. 將任務添加到redis中
}
//3 定時任務掃描redis,然後添加到threadPoolExecutor中
看着確實也沒有問題,也能實現功能,但是這種寫法顯得不太優雅,ThreadPoolExecutor對於拒絕處理這塊採用了策略設計模式來優化代碼,讓邏輯更清晰,而我現在的寫法將任務處理和拒絕處理揉在了一起,違背了原來的設計,所以決定進行改造,改造後如下:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( ..., new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
//1. 記錄日誌
log.warn...
//2 將task插入的redis中
redis.lpush...
}
});
拒絕處理的邏輯被封裝到自定義的拒絕處理器當中,邏輯更清晰,表達能力更強。
實踐場景之-CallerRunsPolicy
之前做過一個http推送平台,大體工作流程如下:
1.生產者將推送任務插入數據庫中;
2.推送平台起一個異步線程去獲取待推送任務;
3.將第2步中得到的推送任務丟到線程池裏面去推送。
簡單來說就是一個生產者消費者模型,推送的時候發現某些下游的接口響應時間較長,經常將線程池佔滿,所以就希望DelayQueuePollingTask這個線程能感知到這一情況,當線程池滿的時候停止去數據庫獲取待推送任務,所以就將RejectedExecutionHandler設置為CallerRunsPolicy,現在可以達到如下效果:
1.生產者將推送任務插入數據庫中;
2.推送平台起一個異步線程去獲取待推送任務;
3.將第2步中得到的推送任務丟到線程池裏面去推送;
4.線程池如果滿就由DelayQueuePollingTask這個Thread自己執行推送任務,這樣就可以停止去數據庫獲取待推送任務,DelayQueuePollingTask也不至於閑着沒事,還可以分擔任務。
