Java8系列 (七) CompletableFuture非同步編程
- 2019 年 11 月 8 日
- 筆記
概述
Java8之前用 Future 處理非同步請求, 當你需要獲取任務結果時, 通常的做法是調用 get(long timeout, TimeUnit unit) 此方法會阻塞當前的執行緒, 如果任務處理超時, 就會拋出一個 TimeoutException
@Test public void test1() throws InterruptedException, ExecutionException, TimeoutException { ExecutorService executorService = Executors.newCachedThreadPool(); Future<String> f = executorService.submit(() -> "ceshishanghu"); String s = f.get(3, TimeUnit.SECONDS); System.out.println(s); }
在Java8中引入了 CompletableFuture, 使用它提供的API可以不用像之前那樣阻塞式或輪詢的獲取某個非同步任務的結果, CompletableFuture 會在非同步任務處理完成後自動進行回調, 讓你可以鏈式的組合多個非同步任務。
CompletableFuture 類中提供了許多以 Async 後綴結尾的方法。通常而言,名稱中不帶 Async 的方法和它的前一個任務一樣,在同一個執行緒中運行。而名稱以 Async 結尾的方法會將後續的任務提交到一個執行緒池,所以每個任務是由不同的執行緒處理的。
靜態工廠方法
- supplyAsync(): 非同步處理任務, 有返回值
- runAsync(): 非同步處理任務, 沒有返回值
- allOf(): 需要等待所有的非同步任務都執行完畢,才會返回一個新的CompletableFuture
- anyOf(): 任意一個非同步任務執行完畢,就會返回一個新的CompletableFuture
- completedFuture(): 這種方式獲取的 CompletableFuture 不是非同步的,它會等待獲取明確的返回結果之後再返回一個已經完成的 CompletableFuture
@Test public void test2() { //創建一個已經有任務結果的CompletableFuture CompletableFuture<String> f1 = CompletableFuture.completedFuture("return value"); //非同步處理任務,有返回值 CompletableFuture<String> f2 = CompletableFuture.supplyAsync(this::get); //非同步處理任務,沒有返回值 CompletableFuture<Void> f3 = CompletableFuture.runAsync(System.out::println); //需要等待所有的非同步任務都執行完畢,才會返回一個新的CompletableFuture // CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3); //任意一個非同步任務執行完畢,就會返回一個新的CompletableFuture CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3); Object result = any.join(); System.out.println("result = " + result);//result = return value } public String get() { delay(); return "非同步任務結果"; } public void delay() { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } }
上面的示例中, allOf() 因為要等待所有的非同步任務執行完成,所以要延時1秒鐘才會返回一個新的 CompletableFuture, 而 anyOf() 則不需要等待所有的非同步任務, 因為第一個非同步最先完成, 所以控制台輸出 result = return value 。
鏈式調用
A任務執行完畢, 繼續執行B任務, B任務執行完畢, 繼續執行C任務…
@Test public void test2() { CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> { //測試拋異常後,handle()方法接受並處理 //int x = 1 / 0; return "這是一個栗子"; }).handle((res, ex) -> { System.out.println("handle res = " + res); if (Objects.nonNull(ex)) { System.out.println("handle ex" + ex.getCause().getMessage()); } return Objects.nonNull(ex) ? 0 : 1; }).thenApply(res -> { System.out.println("thenApply res = " + res); return res == 1 ? "success" : "error"; }).thenAccept(res -> System.out.println("thenAccept res = " + res) ).thenRun(() -> System.out.println("沒有參數, 非同步執行一個沒有返回值的任務")); f.join(); }
輸出結果:
handle res = 這是一個栗子 thenApply res = 1 thenAccept res = success 沒有參數, 非同步執行一個沒有返回值的任務
將上面 int x = 1 / 0; 這行程式碼取消注釋, 重新運行結果如下:
handle res = null handle ex/ by zero thenApply res = 0 thenAccept res = error 沒有參數, 非同步執行一個沒有返回值的任務
可以看到, handle() 方法接受前一個 CompletableFuture 的返回結果或拋出的異常作為方法入參, 經過處理後再返回一個新的結果。
級聯組合
- thenCompose(): 對兩個非同步操作進行組合,第一個操作完成時,將其結果作為參數傳遞給第二個操作, 第二個操作會返回一個新的CompletableFuture。
- thenCombine(): 將兩個完全無關聯的非同步請求的結果整合起來, 計算出一個新的值並返回
@Test public void test3() { CompletableFuture<String> f = CompletableFuture.completedFuture("CompletableFuture 1"); CompletableFuture<String> f1 = f.thenCompose(res -> { System.out.println("thenCompose res = " + res); return CompletableFuture.supplyAsync(() -> "CompletableFuture 2"); }); System.out.println(f1.join()); CompletableFuture<Integer> f3 = CompletableFuture.completedFuture(998); CompletableFuture<String> f4 = f.thenCombine(f3, (str, num) -> { System.out.println("str = " + str + ", num= " + num); return str + num; }); System.out.println(f4.join()); }
輸出結果:
thenCompose res = CompletableFuture 1 CompletableFuture 2 str = CompletableFuture 1, num= 998 CompletableFuture 1998
whenComplete
當前一個 CompletableFuture 計算完成或拋出異常時, 可以使用 whenComplete() 執行指定的任務。
@Test public void test4() { CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> { //測試拋異常後,whenComplete()方法接受並處理 int x = 1 / 0; return "這是一個栗子"; }).whenComplete((res, ex) -> { System.out.println("whenComplete res = " + res); if (Objects.nonNull(ex)) { System.out.println("whenComplete ex" + ex.getCause().getMessage()); } }); System.out.println("f.join() = " + f.join()); }
輸出結果如下,其中 res 對應前一個 CompletableFuture 的返回結果,ex 對應前一個 CompletableFuture 拋出的異常(如果發生異常)。
從控制台輸出順序看出,當前一個 CompletableFuture 計算完成或拋出異常時, whenComplete() 會接受它的返回結果或拋出的異常,來做一些其他的事情,最後再返回原來的返回結果或拋出異常。類比下 try/catch 語句塊中的 final 語句塊。
whenComplete res = null whenComplete ex/ by zero java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1592) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) Caused by: java.lang.ArithmeticException: / by zero at com.java8.action.ChapterTest.lambda$test4$0(ChapterTest.java:22)
異常處理
只有當前一個 CompletableFuture 發生異常時,才會進入到 exceptionally() 方法,並將產生的異常作為入參。
@Test public void test5() { CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> { //測試拋異常後,exceptionally()方法接受並處理 //int x = 1 / 0; return "這是一個栗子"; }).exceptionally(ex -> ex.getCause().getMessage()); System.out.println("f.join() = " + f.join()); }
注釋 int x = 1 / 0; ,輸出如下:
f.join() = 這是一個栗子
取消注釋 int x = 1 / 0; , 輸出如下:
f.join() = / by zero
Both系列方法
- thenAcceptBoth(): 等待當前的 CompletableFuture 和另一個 CompletableFuture 執行完成,將它們的返回結果作為入參去執行一個操作,沒有返回值
- runAfterBoth(): 等待當前的 CompletableFuture 和另一個 CompletableFuture 執行完成,然後去執行一個操作,沒有返回值
程式碼清單一
@Test public void test6() { CompletableFuture<Integer> f1 = CompletableFuture.completedFuture(9523); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(this::get); CompletableFuture<Void> both = f1.thenAcceptBoth(f2, (num, str) -> System.out.println("num = " + num + ", str = " + str)); both.join(); } public String get() { delay(); return "CompletableFuture 2"; } public void delay() { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } }
程式碼清單一輸出結果如下:
num = 9523, str = CompletableFuture 2
程式碼清單二
@Test public void test7() { CompletableFuture<Integer> f1 = CompletableFuture.completedFuture(9523); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> ""); CompletableFuture<Void> both = f1.runAfterBoth(f2, () -> System.out.println("執行一個任務,沒有入參")); both.join(); }
程式碼清單二輸出結果如下:
執行一個任務,沒有入參
Either系列
- acceptEither: 當前的 CompletableFuture 和另一個 CompletableFuture 任意一個執行完成,將對應的返回結果作為入參去執行一個操作,沒有返回值
- applyToEither: 當前的 CompletableFuture 和另一個 CompletableFuture 任意一個執行完成,將對應的返回結果作為入參,使用 mapping 函數轉換成一個新的值並返回
- runAfterEither: 當前的 CompletableFuture 和另一個 CompletableFuture 任意一個執行完成,然後去執行一個操作,沒有返回值
程式碼清單三:
@Test public void test8() { CompletableFuture<String> f1 = CompletableFuture.completedFuture("CompletableFuture 1"); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(this::get); CompletableFuture<Void> both = f1.acceptEither(f2, System.out::println); both.join(); } public String get() { delay(); return "CompletableFuture 2"; } public void delay() { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } }
程式碼清單三輸出結果:
CompletableFuture 1
程式碼清單四:
@Test public void test9() { CompletableFuture<String> f1 = CompletableFuture.supplyAsync(this::get); CompletableFuture<String> f2 = CompletableFuture.completedFuture("CompletableFuture 2"); CompletableFuture<Integer> f3 = f1.applyToEither(f2, res -> { System.out.println("res = " + res); return res.length(); }); System.out.println("f3.join() = " + f3.join()); } public String get() { delay();//這裡會延時一秒鐘 return "CompletableFuture 1"; }
程式碼清單四輸出結果:
res = CompletableFuture 2 f3.join() = 19
程式碼清單五:
@Test public void test10() { CompletableFuture<String> f1 = CompletableFuture.supplyAsync(this::get); CompletableFuture<Void> f2 = CompletableFuture.allOf(); CompletableFuture<Void> f3 = f1.runAfterEither(f2, () -> System.out.println("執行一個任務,沒有入參")); f3.join(); } public String get() { delay();//這裡會延時一秒鐘 return "CompletableFuture 1"; }
程式碼清單五輸出結果:
執行一個任務,沒有入參
使用自定義的執行器來處理多個非同步任務
在實際應用場景中可能會遇到這種情況,假如你需要同時處理大量的非同步任務,且這些非同步任務互相不依賴,你只要最後把它們的結果組裝起來就行,這該怎麼實現呢?
下面給出了一個使用默認執行器的示例,通過Stream流同時創建 9 個非同步任務,獲取它們的結果並組裝後返回,其中 Runtime.getRuntime().availableProcessors() 表示Java虛擬機可用的處理器個數,在我之前的文章 Java8系列 (二) Stream流 中有介紹過。
程式碼清單六:
@Test public void test11() { List<String> list = Arrays.asList("王小波書店", "杭州沈記古舊書店", "貓的天空之城概念書店", "純真年代書吧", "南山書屋", "西西弗書店", "新華書店", "鍾書閣", "雲門書屋"); System.out.println("當前機器有" + Runtime.getRuntime().availableProcessors() + "個可用的處理器"); long start = System.nanoTime(); List<CompletableFuture<String>> futures = list.stream() .map(str -> CompletableFuture.supplyAsync(() -> this.calculateLength(str))) .collect(Collectors.toList()); System.out.println("get futures "+(System.nanoTime() - start) / 1000_000 + " msecs"); String result = futures.stream() .map(CompletableFuture::join) .collect(Collectors.joining(",", "[", "]")); System.out.println("get result "+(System.nanoTime() - start) / 1000_000 + " msecs"); System.out.println(result); } public String calculateLength(String str) { delay(); return str; } public void delay() { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } }
運行程式碼清單六,輸出結果:
當前機器有4個可用的處理器 get futures 95 msecs get result 3098 msecs [王小波書店,杭州沈記古舊書店,貓的天空之城概念書店,純真年代書吧,南山書屋,西西弗書店,新華書店,鍾書閣,雲門書屋]
可以看到,雖然使用了非同步處理,但還是花了 3098 毫秒才執行完成所有任務。這是因為 CompletableFuture 內部採用的是通用執行緒池 ForkJoinPool.commonPool() , 默認都使用固定數目的執行緒, 具體執行緒數取決於 Runtime.getRuntime().availableProcessors() 的返回值。
我這裡測試的機器顯示通用執行緒池中處於可用狀態的執行緒數為 4,一次只能同時處理 4 個任務,後面的5個非同步任務只能等到前面某一個操作完成釋放出空閑執行緒才能繼續, 因此總的會消耗約 3 秒鐘的時間。
我們將上面的程式碼進行重構,使用自定義的執行器,通過自定義的執行器你可以指定執行緒池的大小。其中執行緒數的設定可以參考公式 Nthreads = NCPU * UCPU * (1 + W/C)
@Test public void test12() { List<String> list = Arrays.asList("王小波書店", "杭州沈記古舊書店", "貓的天空之城概念書店", "純真年代書吧", "南山書屋", "西西弗書店", "新華書店", "鍾書閣", "雲門書屋"); final ExecutorService executor = Executors.newFixedThreadPool(Math.min(list.size(), 100), r -> { Thread thread = new Thread(r); //守護執行緒不會組織程式的終止 thread.setDaemon(true); return thread; }); System.out.println("當前機器有" + Runtime.getRuntime().availableProcessors() + "個可用的處理器, 當前處理非同步請求的執行緒池大小為 " + Math.min(list.size(), 100)); long start = System.nanoTime(); List<CompletableFuture<String>> futures = list.stream() .map(str -> CompletableFuture.supplyAsync(() -> this.calculateLength(str), executor)) .collect(Collectors.toList()); System.out.println("get futures " + (System.nanoTime() - start) / 1000_000 + " msecs"); String result = futures.stream() .map(CompletableFuture::join) .collect(Collectors.joining(",", "[", "]")); System.out.println("get result " + (System.nanoTime() - start) / 1000_000 + " msecs"); System.out.println(result); } public String calculateLength(String str) { delay(); return str; } public void delay() { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } }
輸出結果如下:
當前機器有4個可用的處理器, 當前處理非同步請求的執行緒池大小為 9 get futures 38 msecs get result 1039 msecs [王小波書店,杭州沈記古舊書店,貓的天空之城概念書店,純真年代書吧,南山書屋,西西弗書店,新華書店,鍾書閣,雲門書屋]
可以看到,使用自定義的執行器調大執行緒池大小後,總的運行時間只要 1039 毫秒。
將CompletableFuture作為Controller的返回值
上面還存在一個問題,雖然現在可以同時處理多個非同步任務,但是如果需要將非同步結果返回給另一個服務,那不是還得通過 join() 阻塞的獲取到返回值後才能再返回么?
自Spring Boot 1.3 (Spring 4.2) 之後開始支援 CompletableFuture 或 CompletionStage 作為 Controller 的返回值,她很好的解決了上面的非同步阻塞問題,只要將 CompletableFuture 作為 Controller 的返回值,在非同步任務執行完成後,它會自動響應結果給另一個服務。
@RestController public class AsyncController { @GetMapping("/redirect") public CompletableFuture<ModelAndView> redirect() { return CompletableFuture.supplyAsync(() -> { this.delay(); RedirectView redirectView = new RedirectView("https://www.cnblogs.com/qingshanli/"); redirectView.addStaticAttribute("hint", "CompletableFuture組裝ModelAndView視圖,非同步返回結果"); return new ModelAndView(redirectView); }); } @GetMapping("/async") public CompletableFuture<String> async() { System.out.println("async method start"); return CompletableFuture.supplyAsync(() -> { this.delay(); return "CompletableFuture作為Controller的返回值,非同步返回結果"; }).whenComplete((res, ex) -> System.out.println("async method completely, res = " + res + ", ex = " + ex)); } public void delay() { try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } } }
啟動項目,Postman 訪問 http://localhost:8080/async,截圖如下:
Postman 訪問 http://localhost:8080/redirect,截圖如下:
參考資料
https://github.com/AndreasKl/spring-boot-mvc-completablefuture
https://nickebbitt.github.io/blog/2017/03/22/async-web-service-using-completable-future
https://www.humansreadcode.com/spring-boot-completablefuture/
作者:張小凡
出處:https://www.cnblogs.com/qingshanli/
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。如果覺得還有幫助的話,可以點一下右下角的【推薦】。