Spring Boot啟用非同步執行緒

一般的後台管理系統都有導出報表的功能,對於大數據量的報表導出,通常比較耗時,比如管理員點擊一個導出按鈕,往往要等待很長的時間直到報表成功導出才可以進行下一步操作,顯然這種同步的方式已經滿足不了需求了。現在實際開發中常用的方式是採用JMS消息隊列方式,發送消息到其他的系統中進行導出,或者是在項目中開啟非同步執行緒來完成耗時的導出工作。本文將結合報表導出的場景,來講解一些Spring Boot中如何開啟非同步執行緒。

定義執行緒池和開啟非同步可用

Spring中存在一個介面AsyncConfigurer介面,該介面就是用來配置非同步執行緒池的介面,它有兩個方法,getAsyncExecutorgetAsyncUncaughtExceptionHandler,第一個方法是獲取一個執行緒池,第二個方法是用來處理非同步執行緒中發生的異常。它的源碼如下所示:

package org.springframework.scheduling.annotation;    import java.util.concurrent.Executor;  import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;  import org.springframework.lang.Nullable;    public interface AsyncConfigurer {    	// 獲取執行緒池  	@Nullable  	default Executor getAsyncExecutor() {  		return null;  	}    	// 非同步異常處理器  	@Nullable  	default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {  		return null;  	}  }

這裡的介面提供的都是空實現,所以想要開啟非同步執行緒機制,那麼就需要我們手動實現這個介面,將實現該介面的類標註為Spring的配置類,那麼就開啟了Spring的非同步可用,那麼Spring就會通過getAsyncExecutor來獲取一個可用的執行緒來執行某項非同步操作,當然,整個非同步的開啟還需要結合兩個註解,一個是@EnableAsync,另外一個是@Async,第一個是標註在配置類中,用來告訴Spring非同步可用,第二個註解通常標註在某個方法中,當調用這個方法的時候,就會從執行緒池中獲取新的執行緒來執行它。 現在我們來定義執行緒池並開啟非同步可用,這裡寫一個配置類AsyncConfig來實現AsyncConfigurer,程式碼如下所示:

package cn.itlemon.springboot.async.config;    import lombok.extern.slf4j.Slf4j;  import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;  import org.springframework.context.annotation.Configuration;  import org.springframework.scheduling.annotation.AsyncConfigurer;  import org.springframework.scheduling.annotation.EnableAsync;  import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;    import java.util.concurrent.Executor;    /**   * @author jiangpingping   * @date 2018/10/30 19:28   */  @Configuration  @EnableAsync  @Slf4j  public class AsyncConfig implements AsyncConfigurer {        @Override      public Executor getAsyncExecutor() {          // 自定義執行緒池          ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();          // 核心執行緒數          taskExecutor.setCorePoolSize(10);          // 最大執行緒數          taskExecutor.setMaxPoolSize(30);          // 執行緒隊列最大執行緒數          taskExecutor.setQueueCapacity(2000);          // 初始化執行緒池          taskExecutor.initialize();          return taskExecutor;      }        @Override      public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {          return (ex, method, params) -> {              log.error("Error Occurs in async method:{}", ex.getMessage());          };      }  }

第一個方法我們定義了一個執行緒池,並設置了一些基本參數,比如核心執行緒數、最大執行緒數、執行緒隊列最大執行緒數等,第二個方法是處理非同步執行緒中發生的異常,它是一個異常處理器,返回AsyncUncaughtExceptionHandler介面的實現類對象,由於AsyncUncaughtExceptionHandler是一個函數式介面(只有一個抽象方法的介面,通常使用@FunctionalInterface註解標註的介面),所以這裡使用了Lambda表達式來簡寫它的實現類對象,這裡的非同步異常處理就是記錄一下日誌,並沒有做其他的邏輯操作,如果對Lambda表達式不熟悉,也可以直接使用匿名內部類的方式來創建AsyncUncaughtExceptionHandler的實現類對象,如下所示:

@Override  public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {        return new AsyncUncaughtExceptionHandler() {          @Override          public void handleUncaughtException(Throwable ex, Method method, Object... params) {              log.error("Error Occurs in async method:{}", ex.getMessage());          }      };  }

需要注意的一點的是,我們在上面的配置類中加入了@EnableAsync註解,那麼在Spring註冊該配置類為Spring Bean的時候,就會開啟非同步可用機制。

測試非同步可用機制

寫一個Service層介面,用來表明生成報表:

package cn.itlemon.springboot.async.service;    import java.util.concurrent.Future;    /**   * @author jiangpingping   * @date 2018/10/30 19:32   */  public interface AsyncService {        /**       * 模擬生成報表的非同步方法       */      void generateReport();    }

它的實現類是:

package cn.itlemon.springboot.async.service.impl;    import cn.itlemon.springboot.async.service.AsyncService;  import org.springframework.scheduling.annotation.Async;  import org.springframework.scheduling.annotation.AsyncResult;  import org.springframework.stereotype.Service;    import java.util.concurrent.Future;    /**   * @author jiangpingping   * @date 2018/10/30 19:33   */  @Service  public class AsyncServiceImpl implements AsyncService {        @Override      @Async      public void generateReport() {          // 模擬非同步生成報表程式碼,這裡設置為列印          System.out.println("報表執行緒名稱:【" + Thread.currentThread().getName() + "】");      }    }

這裡假設進行了報表的導出工作,所以使用列印語句來進行簡單的模擬,並在方法中標註了@Async註解,那麼當調用該方法的時候,Spring會獲取一個新的執行緒來執行這個方法,所以這裡列印出執行當前方法的執行緒名稱。我們在寫一個控制器,程式碼如下:

package cn.itlemon.springboot.async.controller;    import cn.itlemon.springboot.async.service.AsyncService;  import org.springframework.beans.factory.annotation.Autowired;  import org.springframework.web.bind.annotation.GetMapping;  import org.springframework.web.bind.annotation.RequestMapping;  import org.springframework.web.bind.annotation.RestController;    import java.util.concurrent.ExecutionException;  import java.util.concurrent.Future;    /**   * @author jiangpingping   * @date 2018/10/30 19:36   */  @RestController  @RequestMapping("/async")  @Slf4j  public class AsyncController {        private final AsyncService asyncService;        @Autowired      public AsyncController(AsyncService asyncService) {          this.asyncService = asyncService;      }        @GetMapping("/page")      public String asyncPage() {          System.out.println("當前請求執行緒名稱為:【" + Thread.currentThread().getName() + "】");          // 非同步調用          asyncService.generateReport();          // 返回結果          return "async";      }    }

我們在當前Controller方法中也列印了當前的執行緒,運行項目,訪問指定的URL,就可以對比在調用generateReport方法的時候是否啟用了新的執行緒。我們啟動Spring Boot應用,在瀏覽器地址欄輸入:http://localhost:8080/async/page,在控制台列印的結果是:

當前請求執行緒名稱為:【http-nio-8080-exec-1】  報表執行緒名稱:【ThreadPoolTaskExecutor-1】

很明顯,這不是同一個執行緒,說明我們開啟非同步執行緒成功。

處理非同步執行緒中的異常

一般在Spring中處理非同步執行緒異常分成兩類,一類是非同步方法沒有返回值,另一類是非同步方法有返回值。

第一類無返回值方法

對於第一類無返回值情況,我們已經在AsyncConfig配置類中進行了配置,即實現getAsyncUncaughtExceptionHandler方法,也就是當非同步執行緒中的程式碼發生了異常,就會調用這個方法來進行異常處理,為了檢驗,我們在AsyncServiceImpl的方法generateReport中手動加一行程式碼System.out.println(1 / 0);,從而導致其出除零異常,程式碼如下所示:

@Override  @Async  public void generateReport() {      // 模擬非同步生成報表程式碼,這裡設置為列印      System.out.println("報表執行緒名稱:【" + Thread.currentThread().getName() + "】");      System.out.println(1 / 0);  }

當再次啟動Spring Boot應用,在瀏覽器地址欄輸入:http://localhost:8080/async/page,那麼將在非同步流程中發生異常,由於是在不同執行緒中發生的異常,所以它並不會影響主執行緒的執行,且發生異常後,由配置了getAsyncUncaughtExceptionHandler方法,那麼該異常將會被處理,處理的方式就是使用日誌進行了記錄:

2018-10-31 10:57:09.952 ERROR 2391 --- [lTaskExecutor-1] c.i.springboot.async.config.AsyncConfig  : Error Occurs in async method:/ by zero
第二類有返回值方法

對於第二種情況,即非同步方法會有返回值,那麼我們如何獲取到非同步執行緒處理後的返回值呢,通常的方法是將非同步方法的返回值使用介面FutureListenableFuture或者類AsyncResult進行包裝,即將返回值作為泛型傳入到上述介面或者類中。這裡我們來簡要分析一下它們的源碼中的常用方法。

Future介面:

public interface Future<V> {    	boolean cancel(boolean mayInterruptIfRunning);    	boolean isCancelled();    	boolean isDone();    	V get() throws InterruptedException, ExecutionException;    	V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;  }

方法分析:

  • cancel方法用來取消任務,如果取消任務成功則返回true,如果取消任務失敗則返回false。參數mayInterruptIfRunning表示是否允許取消正在執行卻沒有執行完畢的任務,如果設置true,則表示可以取消正在執行過程中的任務。如果任務已經完成,則無論mayInterruptIfRunningtrue還是false,此方法肯定返回false,即如果取消已經完成的任務會返回false;如果任務正在執行,若mayInterruptIfRunning設置為true,則返回true,若mayInterruptIfRunning設置為false,則返回false;如果任務還沒有執行,則無論mayInterruptIfRunningtrue還是false,肯定返回true
  • isCancelled方法表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回true
  • isDone方法表示任務是否已經完成,若任務完成,則返回true
  • get方法用來獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回;
  • get(long timeout, TimeUnit unit)用來獲取執行結果,如果在指定時間內,還沒獲取到結果,就直接返回null

ListenableFuture介面:

public interface ListenableFuture<T> extends Future<T> {    	void addCallback(ListenableFutureCallback<? super T> callback);    	void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);    	default CompletableFuture<T> completable() {  		CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);  		addCallback(completable::complete, completable::completeExceptionally);  		return completable;  	}  }

ListenableFuture繼承了Future介面,它還額外添加了三個方法,主要用來添加非同步現場的回調,可以用來處理異常和獲取非同步方法的返回值的。AsyncResult類實現了ListenableFuture介面,也實現了它所有的方法。接下來,我們將分別介紹如何獲取非同步處理後的返回值和異常處理。

使用Future介面

我們在AsyncService介面中添加一個方法:returnMessage(),並使用Future介面來進行包裝,程式碼如下:

/**   * 非同步回調消息方法   *   * @return 字元串   */  Future<String> returnMessage();

實現類中的程式碼如下:

@Override  @Async  public Future<String> returnMessage() {      System.out.println(Thread.currentThread().getName());      String message = "Async Method Result";      return new AsyncResult<>(message);  }

那麼在Controller層,就可以獲取到Future的實現類對象,程式碼如下:

@GetMapping("/page1")  public String asyncPage1() {      try {          System.out.println(Thread.currentThread().getName());          Future<String> result = asyncService.returnMessage();          System.out.println(result.get());      } catch (ExecutionException | InterruptedException e) {          log.error("發生了異常:{}", e.getMessage());      }      return "async";  }

這裡對非同步進行了try...catch異常處理,也使用了Futureget方法獲取了非同步方法的返回值,但是這種獲取返回值的方式會阻塞當前執行緒,也就是說調用了get方法之後,會等待非同步執行緒執行完畢後才進行下一行程式碼的執行。

使用ListenableFuture介面

我們在AsyncService介面中添加一個方法:returnMsg(),並使用ListenableFuture介面來進行包裝,程式碼如下:

/**   * 非同步回調消息方法   *   * @return 字元串   */  ListenableFuture<String> returnMsg();

實現類中的程式碼如下:

@Override  @Async  public ListenableFuture<String> returnMsg() {      System.out.println(Thread.currentThread().getName());      String message = "Async Method Result";      return new AsyncResult<>(message);  }

那麼在Controller層,就可以獲取到ListenableFuture的實現類對象,程式碼如下:

@GetMapping("/page2")  public String asyncPage2() {      System.out.println(Thread.currentThread().getName());      ListenableFuture<String> result = asyncService.returnMsg();      result.addCallback(new SuccessCallback<String>() {          @Override          public void onSuccess(String result) {              System.out.println("返回的結果是:" + result);          }      }, new FailureCallback() {          @Override          public void onFailure(Throwable ex) {              log.error("發生了異常:{}", ex.getMessage());          }      });      return "async";  }

從上面的程式碼中可以看出,在返回的結果中添加了兩個回調,分別是非同步處理成功的回調SuccessCallback介面的實現類對象和非同步處理失敗發生異常的回調FailureCallback介面的實現類對象。ListenableFuture介面是對Future介面的擴展,支援回調,有效的避免了執行緒阻塞問題,也就是說,它會監聽Future介面的執行情況,一旦完成,就會調用onSuccess方法進行成功後的處理,一旦發生異常,就會調用onFailure方法進行異常處理。相比較而言,更加推薦使用ListenableFuture來進行有返回值的非同步處理。對於Java1.8,其實更加推薦使用CompletableFuture或者guavaListenableFuture,感興趣的同學可以進行深入研究,他們的處理非同步能力會更加強悍。

總結

本文從配置執行緒池、開啟非同步執行緒機制到非同步執行緒的異常處理,分步介紹了在Spring Boot中如何啟用非同步執行緒來提高程式碼的並發能力,雖然是基於Spring Boot,但是也同樣適用其他的採用Spring的項目。