Java中的CompletableFuture究竟怎么用
- 2019 年 10 月 6 日
- 筆記
1. 对象的创建及完成。
static void complete() throws ExecutionException, InterruptedException { CompletableFuture<String> f = new CompletableFuture<>(); new Thread() { @Override public void run() { // 该方法会将结果传给CompletableFuture,并将其设置为完成状态 // 一般是异步调用 f.complete("hello"); } }.start(); System.out.println(f.get()); // 输出 hello }
2. 异步等待CompletableFuture的完成,并回调方法。
static void whenComplete() { CompletableFuture<String> f = new CompletableFuture<>(); f.whenComplete( new BiConsumer<>() { @Override public void accept(String s, Throwable throwable) { if (throwable != null) { throwable.printStackTrace(); } else { System.out.println(s); } } }); // f.complete("hello"); // 正常完成 f.completeExceptionally(new RuntimeException()); // 异常完成 }
3. 设置完成的超时时间。
static void timeout() throws InterruptedException { CompletableFuture<String> f = new CompletableFuture<>(); f.whenComplete( new BiConsumer<>() { @Override public void accept(String s, Throwable throwable) { if (throwable != null) { throwable.printStackTrace(); } else { System.out.println(s); } } }); // f.orTimeout(1, TimeUnit.SECONDS); // 1秒内未完成就抛timeout异常给CompletableFuture f.completeOnTimeout( "timeout", 1, TimeUnit.SECONDS); // 一秒内未完成会把timeout字符串作为结果传给CompletableFuture Thread.sleep(2000); // 等待timeout的发生 }
4. 异步执行某任务,当任务完成时,将结果传给CompletableFuture。
static void supplyAsync() { ExecutorService exec = Executors.newSingleThreadExecutor(); CompletableFuture<String> f = CompletableFuture.supplyAsync( new Supplier<>() { @Override public String get() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; } }, exec); f.whenComplete( new BiConsumer<>() { @Override public void accept(String s, Throwable throwable) { System.out.println(s); // 输出hello } }); exec.shutdown(); }
5. 等待其他的所有CompletableFuture完成。
static void allOf() { CompletableFuture<Integer> f1 = new CompletableFuture<>(); CompletableFuture<Integer> f2 = new CompletableFuture<>(); CompletableFuture<Void> f = CompletableFuture.allOf(f1, f2); f.whenComplete( new BiConsumer<>() { @Override public void accept(Void aVoid, Throwable throwable) { System.out.println(f1.getNow(0) + f2.getNow(0)); // 输出3 } }); f1.complete(1); f2.complete(2); }
6. 将异常结果转成正常结果。
static void exceptionally() { CompletableFuture<String> f1 = new CompletableFuture<>(); // 如果f1是正常结果,则原结果会传给f2 // 如果f1是异常结果,就会调用下面的方法转成正常结果,然后再传给f2 CompletableFuture<String> f2 = f1.exceptionally( new Function<>() { @Override public String apply(Throwable throwable) { return "exception"; } }); // thenAccept方法传入的函数只有在f2是正常结果时才会被调用 f2.thenAccept( new Consumer<>() { @Override public void accept(String s) { System.out.println(s); // 输出exception } }); f1.completeExceptionally(new RuntimeException()); }
7. 对结果做类型转换。
static void handle() { CompletableFuture<String> f = new CompletableFuture<>(); // 当f完成后会执行handle传入的方法 f.handle( new BiFunction<String, Throwable, Integer>() { @Override public Integer apply(String s, Throwable throwable) { if (throwable != null) { return -1; } return Integer.valueOf(s); } }) .thenAccept( new Consumer<>() { @Override public void accept(Integer integer) { System.out.println(integer); } }); // f.complete("1"); // 输出1 f.completeExceptionally(new RuntimeException()); // 输出-1 }
8. 写个尽量完整的例子,看下各个方法是如何结合在一起使用的。
import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; public class DataLoader { private static final AtomicInteger idGen = new AtomicInteger(); private final ConcurrentMap<Integer, CompletableFuture<byte[]>> futureMap; private RemoteDataLoader loader; public DataLoader() { this.futureMap = new ConcurrentHashMap<>(); } public void load(int dataID, Consumer<byte[]> dataConsumer, Executor executor) { int futureID = idGen.incrementAndGet(); CompletableFuture<byte[]> f = new CompletableFuture<>(); // 当future完成后,做些清理工作,然后将数据传给用户 f.whenCompleteAsync( new BiConsumer<>() { @Override public void accept(byte[] data, Throwable throwable) { futureMap.remove(futureID); dataConsumer.accept(data); } }, executor); // 3秒内没结果就返回null f.completeOnTimeout("null".getBytes(), 3, TimeUnit.SECONDS); // 将future放入map中 futureMap.put(futureID, f); // 通知remote加载数据并将结果以回调remoteDataLoaded方法的形式返回 loader.load(dataID, futureID); } public void remoteDataLoaded(int futureID, byte[] data) { CompletableFuture<byte[]> f = futureMap.get(futureID); if (f != null) { f.complete(data); } } public interface RemoteDataLoader { void load(int dataID, int futureID); } public static void main(String[] args) throws InterruptedException { DataLoader loader = new DataLoader(); loader.loader = new RemoteDataLoader() { @Override public void load(int dataID, int futureID) { if (dataID > 0) { loader.remoteDataLoaded(futureID, "hello".getBytes()); } // 如果dataID非法,则不返回数据,DataLoader里就会报timeout异常 } }; // 所有返回的数据都用该Executor执行输出操作 ExecutorService exec = Executors.newCachedThreadPool(); // 正常数据加载,输出hello loader.load( 1, new Consumer<>() { @Override public void accept(byte[] data) { System.out.println(new String(data)); } }, exec); // 异常数据加载,发生timeout,输出null loader.load( 0, new Consumer<>() { @Override public void accept(byte[] data) { System.out.println(new String(data)); } }, exec); // 等待timeout发生 Thread.sleep(4000); // 关闭ExecutorService exec.shutdown(); } }
例子中的逻辑不是非常完善,但基本上可以展示CompletableFuture在项目中如何使用,当然,CompletableFuture还有更加复杂和强大的用法,这里就不一一介绍了,感兴趣的朋友可以点击阅读原文,查看其完整的api。
希望对你有所帮助。
完。