強大的CompletableFuture
- 2019 年 11 月 10 日
- 筆記
引子
為了讓程式更加高效,讓CPU最大效率的工作,我們會採用非同步編程。首先想到的是開啟一個新的執行緒去做某項工作。再進一步,為了讓新執行緒可以返回一個值,告訴主執行緒事情做完了,於是乎Future粉墨登場。然而Future提供的方式是主執行緒主動問詢新執行緒,要是有個回調函數就爽了。所以,為了滿足Future的某些遺憾,強大的CompletableFuture隨著Java8一起來了。
Future
傳統多執行緒的卻讓程式更加高效,畢竟是非同步,可以讓CPU充分工作,但這僅限於新開的執行緒無需你的主執行緒再費心了。比如你開啟的新執行緒僅僅是為了計算1+…+n再列印結果。有時候你需要子執行緒返回計算結果,在主執行緒中進行進一步計算,就需要Future了。
看下面這個例子,主執行緒計算2+4+6+8+10;子執行緒計算1+3+5+7+9;最後需要在主執行緒中將兩部分結果再相加。
public class OddNumber implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
int result = 1 + 3 + 5 + 7 + 9;
return result;
}
}
public class FutureTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
OddNumber oddNumber = new OddNumber();
Future<Integer> future = executor.submit(oddNumber);
long startTime = System.currentTimeMillis();
int evenNumber = 2 + 4 + 6 + 8 + 10;
try {
Thread.sleep(1000);
System.out.println("0.開始了:"+ (System.currentTimeMillis()-startTime) +"秒");
int oddNumberResult = future.get();//這時間會被阻塞
System.out.println("1+2+...+9+10="+(evenNumber+oddNumberResult));
System.out.println("1.開始了:"+ (System.currentTimeMillis()-startTime) +"秒");
} catch (Exception e) {
System.out.println(e);
}
}
}
輸出結果:
0.開始了:1001秒
1+2+...+9+10=55
1.開始了:3002秒
看一下Future介面,只有五個方法比較簡單
//取消任務,如果已經完成或者已經取消,就返回失敗
boolean cancel(boolean mayInterruptIfRunning);
//查看任務是否取消
boolean isCancelled();
//查看任務是否完成
boolean isDone();
//剛才用到了,查看結果,任務未完成就一直阻塞
V get() throws InterruptedException, ExecutionException;
//同上,但是加了一個過期時間,防止長時間阻塞,主執行緒也做不了事情
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
CompletableFuture
上面的看到Future的五個方法,不是很豐富,既然我們的主執行緒叫做main,就應該以我為主,我更希望子執行緒做完了事情主動通知我。為此,Java8帶來了CompletableFuture,一個Future的實現類。其實CompletableFuture最迷人的地方並不是極大豐富了Future的功能,而是完美結合了Java8流的新特性。
實現回調,自動後續操作
提前說一下CompletableFuture實現回調的方法(之一):thenAccept()
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
參數有個Consumer,用到了Java8新特性,行為參數化,就是參數不一定是基本類型或者類,也可以是函數(行為),或者說一個方法(介面)。
public class OddNumberPlus implements Supplier<Integer> {
@Override
public Integer get() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1+3+5+7+9;
}
}
public class CompletableFutureTest {
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
final int evenNumber = 2 + 4 + 6 + 8 + 10;
CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddNumberPlus());
try {
Thread.sleep(1000);
System.out.println("0.開始了:"+ (System.currentTimeMillis()-startTime) +"秒");
//看這裡,實現回調
oddNumber.thenAccept(oddNumberResult->
{
System.out.println("1.開始了:"+ (System.currentTimeMillis()-startTime) +"秒");
System.out.println("此時計算結果為:"+(evenNumber+oddNumberResult));
});
oddNumber.get();
} catch (Exception e) {
System.out.println(e);
}
}
}
輸出結果:
0.開始了:1006秒
1.開始了:3006秒
此時計算結果為:55
值得一提的是,本例中並沒有顯示的創建任務連接池,程式會默認選擇一個任務連接池ForkJoinPool.commonPool()
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
ForkJoinPool始自JDK7,叫做分支/合併框架。可以通過將一個任務遞歸分成很多分子任務,形成不同的流,進行並行執行,同時還伴隨著強大的工作竊取演算法。極大的提高效率。當然,你也可以自己指定連接池。
CompletableFuture合併
Java8的確豐富了Future實現,CompletableFuture有很多方法可供大家使用,但是但從上面的例子來看,其實CompletableFuture能做的功能,貌似Future。畢竟你CompletableFuture用get()這個方法的時候還不是阻塞了,我Future蠻可以自己拿到返回值,再手動執行一些操作嘛(雖說這樣main方法一定很不爽)。那麼接下來的事情,Future做起來就十分麻煩了。假設我們main方法只做奇數合集加上偶數合集這一個操作,提前算這兩個合集的操作非同步交給兩個子執行緒,我們需要怎麼做呢?沒錯,開啟兩個執行緒,等到兩個執行緒都計算結束的時候,我們進行最後的相加,問題在於,你怎麼知道那個子執行緒最後結束的呢?(貌似可以做個輪詢,不定的調用isDone()這個方法…)豐富的CompletableFuture功能為我們提供了一個方法,用於等待兩個子執行緒都結束了,再進行相加操作:
//asyncPool就是上面提到的默認執行緒池ForkJoinPool
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(asyncPool, other, fn);
}
看個例子:
public class OddCombine implements Supplier<Integer> {
@Override
public Integer get() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1+3+5+7+9;
}
}
public class EvenCombine implements Supplier<Integer> {
@Override
public Integer get() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 2+4+6+8+10;
}
}
public class CompletableCombineTest {
public static void main(String[] args) throws Exception{
CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddCombine());
CompletableFuture<Integer> evenNumber = CompletableFuture.supplyAsync(new EvenCombine());
long startTime = System.currentTimeMillis();
CompletableFuture<Integer> resultFuturn = oddNumber.thenCombine(evenNumber,(odd,even)->{
return odd + even;
});
System.out.println(resultFuturn.get());
System.out.println("0.開始了:"+ (System.currentTimeMillis()-startTime) +"秒");
}
}
輸出結果:
55
0.開始了:3000秒
這邊模擬一個睡1秒,一個睡3秒,但是真正的網路請求時間是不定的。是不是很爽,最爽的還不是現象,而是以上操作已經利用了Java8流的概念。
兩個子執行緒還不夠,那麼還有**anyOff()**函數,可以承受多個CompletableFuture,會等待所有任務都完成。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
與它長的很像的,有個方法,是當第一個執行結束的時候,就結束,後面任務不再等了,可以看作充分條件。
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}
在上面那個例子的基礎上,把OddNumberPlus類時間調長一點:
public class OddNumberPlus implements Supplier<Integer> {
@Override
public Integer get() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1+3+5+7+9;
}
}
public class CompletableCombineTest {
public static void main(String[] args) throws Exception{
CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddCombine());
CompletableFuture<Integer> evenNumber = CompletableFuture.supplyAsync(new EvenCombine());
CompletableFuture<Integer> testNumber = CompletableFuture.supplyAsync(new OddNumberPlus());
long startTime = System.currentTimeMillis();
CompletableFuture<Object> resultFuturn = CompletableFuture.anyOf(oddNumber,evenNumber,testNumber);
System.out.println(resultFuturn.get());
System.out.println("0.開始了:"+ (System.currentTimeMillis()-startTime) +"秒");
}
}
輸出結果:
30
0.開始了:1000秒
小結
CompletableFuture的方法其實還有很多,常用的比如說runAsync(),類似於supplyAsync(),只是沒有返回值;除了thenApply()可以加回調函數以外,還有thenApply();還有注入runAfterBoth()、runAfterEither(),這些見名知意。還有很多,可以點開CompletableFuture這個類的源碼仔細看一看。見微知著,透過CompletableFuture,更加感覺到Java8的強大,強大的流概念、行為參數化、高效的並行理念等等,不僅讓Java寫起來更爽,還不斷豐富Java整個生態。Java一直在進步,所以沒有被時代淘汰,我們Javaer也可以繼續職業生涯,感謝Java,一起進步。
BLOG地址:www.liangsonghua.me
關注微信公眾號:松花皮蛋的黑板報,獲取更多精彩!
公眾號介紹:分享在京東工作的技術感悟,還有JAVA技術和業內最佳實踐,大部分都是務實的、能看懂的、可復現的