Spring Boot啟用非同步執行緒
- 2020 年 4 月 3 日
- 筆記
一般的後台管理系統都有導出報表的功能,對於大數據量的報表導出,通常比較耗時,比如管理員點擊一個導出按鈕,往往要等待很長的時間直到報表成功導出才可以進行下一步操作,顯然這種同步的方式已經滿足不了需求了。現在實際開發中常用的方式是採用
JMS
消息隊列方式,發送消息到其他的系統中進行導出,或者是在項目中開啟非同步執行緒來完成耗時的導出工作。本文將結合報表導出的場景,來講解一些Spring Boot
中如何開啟非同步執行緒。
定義執行緒池和開啟非同步可用
Spring
中存在一個介面AsyncConfigurer
介面,該介面就是用來配置非同步執行緒池的介面,它有兩個方法,getAsyncExecutor
和getAsyncUncaughtExceptionHandler
,第一個方法是獲取一個執行緒池,第二個方法是用來處理非同步執行緒中發生的異常。它的源碼如下所示:
package org.springframework.scheduling.annotation; import java.util.concurrent.Executor; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.lang.Nullable; public interface AsyncConfigurer { // 獲取執行緒池 @Nullable default Executor getAsyncExecutor() { return null; } // 非同步異常處理器 @Nullable default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return null; } }
這裡的介面提供的都是空實現,所以想要開啟非同步執行緒機制,那麼就需要我們手動實現這個介面,將實現該介面的類標註為Spring
的配置類,那麼就開啟了Spring
的非同步可用,那麼Spring
就會通過getAsyncExecutor
來獲取一個可用的執行緒來執行某項非同步操作,當然,整個非同步的開啟還需要結合兩個註解,一個是@EnableAsync
,另外一個是@Async
,第一個是標註在配置類中,用來告訴Spring
非同步可用,第二個註解通常標註在某個方法中,當調用這個方法的時候,就會從執行緒池中獲取新的執行緒來執行它。 現在我們來定義執行緒池並開啟非同步可用,這裡寫一個配置類AsyncConfig
來實現AsyncConfigurer
,程式碼如下所示:
package cn.itlemon.springboot.async.config; import lombok.extern.slf4j.Slf4j; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; /** * @author jiangpingping * @date 2018/10/30 19:28 */ @Configuration @EnableAsync @Slf4j public class AsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { // 自定義執行緒池 ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); // 核心執行緒數 taskExecutor.setCorePoolSize(10); // 最大執行緒數 taskExecutor.setMaxPoolSize(30); // 執行緒隊列最大執行緒數 taskExecutor.setQueueCapacity(2000); // 初始化執行緒池 taskExecutor.initialize(); return taskExecutor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) -> { log.error("Error Occurs in async method:{}", ex.getMessage()); }; } }
第一個方法我們定義了一個執行緒池,並設置了一些基本參數,比如核心執行緒數、最大執行緒數、執行緒隊列最大執行緒數等,第二個方法是處理非同步執行緒中發生的異常,它是一個異常處理器,返回AsyncUncaughtExceptionHandler
介面的實現類對象,由於AsyncUncaughtExceptionHandler
是一個函數式介面(只有一個抽象方法的介面,通常使用@FunctionalInterface
註解標註的介面),所以這裡使用了Lambda
表達式來簡寫它的實現類對象,這裡的非同步異常處理就是記錄一下日誌,並沒有做其他的邏輯操作,如果對Lambda表達式不熟悉,也可以直接使用匿名內部類的方式來創建AsyncUncaughtExceptionHandler
的實現類對象,如下所示:
@Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncUncaughtExceptionHandler() { @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { log.error("Error Occurs in async method:{}", ex.getMessage()); } }; }
需要注意的一點的是,我們在上面的配置類中加入了@EnableAsync
註解,那麼在Spring
註冊該配置類為Spring Bean
的時候,就會開啟非同步可用機制。
測試非同步可用機制
寫一個Service層介面,用來表明生成報表:
package cn.itlemon.springboot.async.service; import java.util.concurrent.Future; /** * @author jiangpingping * @date 2018/10/30 19:32 */ public interface AsyncService { /** * 模擬生成報表的非同步方法 */ void generateReport(); }
它的實現類是:
package cn.itlemon.springboot.async.service.impl; import cn.itlemon.springboot.async.service.AsyncService; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service; import java.util.concurrent.Future; /** * @author jiangpingping * @date 2018/10/30 19:33 */ @Service public class AsyncServiceImpl implements AsyncService { @Override @Async public void generateReport() { // 模擬非同步生成報表程式碼,這裡設置為列印 System.out.println("報表執行緒名稱:【" + Thread.currentThread().getName() + "】"); } }
這裡假設進行了報表的導出工作,所以使用列印語句來進行簡單的模擬,並在方法中標註了@Async
註解,那麼當調用該方法的時候,Spring
會獲取一個新的執行緒來執行這個方法,所以這裡列印出執行當前方法的執行緒名稱。我們在寫一個控制器,程式碼如下:
package cn.itlemon.springboot.async.controller; import cn.itlemon.springboot.async.service.AsyncService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; /** * @author jiangpingping * @date 2018/10/30 19:36 */ @RestController @RequestMapping("/async") @Slf4j public class AsyncController { private final AsyncService asyncService; @Autowired public AsyncController(AsyncService asyncService) { this.asyncService = asyncService; } @GetMapping("/page") public String asyncPage() { System.out.println("當前請求執行緒名稱為:【" + Thread.currentThread().getName() + "】"); // 非同步調用 asyncService.generateReport(); // 返回結果 return "async"; } }
我們在當前Controller
方法中也列印了當前的執行緒,運行項目,訪問指定的URL
,就可以對比在調用generateReport
方法的時候是否啟用了新的執行緒。我們啟動Spring Boot
應用,在瀏覽器地址欄輸入:http://localhost:8080/async/page
,在控制台列印的結果是:
當前請求執行緒名稱為:【http-nio-8080-exec-1】 報表執行緒名稱:【ThreadPoolTaskExecutor-1】
很明顯,這不是同一個執行緒,說明我們開啟非同步執行緒成功。
處理非同步執行緒中的異常
一般在Spring
中處理非同步執行緒異常分成兩類,一類是非同步方法沒有返回值,另一類是非同步方法有返回值。
第一類無返回值方法
對於第一類無返回值情況,我們已經在AsyncConfig
配置類中進行了配置,即實現getAsyncUncaughtExceptionHandler
方法,也就是當非同步執行緒中的程式碼發生了異常,就會調用這個方法來進行異常處理,為了檢驗,我們在AsyncServiceImpl
的方法generateReport
中手動加一行程式碼System.out.println(1 / 0);
,從而導致其出除零異常,程式碼如下所示:
@Override @Async public void generateReport() { // 模擬非同步生成報表程式碼,這裡設置為列印 System.out.println("報表執行緒名稱:【" + Thread.currentThread().getName() + "】"); System.out.println(1 / 0); }
當再次啟動Spring Boot
應用,在瀏覽器地址欄輸入:http://localhost:8080/async/page
,那麼將在非同步流程中發生異常,由於是在不同執行緒中發生的異常,所以它並不會影響主執行緒的執行,且發生異常後,由配置了getAsyncUncaughtExceptionHandler
方法,那麼該異常將會被處理,處理的方式就是使用日誌進行了記錄:
2018-10-31 10:57:09.952 ERROR 2391 --- [lTaskExecutor-1] c.i.springboot.async.config.AsyncConfig : Error Occurs in async method:/ by zero
第二類有返回值方法
對於第二種情況,即非同步方法會有返回值,那麼我們如何獲取到非同步執行緒處理後的返回值呢,通常的方法是將非同步方法的返回值使用介面Future
、ListenableFuture
或者類AsyncResult
進行包裝,即將返回值作為泛型傳入到上述介面或者類中。這裡我們來簡要分析一下它們的源碼中的常用方法。
Future
介面:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
方法分析:
cancel
方法用來取消任務,如果取消任務成功則返回true
,如果取消任務失敗則返回false
。參數mayInterruptIfRunning
表示是否允許取消正在執行卻沒有執行完畢的任務,如果設置true
,則表示可以取消正在執行過程中的任務。如果任務已經完成,則無論mayInterruptIfRunning
為true
還是false
,此方法肯定返回false
,即如果取消已經完成的任務會返回false
;如果任務正在執行,若mayInterruptIfRunning
設置為true
,則返回true
,若mayInterruptIfRunning
設置為false
,則返回false
;如果任務還沒有執行,則無論mayInterruptIfRunning
為true
還是false
,肯定返回true
。isCancelled
方法表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回true
。isDone
方法表示任務是否已經完成,若任務完成,則返回true
;get
方法用來獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回;get(long timeout, TimeUnit unit)
用來獲取執行結果,如果在指定時間內,還沒獲取到結果,就直接返回null
。
ListenableFuture介面:
public interface ListenableFuture<T> extends Future<T> { void addCallback(ListenableFutureCallback<? super T> callback); void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback); default CompletableFuture<T> completable() { CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this); addCallback(completable::complete, completable::completeExceptionally); return completable; } }
ListenableFuture
繼承了Future
介面,它還額外添加了三個方法,主要用來添加非同步現場的回調,可以用來處理異常和獲取非同步方法的返回值的。AsyncResult
類實現了ListenableFuture
介面,也實現了它所有的方法。接下來,我們將分別介紹如何獲取非同步處理後的返回值和異常處理。
使用Future介面
我們在AsyncService
介面中添加一個方法:returnMessage()
,並使用Future
介面來進行包裝,程式碼如下:
/** * 非同步回調消息方法 * * @return 字元串 */ Future<String> returnMessage();
實現類中的程式碼如下:
@Override @Async public Future<String> returnMessage() { System.out.println(Thread.currentThread().getName()); String message = "Async Method Result"; return new AsyncResult<>(message); }
那麼在Controller
層,就可以獲取到Future
的實現類對象,程式碼如下:
@GetMapping("/page1") public String asyncPage1() { try { System.out.println(Thread.currentThread().getName()); Future<String> result = asyncService.returnMessage(); System.out.println(result.get()); } catch (ExecutionException | InterruptedException e) { log.error("發生了異常:{}", e.getMessage()); } return "async"; }
這裡對非同步進行了try...catch
異常處理,也使用了Future
的get
方法獲取了非同步方法的返回值,但是這種獲取返回值的方式會阻塞當前執行緒,也就是說調用了get
方法之後,會等待非同步執行緒執行完畢後才進行下一行程式碼的執行。
使用ListenableFuture介面
我們在AsyncService
介面中添加一個方法:returnMsg()
,並使用ListenableFuture
介面來進行包裝,程式碼如下:
/** * 非同步回調消息方法 * * @return 字元串 */ ListenableFuture<String> returnMsg();
實現類中的程式碼如下:
@Override @Async public ListenableFuture<String> returnMsg() { System.out.println(Thread.currentThread().getName()); String message = "Async Method Result"; return new AsyncResult<>(message); }
那麼在Controller
層,就可以獲取到ListenableFuture
的實現類對象,程式碼如下:
@GetMapping("/page2") public String asyncPage2() { System.out.println(Thread.currentThread().getName()); ListenableFuture<String> result = asyncService.returnMsg(); result.addCallback(new SuccessCallback<String>() { @Override public void onSuccess(String result) { System.out.println("返回的結果是:" + result); } }, new FailureCallback() { @Override public void onFailure(Throwable ex) { log.error("發生了異常:{}", ex.getMessage()); } }); return "async"; }
從上面的程式碼中可以看出,在返回的結果中添加了兩個回調,分別是非同步處理成功的回調SuccessCallback
介面的實現類對象和非同步處理失敗發生異常的回調FailureCallback
介面的實現類對象。ListenableFuture
介面是對Future
介面的擴展,支援回調,有效的避免了執行緒阻塞問題,也就是說,它會監聽Future
介面的執行情況,一旦完成,就會調用onSuccess
方法進行成功後的處理,一旦發生異常,就會調用onFailure
方法進行異常處理。相比較而言,更加推薦使用ListenableFuture
來進行有返回值的非同步處理。對於Java1.8
,其實更加推薦使用CompletableFuture
或者guava
的ListenableFuture
,感興趣的同學可以進行深入研究,他們的處理非同步能力會更加強悍。
總結
本文從配置執行緒池、開啟非同步執行緒機制到非同步執行緒的異常處理,分步介紹了在Spring Boot
中如何啟用非同步執行緒來提高程式碼的並發能力,雖然是基於Spring Boot
,但是也同樣適用其他的採用Spring
的項目。