聽說你還不知道CompletableFuture?
- 2020 年 7 月 5 日
- 筆記
java8已經在日常開發編碼中非常普遍了,掌握運用好它可以在開發中運用幾行精簡程式碼就可以完成所需功能。
今天將介紹CompletableFuture的在生產環境如何使用實踐。CompletableFuture類作為Java 8 Concurrency API改進而引入,熟悉的同學應該了解在Java 9 也有對CompletableFuture有一些改進,橘子之後再進入講解。
閱讀這篇文章需要知道的前置知識點有,函數式編程,執行緒池原理等。還不熟悉的同學可以看看之前的文章,話不多說,開始吧。
為了更好的表達,我們結合例子講解,假設今天小橘收到TL任務,要求完成實時拉取數據的功能,完成後告知拉取完成。假設拉取數據需要從A,B,C三個服務中獲取,拉取完成推送需要調用D服務。
需求變更1:拉取數據需要從E服務獲取,但是會依賴從A服務獲取的結果。
需求變更2:從A服務一次能拉去一萬+數據,但是E服務的性能支撐不了大調用,在Provider端有限流兜底。
需求變更3:拉取數據過程中需要保證數據完整性,不能出現統計錯誤。
為什麼使用CompletableFuture
橘友們說了,這個可以用jdk5.0提供的Future
OK,簡單實現這個功能沒有問題,但是有什麼缺陷,需要怎麼可以改進嘛?
我們通過源碼注釋可以看到Future類返回的結果需要阻塞等待get方法返回結果,它提供了isDone()方法檢測非同步計算是否已經結束,get()方法等待非同步操作結束,以及獲取計算的結果。等到所有Future任務完成,通知執行緒獲取結果併合並。
從性能上,需要等待 Future 集合中的所有任務都完成(此需求沒問題,接著往下看), 從健壯性上,Futrue介面沒有方法去進行計算組合或者處理可能出現的錯誤。從功能擴展上,Future介面無法進行多個非同步計算之間相互獨立,同時第二個又依賴於第一個的結果。而今天的主角CompletableFuture都可以滿足上述功能,具有大約50種不同的構成,結合,執行非同步計算步驟和處理錯誤。(全部學習完所有方法是不現實的,掌握靈魂和核心方法即可依法炮製)
CompletableFuture API 使用
API太多,簡單列舉。讀者自行學習即可,本文重點不在介紹api
/**
任務 A 執行完執行 B,執行 B 不需要依賴 A 的結果同時 B 不返回結果。
*/
CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});
/**
任務 A 執行完執行 B,B 執行依賴 A 結果同時 B 不返回結果
*/
CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});
/**
任務 A 執行完執行 B,B 執行依賴 A 結果同時 B 返回結果
*/
CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "orange")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " csong"));
//true
assertEquals("orangecsong", completableFuture.get());
你的疑問:該thenCompose方法不和thenApply一樣實現結果合併計算嘛?
剛學習時候確實有點迷惑,其實他們的內部形式是不一樣的,它們與Java 8中可用的Stream和Optional類的map和flatMap方法是有著類似的設計思路在裡面的。都是接收一個CompletableFuture並將其應用於計算結果,但thenCompose(flatMap)方法接收一個函數,該函數返回相同類型的另一個CompletableFuture對象。
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "orange")
.thenCombine(CompletableFuture.supplyAsync(
() -> " chizongzi"), (s1, s2) -> s1 + s2));
assertEquals("orange chizongzi", completableFuture.get());
thenCombine方法旨在當你想要使用多個計算結果時,而後續的處理同時需要依賴返回值,第一個計算結果返回 “orange”,第二個計算結果返回 “chizongzi”,對結果進行拼接,那麼結果就是”orange chizongzi” 啦。你可能會問如果結果無需處理呢?thenAcceptBoth將可以實現你的功能。那麼它和thenApply的區別又是啥呢?
thenCompose()方法是使用前一個Future作為參數。它會直接使結果變新的Future,而不是我們在thenApply()中到的嵌套Future,而是用來連接兩個CompletableFuture,是生成一個新的CompletableFuture,因此,如果想要繼續嵌套鏈接CompletableFuture 方法,那麼最好使用thenCompose()。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...}
當我們需要並行執行多個任務時,我們通常希望等待所有它們執行,然後處理它們的組合結果。CompletableFuture提供了allOf靜態方法允許等待所有的完成任務,但是它返回類型是CompletableFuture 。局限性在於它不會返回所有任務的綜合結果。相反,你必須手動從Futures獲取結果。那麼怎麼解決呢,CompletableFuture提供了join()可以解決,這裡小橘用Stream實現同樣可以的。
String multiFutures= Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
assertEquals("Today is sun", multiFutures);
那麼 CompletableFuture 針對異常是如何處理的呢?
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
CompletableFuture.supplyAsync(() -> "resultA")
.thenApply(resultA -> resultA + " resultB")
.thenApply(resultB -> resultB + " resultC")
//如果resultA,resultB,resultC在獲取中有異常
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException();
}).exceptionally(ex -> "errorResultA")
.thenApply(resultA -> resultA + " resultB")
.thenApply(resultB -> resultB + " resultC")
上面的程式碼中,任務 A 拋出異常,然後通過exceptionally() 方法處理了異常,並返回新的結果,這個新的結果將傳遞給任務 B。如果inovke future.join方法結果將會輸出 “errorResultA resultB result C”
上述方法基本就是底層函數式api的使用,聰明的橘友們實踐起來吧!
CompletableFuture 例子
Talk is cheap , show me code。自從上篇 你還在擔心rpc介面超時嗎 文章末尾講述大批量調用,其中是順序invoke調用,其實我們分析,非同步調用利用CompletableFuture需要怎麼實現呢?
/**
* @Description:
* @author: orangeCs
* @create: 2020-06-25
*/
public class AsyncInvokeUtil {
private AsyncInvokeUtil() {}
/**
* @param paramList 源數據 (需處理數據載體)
* @param buildParam 中轉函數 (獲取的結果做一層trans,來滿足調用服務條件)
* @param transParam 中轉函數 (獲取的結果做一層trans,來滿足調用服務條件)
* @param processFunction 中轉處理函數
* @param size 分批大小
* @param executorService 暴露外部自定義實現執行緒池(demo沒判空,可以做成非必傳)
* @param <R>
* @param <T>
* @param <P>
* @param <k>
* @return
* @throws ExecutionException
* @throws InterruptedException
*/
public static <R, T, P, k> List<R> partitionAsyncInvokeWithRes(List<T> paramList,
Function<List<T>, P> buildParam,
Function<P, List<k>> transParam,
Function<List<k>,List<R>> processFunction,
Integer size,
ExecutorService executorService) throws ExecutionException, InterruptedException {
List<CompletableFuture<List<R>>> completableFutures = Lists.partition(paramList, size).stream()
.map(buildParam)
.map(transParam)
.map(eachList -> CompletableFuture.supplyAsync(() ->
processFunction.apply(eachList), executorService))
.collect(Collectors.toList());
//get
CompletableFuture<Void> finishCompletableFuture = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]));
finishCompletableFuture.get();
return completableFutures.stream().map(CompletableFuture::join)
.filter(Objects::nonNull).reduce(new ArrayList<>(), (resList1, resList2) -> {
resList1.addAll(resList2);
return resList1;
});
}
}
僅僅這一篇文章是不夠的,任何知識都是長期積累,反覆思考才能變成自己的東西,在浮躁的社會,我們年輕人切勿浮躁,今天介紹到這裡了,喜歡部落客的朋友們記得點個關注哦。
> 本文由部落格群發一文多發等運營工具平台 [OpenWrite](//openwrite.cn?from=article_bottom) 發布