InheritableThreadLocal 在執行緒池中進行父子執行緒間消息傳遞出現消息丟失的解析

  • 2022 年 6 月 29 日
  • 筆記

在日常研發過程中,我們經常面臨著需要在執行緒內,執行緒間進行消息傳遞,比如在修改一些開源組件源碼的過程中,需要將外部參數透傳到內部,如果進行方法參數重載,則涉及到的改動量過大,這樣,我們可以依賴ThreadLocal 來進行消息傳遞。

ThreadLocal 是 存儲在執行緒棧幀中的一塊數據存儲區域,其可以做到執行緒與執行緒之間的讀寫隔離。

但是在我們的日常場景中,經常會出現 父執行緒 需要向子執行緒中傳遞消息,而 ThreadLocal  僅能在當前執行緒上進行數據快取,因此 我們需要使用 InheritableThreadLocal  來實現 父子執行緒間的消息傳遞

// 定義消息
public class ThreadLocalMessage { private final InheritableThreadLocal<Msg> msg; private ThreadLocalMessage() { msg = new InheritableThreadLocal<>(); } public Msg getMsg() { return this.msg.get(); } public void setMsg(Msg msg) { this.msg.set(msg); } public void clear() { msg.remove(); } private static final ThreadLocalMessage threadLocalMessage = new ThreadLocalMessage(); public static ThreadLocalMessage getInstance() { return threadLocalMessage; } /** * 獲取執行緒中的消息 * * @return */ public static Msg getOrCreateMsg() { Msg msg = ThreadLocalMessage.getInstance().getMsg(); if (msg == null) { msg = new Msg(); } return msg; } public static class Msg { /** * taskId */ private String taskId; private Map<String, Object> others; private int retCode; public Msg() { } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } @Override public String toString() { return "Msg{" + "taskId='" + taskId + '\'' + ", others=" + others + ", retCode=" + retCode + '}'; } } }

  

// 定義執行緒池
@EnableAsync @Configuration public class ExecutorConfig { private final Logger log = LoggerFactory.getLogger(getClass()); @Value("${executor.corePool:2}") private Integer corePool; @Value("${executor.maxPool:10}") private Integer maxPool; @Value("${executor.queue:2}") private Integer queue; @Bean("cdl-executor") public Executor executor() { log.info("start async Executor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心執行緒數 executor.setCorePoolSize(corePool); //配置最大執行緒數 executor.setMaxPoolSize(maxPool); //配置隊列大小 executor.setQueueCapacity(queue); //配置執行緒池中的執行緒的名稱前綴 executor.setThreadNamePrefix("async-executor-"); // 設置拒絕策略 executor.setRejectedExecutionHandler((r, e) -> { // ..... }); // CALLER_RUNS:不在新執行緒中執行任務,而是有調用者所在的執行緒來執行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //執行初始化 executor.initialize(); return executor;
// 使用TTL 初始化 executor //return TtlExecutors.getTtlExecutor(executor); } }
// 創建子執行緒進行消息傳遞並列印
public String test() throws Exception{ for (int i = 0 ; i < 20; i++){ ThreadLocalMessage.Msg msg = ThreadLocalMessage.getOrCreateMsg(); msg.setTaskId("task_id_"+i); ThreadLocalMessage.getInstance().setMsg(msg); myService.testThread(i); ThreadLocalMessage.getInstance().clear(); } return "ok"; }

  

經過程式碼測試,我們創建了一個池子大小為10 的執行緒,並發啟動了20個執行緒去進行父子執行緒消息傳遞,結果如下:

 

 

 

經過測試,我們發現 只有10個執行緒 的消息傳遞成功了,其餘10個執行緒的消息均丟失了,這是什麼原因呢。。。

遇到這個問題,我們首先得弄清楚 InheritableThreadLocal 是如何在父子執行緒間進行消息傳遞的

InheritableThreadLocal 在父執行緒創建子執行緒的時候,會將父執行緒中InheritableThreadLocal  中存儲的數據 拷貝一份 存儲到子執行緒的 InheritableThreadLocal  中

而我們使用的 執行緒池,執行緒池是會反覆利用執行緒的,當執行緒池沒有被創建滿,每次都是新創建執行緒,直到執行緒池創建滿了,再需要使用執行緒就會從執行緒池中拿已經創建好的執行緒。

問題就出在這裡,由於後面的執行緒 是從執行緒池中去撈已經創建好的執行緒,不會走創建邏輯,也就無法觸發 InheritableThreadLocal 中向子執行緒 拷貝,這也就是為什麼  InheritableThreadLocal  合併執行緒池 使用時,出現了 消息丟失的原因

 

如何解決????

阿里巴巴開源的TTL ,用於解決執行緒池中的父子執行緒復用,執行緒數據傳遞,可以完美解決這個問題

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>transmittable-thread-local</artifactId>
            <version>2.0.0</version>
        </dependency>

  

@EnableAsync
@Configuration
public class ExecutorConfig {

    private final Logger log = LoggerFactory.getLogger(getClass());

    @Value("${executor.corePool:2}")
    private Integer corePool;
    @Value("${executor.maxPool:10}")
    private Integer maxPool;
    @Value("${executor.queue:2}")
    private Integer queue;


    @Bean("cdl-executor")
    public Executor executor() {
        log.info("start async Executor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心執行緒數
        executor.setCorePoolSize(corePool);
        //配置最大執行緒數
        executor.setMaxPoolSize(maxPool);
        //配置隊列大小
        executor.setQueueCapacity(queue);
        //配置執行緒池中的執行緒的名稱前綴
        executor.setThreadNamePrefix("async-executor-");

        // 設置拒絕策略
        executor.setRejectedExecutionHandler((r, e) -> {
            // .....
        });

        // CALLER_RUNS:不在新執行緒中執行任務,而是有調用者所在的執行緒來執行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //執行初始化
        executor.initialize();
        // 使用TTL 的 executor
        return TtlExecutors.getTtlExecutor(executor);
        //return executor;
    }
}

  

public class ThreadLocalMessage {

    private final TransmittableThreadLocal<Msg> msg;

    private ThreadLocalMessage() {
        msg = new TransmittableThreadLocal<>();
    }

    public Msg getMsg() {
        return this.msg.get();
    }

    public void setMsg(Msg msg) {
        this.msg.set(msg);
    }

    public void clear() {
        msg.remove();
    }

    private static final ThreadLocalMessage threadLocalMessage = new ThreadLocalMessage();

    public static ThreadLocalMessage getInstance() {
        return threadLocalMessage;
    }

    /**
     * 獲取執行緒中的消息
     *
     * @return
     */
    public static Msg getOrCreateMsg() {
        Msg msg = ThreadLocalMessage.getInstance().getMsg();
        if (msg == null) {
            msg = new Msg();
        }
        return msg;
    }

    public static class Msg {

        /**
         * taskId
         */
        private String taskId;


        public Msg() {
        }

        public String getTaskId() {
            return taskId;
        }

        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }

        @Override
        public String toString() {
            return "Msg{" +
                    "taskId='" + taskId + '\'' +
                    '}';
        }
    }

}

  

按照之前的調用方法再試一次,結果如下:

 

 可以發現未出現數據丟失的情況