Spring Boot启用异步线程
- 2020 年 4 月 3 日
- 筆記
一般的后台管理系统都有导出报表的功能,对于大数据量的报表导出,通常比较耗时,比如管理员点击一个导出按钮,往往要等待很长的时间直到报表成功导出才可以进行下一步操作,显然这种同步的方式已经满足不了需求了。现在实际开发中常用的方式是采用
JMS
消息队列方式,发送消息到其他的系统中进行导出,或者是在项目中开启异步线程来完成耗时的导出工作。本文将结合报表导出的场景,来讲解一些Spring Boot
中如何开启异步线程。
定义线程池和开启异步可用
Spring
中存在一个接口AsyncConfigurer
接口,该接口就是用来配置异步线程池的接口,它有两个方法,getAsyncExecutor
和getAsyncUncaughtExceptionHandler
,第一个方法是获取一个线程池,第二个方法是用来处理异步线程中发生的异常。它的源码如下所示:
package org.springframework.scheduling.annotation; import java.util.concurrent.Executor; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.lang.Nullable; public interface AsyncConfigurer { // 获取线程池 @Nullable default Executor getAsyncExecutor() { return null; } // 异步异常处理器 @Nullable default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return null; } }
这里的接口提供的都是空实现,所以想要开启异步线程机制,那么就需要我们手动实现这个接口,将实现该接口的类标注为Spring
的配置类,那么就开启了Spring
的异步可用,那么Spring
就会通过getAsyncExecutor
来获取一个可用的线程来执行某项异步操作,当然,整个异步的开启还需要结合两个注解,一个是@EnableAsync
,另外一个是@Async
,第一个是标注在配置类中,用来告诉Spring
异步可用,第二个注解通常标注在某个方法中,当调用这个方法的时候,就会从线程池中获取新的线程来执行它。 现在我们来定义线程池并开启异步可用,这里写一个配置类AsyncConfig
来实现AsyncConfigurer
,代码如下所示:
package cn.itlemon.springboot.async.config; import lombok.extern.slf4j.Slf4j; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; /** * @author jiangpingping * @date 2018/10/30 19:28 */ @Configuration @EnableAsync @Slf4j public class AsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { // 自定义线程池 ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); // 核心线程数 taskExecutor.setCorePoolSize(10); // 最大线程数 taskExecutor.setMaxPoolSize(30); // 线程队列最大线程数 taskExecutor.setQueueCapacity(2000); // 初始化线程池 taskExecutor.initialize(); return taskExecutor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) -> { log.error("Error Occurs in async method:{}", ex.getMessage()); }; } }
第一个方法我们定义了一个线程池,并设置了一些基本参数,比如核心线程数、最大线程数、线程队列最大线程数等,第二个方法是处理异步线程中发生的异常,它是一个异常处理器,返回AsyncUncaughtExceptionHandler
接口的实现类对象,由于AsyncUncaughtExceptionHandler
是一个函数式接口(只有一个抽象方法的接口,通常使用@FunctionalInterface
注解标注的接口),所以这里使用了Lambda
表达式来简写它的实现类对象,这里的异步异常处理就是记录一下日志,并没有做其他的逻辑操作,如果对Lambda表达式不熟悉,也可以直接使用匿名内部类的方式来创建AsyncUncaughtExceptionHandler
的实现类对象,如下所示:
@Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncUncaughtExceptionHandler() { @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { log.error("Error Occurs in async method:{}", ex.getMessage()); } }; }
需要注意的一点的是,我们在上面的配置类中加入了@EnableAsync
注解,那么在Spring
注册该配置类为Spring Bean
的时候,就会开启异步可用机制。
测试异步可用机制
写一个Service层接口,用来表明生成报表:
package cn.itlemon.springboot.async.service; import java.util.concurrent.Future; /** * @author jiangpingping * @date 2018/10/30 19:32 */ public interface AsyncService { /** * 模拟生成报表的异步方法 */ void generateReport(); }
它的实现类是:
package cn.itlemon.springboot.async.service.impl; import cn.itlemon.springboot.async.service.AsyncService; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service; import java.util.concurrent.Future; /** * @author jiangpingping * @date 2018/10/30 19:33 */ @Service public class AsyncServiceImpl implements AsyncService { @Override @Async public void generateReport() { // 模拟异步生成报表代码,这里设置为打印 System.out.println("报表线程名称:【" + Thread.currentThread().getName() + "】"); } }
这里假设进行了报表的导出工作,所以使用打印语句来进行简单的模拟,并在方法中标注了@Async
注解,那么当调用该方法的时候,Spring
会获取一个新的线程来执行这个方法,所以这里打印出执行当前方法的线程名称。我们在写一个控制器,代码如下:
package cn.itlemon.springboot.async.controller; import cn.itlemon.springboot.async.service.AsyncService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; /** * @author jiangpingping * @date 2018/10/30 19:36 */ @RestController @RequestMapping("/async") @Slf4j public class AsyncController { private final AsyncService asyncService; @Autowired public AsyncController(AsyncService asyncService) { this.asyncService = asyncService; } @GetMapping("/page") public String asyncPage() { System.out.println("当前请求线程名称为:【" + Thread.currentThread().getName() + "】"); // 异步调用 asyncService.generateReport(); // 返回结果 return "async"; } }
我们在当前Controller
方法中也打印了当前的线程,运行项目,访问指定的URL
,就可以对比在调用generateReport
方法的时候是否启用了新的线程。我们启动Spring Boot
应用,在浏览器地址栏输入:http://localhost:8080/async/page
,在控制台打印的结果是:
当前请求线程名称为:【http-nio-8080-exec-1】 报表线程名称:【ThreadPoolTaskExecutor-1】
很明显,这不是同一个线程,说明我们开启异步线程成功。
处理异步线程中的异常
一般在Spring
中处理异步线程异常分成两类,一类是异步方法没有返回值,另一类是异步方法有返回值。
第一类无返回值方法
对于第一类无返回值情况,我们已经在AsyncConfig
配置类中进行了配置,即实现getAsyncUncaughtExceptionHandler
方法,也就是当异步线程中的代码发生了异常,就会调用这个方法来进行异常处理,为了检验,我们在AsyncServiceImpl
的方法generateReport
中手动加一行代码System.out.println(1 / 0);
,从而导致其出除零异常,代码如下所示:
@Override @Async public void generateReport() { // 模拟异步生成报表代码,这里设置为打印 System.out.println("报表线程名称:【" + Thread.currentThread().getName() + "】"); System.out.println(1 / 0); }
当再次启动Spring Boot
应用,在浏览器地址栏输入:http://localhost:8080/async/page
,那么将在异步流程中发生异常,由于是在不同线程中发生的异常,所以它并不会影响主线程的执行,且发生异常后,由配置了getAsyncUncaughtExceptionHandler
方法,那么该异常将会被处理,处理的方式就是使用日志进行了记录:
2018-10-31 10:57:09.952 ERROR 2391 --- [lTaskExecutor-1] c.i.springboot.async.config.AsyncConfig : Error Occurs in async method:/ by zero
第二类有返回值方法
对于第二种情况,即异步方法会有返回值,那么我们如何获取到异步线程处理后的返回值呢,通常的方法是将异步方法的返回值使用接口Future
、ListenableFuture
或者类AsyncResult
进行包装,即将返回值作为泛型传入到上述接口或者类中。这里我们来简要分析一下它们的源码中的常用方法。
Future
接口:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
方法分析:
cancel
方法用来取消任务,如果取消任务成功则返回true
,如果取消任务失败则返回false
。参数mayInterruptIfRunning
表示是否允许取消正在执行却没有执行完毕的任务,如果设置true
,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning
为true
还是false
,此方法肯定返回false
,即如果取消已经完成的任务会返回false
;如果任务正在执行,若mayInterruptIfRunning
设置为true
,则返回true
,若mayInterruptIfRunning
设置为false
,则返回false
;如果任务还没有执行,则无论mayInterruptIfRunning
为true
还是false
,肯定返回true
。isCancelled
方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true
。isDone
方法表示任务是否已经完成,若任务完成,则返回true
;get
方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;get(long timeout, TimeUnit unit)
用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null
。
ListenableFuture接口:
public interface ListenableFuture<T> extends Future<T> { void addCallback(ListenableFutureCallback<? super T> callback); void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback); default CompletableFuture<T> completable() { CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this); addCallback(completable::complete, completable::completeExceptionally); return completable; } }
ListenableFuture
继承了Future
接口,它还额外添加了三个方法,主要用来添加异步现场的回调,可以用来处理异常和获取异步方法的返回值的。AsyncResult
类实现了ListenableFuture
接口,也实现了它所有的方法。接下来,我们将分别介绍如何获取异步处理后的返回值和异常处理。
使用Future接口
我们在AsyncService
接口中添加一个方法:returnMessage()
,并使用Future
接口来进行包装,代码如下:
/** * 异步回调消息方法 * * @return 字符串 */ Future<String> returnMessage();
实现类中的代码如下:
@Override @Async public Future<String> returnMessage() { System.out.println(Thread.currentThread().getName()); String message = "Async Method Result"; return new AsyncResult<>(message); }
那么在Controller
层,就可以获取到Future
的实现类对象,代码如下:
@GetMapping("/page1") public String asyncPage1() { try { System.out.println(Thread.currentThread().getName()); Future<String> result = asyncService.returnMessage(); System.out.println(result.get()); } catch (ExecutionException | InterruptedException e) { log.error("发生了异常:{}", e.getMessage()); } return "async"; }
这里对异步进行了try...catch
异常处理,也使用了Future
的get
方法获取了异步方法的返回值,但是这种获取返回值的方式会阻塞当前线程,也就是说调用了get
方法之后,会等待异步线程执行完毕后才进行下一行代码的执行。
使用ListenableFuture接口
我们在AsyncService
接口中添加一个方法:returnMsg()
,并使用ListenableFuture
接口来进行包装,代码如下:
/** * 异步回调消息方法 * * @return 字符串 */ ListenableFuture<String> returnMsg();
实现类中的代码如下:
@Override @Async public ListenableFuture<String> returnMsg() { System.out.println(Thread.currentThread().getName()); String message = "Async Method Result"; return new AsyncResult<>(message); }
那么在Controller
层,就可以获取到ListenableFuture
的实现类对象,代码如下:
@GetMapping("/page2") public String asyncPage2() { System.out.println(Thread.currentThread().getName()); ListenableFuture<String> result = asyncService.returnMsg(); result.addCallback(new SuccessCallback<String>() { @Override public void onSuccess(String result) { System.out.println("返回的结果是:" + result); } }, new FailureCallback() { @Override public void onFailure(Throwable ex) { log.error("发生了异常:{}", ex.getMessage()); } }); return "async"; }
从上面的代码中可以看出,在返回的结果中添加了两个回调,分别是异步处理成功的回调SuccessCallback
接口的实现类对象和异步处理失败发生异常的回调FailureCallback
接口的实现类对象。ListenableFuture
接口是对Future
接口的扩展,支持回调,有效的避免了线程阻塞问题,也就是说,它会监听Future
接口的执行情况,一旦完成,就会调用onSuccess
方法进行成功后的处理,一旦发生异常,就会调用onFailure
方法进行异常处理。相比较而言,更加推荐使用ListenableFuture
来进行有返回值的异步处理。对于Java1.8
,其实更加推荐使用CompletableFuture
或者guava
的ListenableFuture
,感兴趣的同学可以进行深入研究,他们的处理异步能力会更加强悍。
总结
本文从配置线程池、开启异步线程机制到异步线程的异常处理,分步介绍了在Spring Boot
中如何启用异步线程来提高代码的并发能力,虽然是基于Spring Boot
,但是也同样适用其他的采用Spring
的项目。