InheritableThreadLocal 在執行緒池中進行父子執行緒間消息傳遞出現消息丟失的解析
- 2022 年 6 月 29 日
- 筆記
在日常研發過程中,我們經常面臨著需要在執行緒內,執行緒間進行消息傳遞,比如在修改一些開源組件源碼的過程中,需要將外部參數透傳到內部,如果進行方法參數重載,則涉及到的改動量過大,這樣,我們可以依賴ThreadLocal 來進行消息傳遞。
ThreadLocal 是 存儲在執行緒棧幀中的一塊數據存儲區域,其可以做到執行緒與執行緒之間的讀寫隔離。
但是在我們的日常場景中,經常會出現 父執行緒 需要向子執行緒中傳遞消息,而 ThreadLocal 僅能在當前執行緒上進行數據快取,因此 我們需要使用 InheritableThreadLocal 來實現 父子執行緒間的消息傳遞
// 定義消息
public class ThreadLocalMessage { private final InheritableThreadLocal<Msg> msg; private ThreadLocalMessage() { msg = new InheritableThreadLocal<>(); } public Msg getMsg() { return this.msg.get(); } public void setMsg(Msg msg) { this.msg.set(msg); } public void clear() { msg.remove(); } private static final ThreadLocalMessage threadLocalMessage = new ThreadLocalMessage(); public static ThreadLocalMessage getInstance() { return threadLocalMessage; } /** * 獲取執行緒中的消息 * * @return */ public static Msg getOrCreateMsg() { Msg msg = ThreadLocalMessage.getInstance().getMsg(); if (msg == null) { msg = new Msg(); } return msg; } public static class Msg { /** * taskId */ private String taskId; private Map<String, Object> others; private int retCode; public Msg() { } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } @Override public String toString() { return "Msg{" + "taskId='" + taskId + '\'' + ", others=" + others + ", retCode=" + retCode + '}'; } } }
// 定義執行緒池
@EnableAsync @Configuration public class ExecutorConfig { private final Logger log = LoggerFactory.getLogger(getClass()); @Value("${executor.corePool:2}") private Integer corePool; @Value("${executor.maxPool:10}") private Integer maxPool; @Value("${executor.queue:2}") private Integer queue; @Bean("cdl-executor") public Executor executor() { log.info("start async Executor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心執行緒數 executor.setCorePoolSize(corePool); //配置最大執行緒數 executor.setMaxPoolSize(maxPool); //配置隊列大小 executor.setQueueCapacity(queue); //配置執行緒池中的執行緒的名稱前綴 executor.setThreadNamePrefix("async-executor-"); // 設置拒絕策略 executor.setRejectedExecutionHandler((r, e) -> { // ..... }); // CALLER_RUNS:不在新執行緒中執行任務,而是有調用者所在的執行緒來執行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //執行初始化 executor.initialize(); return executor;
// 使用TTL 初始化 executor //return TtlExecutors.getTtlExecutor(executor); } }
// 創建子執行緒進行消息傳遞並列印
public String test() throws Exception{ for (int i = 0 ; i < 20; i++){ ThreadLocalMessage.Msg msg = ThreadLocalMessage.getOrCreateMsg(); msg.setTaskId("task_id_"+i); ThreadLocalMessage.getInstance().setMsg(msg); myService.testThread(i); ThreadLocalMessage.getInstance().clear(); } return "ok"; }
經過程式碼測試,我們創建了一個池子大小為10 的執行緒,並發啟動了20個執行緒去進行父子執行緒消息傳遞,結果如下:
經過測試,我們發現 只有10個執行緒 的消息傳遞成功了,其餘10個執行緒的消息均丟失了,這是什麼原因呢。。。
遇到這個問題,我們首先得弄清楚 InheritableThreadLocal 是如何在父子執行緒間進行消息傳遞的
InheritableThreadLocal 在父執行緒創建子執行緒的時候,會將父執行緒中InheritableThreadLocal 中存儲的數據 拷貝一份 存儲到子執行緒的 InheritableThreadLocal 中
而我們使用的 執行緒池,執行緒池是會反覆利用執行緒的,當執行緒池沒有被創建滿,每次都是新創建執行緒,直到執行緒池創建滿了,再需要使用執行緒就會從執行緒池中拿已經創建好的執行緒。
問題就出在這裡,由於後面的執行緒 是從執行緒池中去撈已經創建好的執行緒,不會走創建邏輯,也就無法觸發 InheritableThreadLocal 中向子執行緒 拷貝,這也就是為什麼 InheritableThreadLocal 合併執行緒池 使用時,出現了 消息丟失的原因
如何解決????
阿里巴巴開源的TTL ,用於解決執行緒池中的父子執行緒復用,執行緒數據傳遞,可以完美解決這個問題
<dependency> <groupId>com.alibaba</groupId> <artifactId>transmittable-thread-local</artifactId> <version>2.0.0</version> </dependency>
@EnableAsync
@Configuration
public class ExecutorConfig {
private final Logger log = LoggerFactory.getLogger(getClass());
@Value("${executor.corePool:2}")
private Integer corePool;
@Value("${executor.maxPool:10}")
private Integer maxPool;
@Value("${executor.queue:2}")
private Integer queue;
@Bean("cdl-executor")
public Executor executor() {
log.info("start async Executor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心執行緒數
executor.setCorePoolSize(corePool);
//配置最大執行緒數
executor.setMaxPoolSize(maxPool);
//配置隊列大小
executor.setQueueCapacity(queue);
//配置執行緒池中的執行緒的名稱前綴
executor.setThreadNamePrefix("async-executor-");
// 設置拒絕策略
executor.setRejectedExecutionHandler((r, e) -> {
// .....
});
// CALLER_RUNS:不在新執行緒中執行任務,而是有調用者所在的執行緒來執行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//執行初始化
executor.initialize();
// 使用TTL 的 executor
return TtlExecutors.getTtlExecutor(executor);
//return executor;
}
}
public class ThreadLocalMessage { private final TransmittableThreadLocal<Msg> msg; private ThreadLocalMessage() { msg = new TransmittableThreadLocal<>(); } public Msg getMsg() { return this.msg.get(); } public void setMsg(Msg msg) { this.msg.set(msg); } public void clear() { msg.remove(); } private static final ThreadLocalMessage threadLocalMessage = new ThreadLocalMessage(); public static ThreadLocalMessage getInstance() { return threadLocalMessage; } /** * 獲取執行緒中的消息 * * @return */ public static Msg getOrCreateMsg() { Msg msg = ThreadLocalMessage.getInstance().getMsg(); if (msg == null) { msg = new Msg(); } return msg; } public static class Msg { /** * taskId */ private String taskId; public Msg() { } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } @Override public String toString() { return "Msg{" + "taskId='" + taskId + '\'' + '}'; } } }
按照之前的調用方法再試一次,結果如下:
可以發現未出現數據丟失的情況