springBoot服务整合线程池ThreadPoolTaskExecutor与@Async详解使用
- 2021 年 10 月 27 日
- 筆記
- springboot
ThreadPoolExecutor:=======这个是java自己实现的线程池执行类,基本上创建线程池都是通过这个类进行的创建。
ThreadPoolTaskExecutor:========这个是springboot基于ThreadPoolExecutor实现的一个线程池执行类,包装类。
Spring默认的@Async用线程池名字为SimpleAsyncTaskExecutor。
Spring异步线程池的接口类是TaskExecutor,本质还是java.util.concurrent.Executor,没有配置的情况下,默认使用的是simpleAsyncTaskExecutor。
注意:
在springboot当中,如果没有配置线程池的话,springboot会自动配置一个ThreadPoolTaskExecutor线程池到bean当中,我们调用只需要
@Autowired
ThreadPoolTaskExecutor threadPoolTaskExecutor;
第一步: 首先在application启动类添加@EnableAsync
@SpringBootApplication @EnableAsync //首先在application启动类添加@EnableAsync public class ThreadpoolApplication { public static void main(String[] args) { SpringApplication.run(ThreadpoolApplication.class, args); } }
第二步:配置线程池,不配置的话使用springboot默认的线程池。
package com.aswatson.csc.task.conf; import java.util.concurrent.Executor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Configuration @EnableAsync public class AsyncThreadConfiguration { @Bean("kafkaThreadExecutor") public Executor asyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10);//核心线程数 executor.setMaxPoolSize(20);//最大线程数 executor.setKeepAliveSeconds(60);//空闲线程存活时间 executor.setThreadNamePrefix("kafkaThreadAsync-"); executor.initialize(); return executor; } }
第三步:测试1:在需要异步执行的方法上加上@Async注解。
@Service public class AsyncTest { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); @Async public void hello(String name){ customerEventLogMapper.insert(customerEventLog); logger.info("异步线程启动 started."+name); } }
第四步:测试2:使用注入的模式:
package com.example.apidemo.completableFutrue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import java.time.LocalDateTime; @Service public class AsyncService { @Autowired ThreadPoolTaskExecutor threadPoolTaskExecutor; public void addEventLog(String buId, String status){ CustomerEventLogPO customerEventLog = new CustomerEventLogPO(); customerEventLog.setUuid(uuid); customerEventLog.setStatus(status); customerEventLog.setCreated(LocalDateTime.now()); customerEventLogMapper.insert(customerEventLog); threadPoolTaskExecutor.submit(new Thread(()->{ customerEventLogMapper.insert(customerEventLog); })); //submit有返回值 threadPoolTaskExecutor.execute(new Thread(()->{ customerEventLogMapper.insert(customerEventLog); })); //execute无返回值 } }
注意: 如果配置多个线程池,该如何指定线程池呢?
方式1: @Resources(“kafkaThreadExecutor”)。
方式2: 如果有多个线程池,但是在@Async注解里面没有指定的话,会默认加载第一个配置的线程池。
======================================================================================================================================================================
另外需要注意的是:关于注解失效需要注意以下几点:
- 注解的方法必须是public方法
- 方法一定要从另一个类中调用,也就是从类的外部调用,类的内部调用是无效的,因为@Transactional和@Async注解的实现都是基于Spring的AOP,而AOP的实现是基于动态代理模式实现的。那么注解失效的原因就很明显了,有可能因为调用方法的是对象本身而不是代理对象,因为没有经过Spring容器。
- 异步方法使用注解@Async的返回值只能为void或者Future
==================================================================================================================================================================================================
// @Bean()的拒绝策略:
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
拒绝策略:如果(总任务数 – 核心线程数 – 任务队列数)-(最大线程数 – 核心线程数)> 0 的话,则会出现线程拒绝。举例:( 12 – 5 – 2 ) – ( 8 – 5 ) > 0,会出现线程拒绝。线程拒绝又分为 4 种策略,分别为:
-
- CallerRunsPolicy():交由调用方线程运行,比如 main 线程。
- AbortPolicy():直接抛出异常。
- DiscardPolicy():直接丢弃。
- DiscardOldestPolicy():丢弃队列中最老的任务。