SpringBoot執行緒池

1、遇到的場景

  • 提高一下插入表的性能優化,兩張表,先插舊的表,緊接著插新的表,若是一萬多條數據就有點慢了

2、使用步驟

  • 用Spring提供的對ThreadPoolExecutor封裝的執行緒池ThreadPoolTaskExecutor,直接使用註解啟用

配置


@Configuration
@EnableAsync
public class ExecutorConfig {

    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        logger.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 配置核心執行緒數
        executor.setCorePoolSize(corePoolSize);
        // 配置最大執行緒數
        executor.setMaxPoolSize(maxPoolSize);
        // 配置隊列大小
        executor.setQueueCapacity(queueCapacity);
        // 配置執行緒池中的執行緒的名稱前綴
        executor.setThreadNamePrefix(namePrefix);

        // rejection-policy:當pool已經達到max size的時候,如何處理新任務
        // CALLER_RUNS:不在新執行緒中執行任務,而是有調用者所在的執行緒來執行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //執行初始化
        executor.initialize();
        return executor;
    }
}

  • @Value取值配置是在application.properties中的

# 非同步執行緒配置
# 配置核心執行緒數
async.executor.thread.core_pool_size = 5
# 配置最大執行緒數
async.executor.thread.max_pool_size = 5
# 配置隊列大小
async.executor.thread.queue_capacity = 99999
# 配置執行緒池中的執行緒的名稱前綴
async.executor.thread.name.prefix = async-service-

Demo測試

  • Service介面

public interface AsyncService {

    /**
     * 執行非同步任務
     * 可以根據需求,自己加參數擬定
     */
    void executeAsync();
}

  • Service實現類

@Service
public class AsyncServiceImpl implements AsyncService {

    private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);

    @Override
    @Async("asyncServiceExecutor")
    public void executeAsync() {
        logger.info("start executeAsync");

        System.out.println("非同步執行緒要做的事情");
        System.out.println("可以在這裡執行批量插入等耗時的事情");

        logger.info("end executeAsync");
    }
}

  • 在Controller層注入剛剛的Service即可

@Autowired
private AsyncService asyncService;

@GetMapping("/async")
public void async(){
    asyncService.executeAsync();
}

  • 使用測試工具測試即可看到相應的列印結果

3、摸索一下

-** 弄清楚執行緒池當時的情況,有多少執行緒在執行,多少在隊列中等待?**

  • 創建一個ThreadPoolTaskExecutor的子類,在每次提交執行緒的時候都將當前執行緒池的運行狀況列印出來

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {


    private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);

    private void showThreadPoolInfo(String prefix) {
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        if (null == threadPoolExecutor) {
            return;
        }

        logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                this.getThreadNamePrefix(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size());
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
}

  • 進過測試發現:showThreadPoolInfo方法中將任務總數、已完成數、活躍執行緒數,隊列大小都列印出來了,然後Override了父類的execute、submit等方法,在裡面調用showThreadPoolInfo方法,這樣每次有任務被提交到執行緒池的時候,都會將當前執行緒池的基本情況列印到日誌中

  • 現在修改ExecutorConfig.javaasyncServiceExecutor方法,將ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor()改為ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor()


@Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        logger.info("start asyncServiceExecutor");
        // 在這裡進行修改
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        // 配置核心執行緒數
        executor.setCorePoolSize(corePoolSize);
        // 配置最大執行緒數
        executor.setMaxPoolSize(maxPoolSize);
        // 配置隊列大小
        executor.setQueueCapacity(queueCapacity);
        // 配置執行緒池中的執行緒的名稱前綴
        executor.setThreadNamePrefix(namePrefix);

        // rejection-policy:當pool已經達到max size的時候,如何處理新任務
        // CALLER_RUNS:不在新執行緒中執行任務,而是有調用者所在的執行緒來執行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //執行初始化
        executor.initialize();
        return executor;
    }

  • 經最後測試得到的結果:提交任務到執行緒池的時候,調用的是submit(Callable task)這個方法,當前已經提交了3個任務,完成了3個,當前有0個執行緒在處理任務,還剩0個任務在隊列中等待