線程池,我是誰?我在哪兒?
大家好,這篇文章跟大家探討下日常使用線程池的各種姿勢,重點介紹怎麼在 Spring 環境中正確使用線程池。
線程池使用姿勢
首先問大家一個問題,你日常開發中是怎樣使用線程池的?
我想大致可以分為以下四種情況:
1.方法級,隨用隨建,用完關閉
2.類級共享,定義個 static final 修飾的 ThreadPoolExecutor,該類及子類(看修飾符)所有對象、方法共享
3.業務共享,按業務類型定義多個 ThreadPoolExecutor,相同業務類型共享同一線程池對象
4.全局共享,服務所有地方共享同一全局線程池
一般來說,優先使用方式3,其次方式2,不要使用方式1跟4,原因如下
1.線程池出現的目的就是為了統一管理線程資源,減少頻繁創建銷毀線程帶來的開銷,使用池化技術復用線程執行任務,提升系統性能,在高並發、異步化的場景下,方法級使用根本達不到此目的,反而會使性能變低。
2.全局共享一個線程池,任務執行參差不齊,相互影響,高耗時任務會佔滿線程池資源,導致低耗時任務沒機會執行;同時如果任務之間存在父子關係,可能會導致死鎖的發生,進而引發OOM。
3.按業務類型進行線程池隔離,各任務執行互不影響,粒度也比類級共享大點,不會創建大量線程池,降低系統調度壓力,像 Hystrix 線程池隔離就可以理解成這種模式。
綜上,建議大家都採用方式3,按業務功能分類定義線程池。
Spring 項目中使用 ThreadPoolExecutor
Spring 作為一個 Bean 容器,我們通常會將業務中用到的 ThreadPoolExecutor 註冊到 Spring 容器中,同時 Spring 在容器刷新的時候會注入相應的 ThreadPoolExecutor 對象 到我們的業務 Bean 中,然後就可以直接使用了,比如定義如下(ThreadPoolBuilder是封裝的一個建造者模式實現):
@Configuration
public class ThreadPoolConfiguration {
@Bean
public ThreadPoolExecutor jobExecutor() {
return ThreadPoolBuilder.newBuilder()
.corePoolSize(10)
.maximumPoolSize(15)
.keepAliveTime(15000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(LINKED_BLOCKING_QUEUE.getName(), 3000)
.build();
}
@Bean
public ThreadPoolExecutor remotingExecutor() {
return ThreadPoolBuilder.newBuilder()
.corePoolSize(10)
.maximumPoolSize(15)
.keepAliveTime(15000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(SYNCHRONOUS_QUEUE.getName(), null)
.build();
}
@Bean
public ThreadPoolExecutor consumeExecutor() {
return ThreadPoolBuilder.newBuilder()
.corePoolSize(10)
.maximumPoolSize(15)
.keepAliveTime(15000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(LINKED_BLOCKING_QUEUE.getName(), 5000)
.build();
}
}
以上按使用場景定義了三個線程池實例,一個用來執行耗時的定時任務、一個用來執行遠程RPC調用、一個用來執行 Mq 消費。
這樣使用 ThreadPoolExecutor 有個問題,Spring 容器關閉的時候可能任務隊列里的任務還沒處理完,有丟失任務的風險。
我們知道 Spring 中的 Bean 是有生命周期的,如果 Bean 實現了 Spring 相應的生命周期接口(InitializingBean、DisposableBean接口),在 Bean 初始化、容器關閉的時候會調用相應的方法來做相應處理。
所以建議最好不要直接使用 ThreadPoolExecutor 在 Spring 環境中,可以使用 Spring 提供的 ThreadPoolTaskExecutor,或者 DynamicTp 框架提供的 DtpExecutor 線程池實現。
一些 Spring 知識
這裡分享一個源碼閱讀技巧,就是開源項目和Spring整合時,很多同學不知從何入手閱讀源碼。
我們知道Spring提供了很多的擴展點,第三方框架整合Spring其實大多也都是基於這些擴展接口來做的,所以我們可以從這些擴展接口入手,斷點調試,一步步深入框架內核。
這些擴展包括但不限於以下接口:
BeanFactoryPostProcessor:在Bean實例化之前對BeanDefinition進行修改
BeanPostProcessor:在Bean初始化前後對Bean進行一些修改包裝增強,比如返回代理對象
Aware:一個標記接口,實現該接口及子接口的類會收到Spring的通知回調,賦予某種Spring框架的能力,比如ApplicationContextAware、EnvironmentAware等
ApplicationContextInitializer:在上下文準備階段,容器刷新之前做一些初始化工作,比如我們常用的配置中心client基本都是繼承該初始化器,在容器刷新前將配置從遠程拉到本地,然後封裝成PropertySource放到Environment中供使用
ApplicationListener:Spring事件機制,監聽特定的應用事件(ApplicationEvent),觀察者模式的一種實現
FactoryBean:用來自定義Bean的創建邏輯(Mybatis、Feign等等)
ImportBeanDefinitionRegistrar:定義@EnableXXX註解,在註解上Import了一個 ImportBeanDefinitionRegistrar,實現註冊BeanDefinition到容器中
ApplicationRunner/CommandLineRunner:容器啟動後回調,執行一些初始化工作
上述列出了幾個比較常用的接口,但是Spring擴展遠不於此,還有很多擴展接口大家可以自己去了解。
DynamicTp 生成線程池對象
DynamicTp 框架內部定義了 DtpExecutor 線程池類,其繼承關係如下:
EagerDtpExecutor:參考 Tomcat 線程池設計,調整了下線程池的執行流程,優先創建線程執行任務而不是放入隊列中,主要用於IO密集型場景,繼承 DtpExecutor
DtpExecutor:重寫了 ThreadPoolExecutor 的 execute 方法、beforeExecute 方法、afterExecute 方法,主要做任務包裝、執行超時、等待超時記錄等,繼承 DtpLifecycleSupport
DtpLifecycleSupport:實現了 Spring 中的 InitializingBean, DisposableBean 接口,在 Bean 初始化、Spring 容器銷毀時執行相應的邏輯,destroy 方法邏輯如下:
@Override
public void destroy() {
internalShutdown();
}
public void internalShutdown() {
if (log.isInfoEnabled()) {
log.info("Shutting down ExecutorService, poolName: {}", threadPoolName);
}
if (this.waitForTasksToCompleteOnShutdown) {
// 如果需要等待任務執行完畢,則調用shutdown()會執行先前已提交的任務,拒絕新任務提交,線程池狀態變成 SHUTDOWN
this.shutdown();
} else {
// 如果不需要等待任務執行完畢,則直接調用shutdownNow()方法,嘗試中斷正在執行的任務,返回所有未執行的任務,線程池狀態變成 STOP, 然後調用 Future 的 cancel 方法取消
for (Runnable remainingTask : this.shutdownNow()) {
cancelRemainingTask(remainingTask);
}
}
awaitTerminationIfNecessary();
}
protected void cancelRemainingTask(Runnable task) {
if (task instanceof Future) {
((Future<?>) task).cancel(true);
}
}
private void awaitTerminationIfNecessary() {
if (this.awaitTerminationSeconds <= 0) {
return;
}
try {
// 配合 shutdown 使用,阻塞當前線程,等待已提交的任務執行完畢或者超時
if (!awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS) && log.isWarnEnabled()) {
log.warn("Timed out while waiting for executor {} to terminate", threadPoolName);
}
} catch (InterruptedException ex) {
if (log.isWarnEnabled()) {
log.warn("Interrupted while waiting for executor {} to terminate", threadPoolName);
}
Thread.currentThread().interrupt();
}
}
DynamicTp 框架在整合 Spring 的時候,也是用到了上述說的擴展接口。
擴展1
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(DtpBeanDefinitionRegistrar.class)
public @interface EnableDynamicTp {
}
使用過 DynamicTp 的小夥伴應該知道需要在啟動類加 @EnableDynamicTp 註解,該註解其實就用到了 ImportBeanDefinitionRegistrar 擴展,主要代碼如下:
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
DtpProperties dtpProperties = new DtpProperties();
// 將環境變量中的線程池相關配置綁定到 DtpProperties 對象上
PropertiesBinder.bindDtpProperties(environment, dtpProperties);
val executors = dtpProperties.getExecutors();
if (CollUtil.isEmpty(executors)) {
log.warn("DynamicTp registrar, no executors are configured.");
return;
}
executors.forEach(x -> {
// 判斷線程池類型(common or eager)
Class<?> executorTypeClass = ExecutorType.getClass(x.getExecutorType());
String beanName = x.getThreadPoolName();
// 線程池對象屬性
Map<String, Object> properties = buildProperties(x);
// 構造器參數
Object[] args = buildArgs(executorTypeClass, x);
BeanUtil.registerIfAbsent(registry, beanName, executorTypeClass, properties, args);
});
}
代碼解讀:
1.我們知道 ImportBeanDefinitionRegistrar 的實現是在 Spring 容器刷新的時候執行的,在此之前在上下文準備階段已經從配置中心拉取到線程池配置放到環境變量里了,所以第一步我們將環境變量里的線程池相關配置綁定到 DtpProperties 對象上。
2.然後構造 BeanDefinitionBuilder 對象,設置構造函數參數、設置屬性值,註冊到 BeanDefinition 到 Spring 容器中
public static void doRegister(BeanDefinitionRegistry registry,
String beanName,
Class<?> clazz,
Map<String, Object> properties,
Object... constructorArgs) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(clazz);
// 設置構造器參數,老八股文了
for (Object constructorArg : constructorArgs) {
builder.addConstructorArgValue(constructorArg);
}
// 設置屬性及值的KV對,後續在Bean populateBean 的時候會通過反射set方法賦值
if (CollUtil.isNotEmpty(properties)) {
properties.forEach(builder::addPropertyValue);
}
registry.registerBeanDefinition(beanName, builder.getBeanDefinition());
}
3.Spring 容器刷新時會根據註冊的 BeanDefinition 創建配置的線程池對象,初始化賦值,並注入到引用的 Bean 中。這樣就不用在手動用 @Bean 聲明線程池對象了,只需要在配置中心配置即可
擴展2
DtpPostProcessor 繼承 BeanPostProcessor,在 Bean 初始化前後對 ThreadPoolExecutor 及其子類進行一些處理,主要用來獲取線程池對象註冊到 DynamicTp 框架內部定義的容器中(就個 Map)
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (!(bean instanceof ThreadPoolExecutor)) {
return bean;
}
if (bean instanceof DtpExecutor) {
DtpExecutor dtpExecutor = (DtpExecutor) bean;
if (bean instanceof EagerDtpExecutor) {
((TaskQueue) dtpExecutor.getQueue()).setExecutor((EagerDtpExecutor) dtpExecutor);
}
registerDtp(dtpExecutor);
return dtpExecutor;
}
ApplicationContext applicationContext = ApplicationContextHolder.getInstance();
DynamicTp dynamicTp;
try {
dynamicTp = applicationContext.findAnnotationOnBean(beanName, DynamicTp.class);
if (dynamicTp == null) {
return bean;
}
} catch (NoSuchBeanDefinitionException e) {
log.error("There is no bean with the given name {}", beanName, e);
return bean;
}
String poolName = StringUtils.isNotBlank(dynamicTp.value()) ? dynamicTp.value() : beanName;
registerCommon(poolName, (ThreadPoolExecutor) bean);
return bean;
}
擴展3
ApplicationListener 主要用來解耦邏輯,發佈監聽事件,core 模塊跟 adapter 模塊通信主要就用該擴展,以及框架會監聽 Spring 容器啟動的各階段事件,做相應的邏輯處理
public abstract class AbstractDtpHandleListener implements GenericApplicationListener {
@Override
public boolean supportsEventType(ResolvableType resolvableType) {
Class<?> type = resolvableType.getRawClass();
if (type != null) {
return RefreshEvent.class.isAssignableFrom(type) ||
CollectEvent.class.isAssignableFrom(type) ||
AlarmCheckEvent.class.isAssignableFrom(type);
}
return false;
}
@Override
public void onApplicationEvent(@NonNull ApplicationEvent event) {
try {
if (event instanceof RefreshEvent) {
doRefresh(((RefreshEvent) event).getDtpProperties());
} else if (event instanceof CollectEvent) {
doCollect(((CollectEvent) event).getDtpProperties());
} else if (event instanceof AlarmCheckEvent) {
doAlarmCheck(((AlarmCheckEvent) event).getDtpProperties());
}
} catch (Exception e) {
log.error("DynamicTp adapter, event handle failed.", e);
}
}
}
擴展4
ApplicationRunner,等 Spring 容器啟動後,會調用該鉤子函數,執行一些初始化操作,DtpMonitor、DtpRegistry 等都用到了該擴展
所以 DynamicTp 的正確使用姿勢,線程池只需在配置中心聲明,然後服務啟動時框架會基於 Spring 的這些擴展自動創建線程池對象注入到所需的 Bean 中,代碼中不需要顯示聲明
再次介紹下 DynamicTp 框架
DynamicTp 是一個基於配置中心實現的輕量級動態線程池管理工具,主要功能可以總結為 動態調參、通知報警、運行監控、三方包線程池管理等幾大類。
經過幾個版本迭代,目前最新版本v1.0.7具有以下特性
特性 ✅
-
代碼零侵入:所有配置都放在配置中心,對業務代碼零侵入
-
輕量簡單:基於 springboot 實現,引入 starter,接入只需簡單4步就可完成,順利3分鐘搞定
-
高可擴展:框架核心功能都提供 SPI 接口供用戶自定義個性化實現(配置中心、配置文件解析、通知告警、監控數據採集、任務包裝等等)
-
線上大規模應用:參考美團線程池實踐,美團內部已經有該理論成熟的應用經驗
-
多平台通知報警:提供多種報警維度(配置變更通知、活性報警、容量閾值報警、拒絕觸發報警、任務執行或等待超時報警),已支持企業微信、釘釘、飛書報警,同時提供 SPI 接口可自定義擴展實現
-
監控:定時採集線程池指標數據,支持通過 MicroMeter、JsonLog 日誌輸出、Endpoint 三種方式,可通過 SPI 接口自定義擴展實現
-
任務增強:提供任務包裝功能,實現TaskWrapper接口即可,如 TtlTaskWrapper 可以支持線程池上下文信息傳遞,以及給任務設置標識id,方便問題追蹤
-
兼容性:JUC 普通線程池也可以被框架監控,@Bean 定義時加 @DynamicTp 註解即可
-
可靠性:框架提供的線程池實現 Spring 生命周期方法,可以在 Spring 容器關閉前儘可能多的處理隊列中的任務
-
多模式:參考Tomcat線程池提供了 IO 密集型場景使用的 EagerDtpExecutor 線程池
-
支持多配置中心:基於主流配置中心實現線程池參數動態調整,實時生效,已支持 Nacos、Apollo、Zookeeper、Consul,同時也提供 SPI 接口可自定義擴展實現
-
中間件線程池管理:集成管理常用第三方組件的線程池,已集成Tomcat、Jetty、Undertow、Dubbo、RocketMq、Hystrix等組件的線程池管理(調參、監控報警)
項目地址
目前累計 1.5k star,感謝你的star,歡迎pr,業務之餘一起給開源貢獻一份力量
gitee地址://gitee.com/dromara/dynamic-tp
github地址://github.com/dromara/dynamic-tp