Spring Boot启用异步线程

一般的后台管理系统都有导出报表的功能,对于大数据量的报表导出,通常比较耗时,比如管理员点击一个导出按钮,往往要等待很长的时间直到报表成功导出才可以进行下一步操作,显然这种同步的方式已经满足不了需求了。现在实际开发中常用的方式是采用JMS消息队列方式,发送消息到其他的系统中进行导出,或者是在项目中开启异步线程来完成耗时的导出工作。本文将结合报表导出的场景,来讲解一些Spring Boot中如何开启异步线程。

定义线程池和开启异步可用

Spring中存在一个接口AsyncConfigurer接口,该接口就是用来配置异步线程池的接口,它有两个方法,getAsyncExecutorgetAsyncUncaughtExceptionHandler,第一个方法是获取一个线程池,第二个方法是用来处理异步线程中发生的异常。它的源码如下所示:

package org.springframework.scheduling.annotation;    import java.util.concurrent.Executor;  import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;  import org.springframework.lang.Nullable;    public interface AsyncConfigurer {    	// 获取线程池  	@Nullable  	default Executor getAsyncExecutor() {  		return null;  	}    	// 异步异常处理器  	@Nullable  	default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {  		return null;  	}  }

这里的接口提供的都是空实现,所以想要开启异步线程机制,那么就需要我们手动实现这个接口,将实现该接口的类标注为Spring的配置类,那么就开启了Spring的异步可用,那么Spring就会通过getAsyncExecutor来获取一个可用的线程来执行某项异步操作,当然,整个异步的开启还需要结合两个注解,一个是@EnableAsync,另外一个是@Async,第一个是标注在配置类中,用来告诉Spring异步可用,第二个注解通常标注在某个方法中,当调用这个方法的时候,就会从线程池中获取新的线程来执行它。 现在我们来定义线程池并开启异步可用,这里写一个配置类AsyncConfig来实现AsyncConfigurer,代码如下所示:

package cn.itlemon.springboot.async.config;    import lombok.extern.slf4j.Slf4j;  import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;  import org.springframework.context.annotation.Configuration;  import org.springframework.scheduling.annotation.AsyncConfigurer;  import org.springframework.scheduling.annotation.EnableAsync;  import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;    import java.util.concurrent.Executor;    /**   * @author jiangpingping   * @date 2018/10/30 19:28   */  @Configuration  @EnableAsync  @Slf4j  public class AsyncConfig implements AsyncConfigurer {        @Override      public Executor getAsyncExecutor() {          // 自定义线程池          ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();          // 核心线程数          taskExecutor.setCorePoolSize(10);          // 最大线程数          taskExecutor.setMaxPoolSize(30);          // 线程队列最大线程数          taskExecutor.setQueueCapacity(2000);          // 初始化线程池          taskExecutor.initialize();          return taskExecutor;      }        @Override      public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {          return (ex, method, params) -> {              log.error("Error Occurs in async method:{}", ex.getMessage());          };      }  }

第一个方法我们定义了一个线程池,并设置了一些基本参数,比如核心线程数、最大线程数、线程队列最大线程数等,第二个方法是处理异步线程中发生的异常,它是一个异常处理器,返回AsyncUncaughtExceptionHandler接口的实现类对象,由于AsyncUncaughtExceptionHandler是一个函数式接口(只有一个抽象方法的接口,通常使用@FunctionalInterface注解标注的接口),所以这里使用了Lambda表达式来简写它的实现类对象,这里的异步异常处理就是记录一下日志,并没有做其他的逻辑操作,如果对Lambda表达式不熟悉,也可以直接使用匿名内部类的方式来创建AsyncUncaughtExceptionHandler的实现类对象,如下所示:

@Override  public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {        return new AsyncUncaughtExceptionHandler() {          @Override          public void handleUncaughtException(Throwable ex, Method method, Object... params) {              log.error("Error Occurs in async method:{}", ex.getMessage());          }      };  }

需要注意的一点的是,我们在上面的配置类中加入了@EnableAsync注解,那么在Spring注册该配置类为Spring Bean的时候,就会开启异步可用机制。

测试异步可用机制

写一个Service层接口,用来表明生成报表:

package cn.itlemon.springboot.async.service;    import java.util.concurrent.Future;    /**   * @author jiangpingping   * @date 2018/10/30 19:32   */  public interface AsyncService {        /**       * 模拟生成报表的异步方法       */      void generateReport();    }

它的实现类是:

package cn.itlemon.springboot.async.service.impl;    import cn.itlemon.springboot.async.service.AsyncService;  import org.springframework.scheduling.annotation.Async;  import org.springframework.scheduling.annotation.AsyncResult;  import org.springframework.stereotype.Service;    import java.util.concurrent.Future;    /**   * @author jiangpingping   * @date 2018/10/30 19:33   */  @Service  public class AsyncServiceImpl implements AsyncService {        @Override      @Async      public void generateReport() {          // 模拟异步生成报表代码,这里设置为打印          System.out.println("报表线程名称:【" + Thread.currentThread().getName() + "】");      }    }

这里假设进行了报表的导出工作,所以使用打印语句来进行简单的模拟,并在方法中标注了@Async注解,那么当调用该方法的时候,Spring会获取一个新的线程来执行这个方法,所以这里打印出执行当前方法的线程名称。我们在写一个控制器,代码如下:

package cn.itlemon.springboot.async.controller;    import cn.itlemon.springboot.async.service.AsyncService;  import org.springframework.beans.factory.annotation.Autowired;  import org.springframework.web.bind.annotation.GetMapping;  import org.springframework.web.bind.annotation.RequestMapping;  import org.springframework.web.bind.annotation.RestController;    import java.util.concurrent.ExecutionException;  import java.util.concurrent.Future;    /**   * @author jiangpingping   * @date 2018/10/30 19:36   */  @RestController  @RequestMapping("/async")  @Slf4j  public class AsyncController {        private final AsyncService asyncService;        @Autowired      public AsyncController(AsyncService asyncService) {          this.asyncService = asyncService;      }        @GetMapping("/page")      public String asyncPage() {          System.out.println("当前请求线程名称为:【" + Thread.currentThread().getName() + "】");          // 异步调用          asyncService.generateReport();          // 返回结果          return "async";      }    }

我们在当前Controller方法中也打印了当前的线程,运行项目,访问指定的URL,就可以对比在调用generateReport方法的时候是否启用了新的线程。我们启动Spring Boot应用,在浏览器地址栏输入:http://localhost:8080/async/page,在控制台打印的结果是:

当前请求线程名称为:【http-nio-8080-exec-1】  报表线程名称:【ThreadPoolTaskExecutor-1】

很明显,这不是同一个线程,说明我们开启异步线程成功。

处理异步线程中的异常

一般在Spring中处理异步线程异常分成两类,一类是异步方法没有返回值,另一类是异步方法有返回值。

第一类无返回值方法

对于第一类无返回值情况,我们已经在AsyncConfig配置类中进行了配置,即实现getAsyncUncaughtExceptionHandler方法,也就是当异步线程中的代码发生了异常,就会调用这个方法来进行异常处理,为了检验,我们在AsyncServiceImpl的方法generateReport中手动加一行代码System.out.println(1 / 0);,从而导致其出除零异常,代码如下所示:

@Override  @Async  public void generateReport() {      // 模拟异步生成报表代码,这里设置为打印      System.out.println("报表线程名称:【" + Thread.currentThread().getName() + "】");      System.out.println(1 / 0);  }

当再次启动Spring Boot应用,在浏览器地址栏输入:http://localhost:8080/async/page,那么将在异步流程中发生异常,由于是在不同线程中发生的异常,所以它并不会影响主线程的执行,且发生异常后,由配置了getAsyncUncaughtExceptionHandler方法,那么该异常将会被处理,处理的方式就是使用日志进行了记录:

2018-10-31 10:57:09.952 ERROR 2391 --- [lTaskExecutor-1] c.i.springboot.async.config.AsyncConfig  : Error Occurs in async method:/ by zero
第二类有返回值方法

对于第二种情况,即异步方法会有返回值,那么我们如何获取到异步线程处理后的返回值呢,通常的方法是将异步方法的返回值使用接口FutureListenableFuture或者类AsyncResult进行包装,即将返回值作为泛型传入到上述接口或者类中。这里我们来简要分析一下它们的源码中的常用方法。

Future接口:

public interface Future<V> {    	boolean cancel(boolean mayInterruptIfRunning);    	boolean isCancelled();    	boolean isDone();    	V get() throws InterruptedException, ExecutionException;    	V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;  }

方法分析:

  • cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunningtrue还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunningtrue还是false,肯定返回true
  • isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true
  • isDone方法表示任务是否已经完成,若任务完成,则返回true
  • get方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
  • get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null

ListenableFuture接口:

public interface ListenableFuture<T> extends Future<T> {    	void addCallback(ListenableFutureCallback<? super T> callback);    	void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);    	default CompletableFuture<T> completable() {  		CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);  		addCallback(completable::complete, completable::completeExceptionally);  		return completable;  	}  }

ListenableFuture继承了Future接口,它还额外添加了三个方法,主要用来添加异步现场的回调,可以用来处理异常和获取异步方法的返回值的。AsyncResult类实现了ListenableFuture接口,也实现了它所有的方法。接下来,我们将分别介绍如何获取异步处理后的返回值和异常处理。

使用Future接口

我们在AsyncService接口中添加一个方法:returnMessage(),并使用Future接口来进行包装,代码如下:

/**   * 异步回调消息方法   *   * @return 字符串   */  Future<String> returnMessage();

实现类中的代码如下:

@Override  @Async  public Future<String> returnMessage() {      System.out.println(Thread.currentThread().getName());      String message = "Async Method Result";      return new AsyncResult<>(message);  }

那么在Controller层,就可以获取到Future的实现类对象,代码如下:

@GetMapping("/page1")  public String asyncPage1() {      try {          System.out.println(Thread.currentThread().getName());          Future<String> result = asyncService.returnMessage();          System.out.println(result.get());      } catch (ExecutionException | InterruptedException e) {          log.error("发生了异常:{}", e.getMessage());      }      return "async";  }

这里对异步进行了try...catch异常处理,也使用了Futureget方法获取了异步方法的返回值,但是这种获取返回值的方式会阻塞当前线程,也就是说调用了get方法之后,会等待异步线程执行完毕后才进行下一行代码的执行。

使用ListenableFuture接口

我们在AsyncService接口中添加一个方法:returnMsg(),并使用ListenableFuture接口来进行包装,代码如下:

/**   * 异步回调消息方法   *   * @return 字符串   */  ListenableFuture<String> returnMsg();

实现类中的代码如下:

@Override  @Async  public ListenableFuture<String> returnMsg() {      System.out.println(Thread.currentThread().getName());      String message = "Async Method Result";      return new AsyncResult<>(message);  }

那么在Controller层,就可以获取到ListenableFuture的实现类对象,代码如下:

@GetMapping("/page2")  public String asyncPage2() {      System.out.println(Thread.currentThread().getName());      ListenableFuture<String> result = asyncService.returnMsg();      result.addCallback(new SuccessCallback<String>() {          @Override          public void onSuccess(String result) {              System.out.println("返回的结果是:" + result);          }      }, new FailureCallback() {          @Override          public void onFailure(Throwable ex) {              log.error("发生了异常:{}", ex.getMessage());          }      });      return "async";  }

从上面的代码中可以看出,在返回的结果中添加了两个回调,分别是异步处理成功的回调SuccessCallback接口的实现类对象和异步处理失败发生异常的回调FailureCallback接口的实现类对象。ListenableFuture接口是对Future接口的扩展,支持回调,有效的避免了线程阻塞问题,也就是说,它会监听Future接口的执行情况,一旦完成,就会调用onSuccess方法进行成功后的处理,一旦发生异常,就会调用onFailure方法进行异常处理。相比较而言,更加推荐使用ListenableFuture来进行有返回值的异步处理。对于Java1.8,其实更加推荐使用CompletableFuture或者guavaListenableFuture,感兴趣的同学可以进行深入研究,他们的处理异步能力会更加强悍。

总结

本文从配置线程池、开启异步线程机制到异步线程的异常处理,分步介绍了在Spring Boot中如何启用异步线程来提高代码的并发能力,虽然是基于Spring Boot,但是也同样适用其他的采用Spring的项目。