Java中的CompletableFuture究竟怎么用

  • 2019 年 10 月 6 日
  • 筆記

1. 对象的创建及完成。

static void complete() throws ExecutionException, InterruptedException {    CompletableFuture<String> f = new CompletableFuture<>();    new Thread() {      @Override      public void run() {        // 该方法会将结果传给CompletableFuture,并将其设置为完成状态        // 一般是异步调用        f.complete("hello");      }    }.start();    System.out.println(f.get()); // 输出 hello  }

2. 异步等待CompletableFuture的完成,并回调方法。

static void whenComplete() {    CompletableFuture<String> f = new CompletableFuture<>();    f.whenComplete(        new BiConsumer<>() {          @Override          public void accept(String s, Throwable throwable) {            if (throwable != null) {              throwable.printStackTrace();            } else {              System.out.println(s);            }          }        });    // f.complete("hello"); // 正常完成    f.completeExceptionally(new RuntimeException()); // 异常完成  }

3. 设置完成的超时时间。

static void timeout() throws InterruptedException {    CompletableFuture<String> f = new CompletableFuture<>();    f.whenComplete(        new BiConsumer<>() {          @Override          public void accept(String s, Throwable throwable) {            if (throwable != null) {              throwable.printStackTrace();            } else {              System.out.println(s);            }          }        });    // f.orTimeout(1, TimeUnit.SECONDS); // 1秒内未完成就抛timeout异常给CompletableFuture    f.completeOnTimeout(        "timeout", 1, TimeUnit.SECONDS); // 一秒内未完成会把timeout字符串作为结果传给CompletableFuture      Thread.sleep(2000); // 等待timeout的发生  }

4. 异步执行某任务,当任务完成时,将结果传给CompletableFuture。

static void supplyAsync() {    ExecutorService exec = Executors.newSingleThreadExecutor();      CompletableFuture<String> f =        CompletableFuture.supplyAsync(            new Supplier<>() {              @Override              public String get() {                try {                  Thread.sleep(1000);                } catch (InterruptedException e) {                  e.printStackTrace();                }                return "hello";              }            },            exec);      f.whenComplete(        new BiConsumer<>() {          @Override          public void accept(String s, Throwable throwable) {            System.out.println(s); // 输出hello          }        });      exec.shutdown();  }

5. 等待其他的所有CompletableFuture完成。

static void allOf() {    CompletableFuture<Integer> f1 = new CompletableFuture<>();    CompletableFuture<Integer> f2 = new CompletableFuture<>();      CompletableFuture<Void> f = CompletableFuture.allOf(f1, f2);    f.whenComplete(        new BiConsumer<>() {          @Override          public void accept(Void aVoid, Throwable throwable) {            System.out.println(f1.getNow(0) + f2.getNow(0)); // 输出3          }        });      f1.complete(1);    f2.complete(2);  }

6. 将异常结果转成正常结果。

static void exceptionally() {    CompletableFuture<String> f1 = new CompletableFuture<>();      // 如果f1是正常结果,则原结果会传给f2    // 如果f1是异常结果,就会调用下面的方法转成正常结果,然后再传给f2    CompletableFuture<String> f2 =        f1.exceptionally(            new Function<>() {              @Override              public String apply(Throwable throwable) {                return "exception";              }            });      // thenAccept方法传入的函数只有在f2是正常结果时才会被调用    f2.thenAccept(        new Consumer<>() {          @Override          public void accept(String s) {            System.out.println(s); // 输出exception          }        });      f1.completeExceptionally(new RuntimeException());  }

7. 对结果做类型转换。

static void handle() {    CompletableFuture<String> f = new CompletableFuture<>();      // 当f完成后会执行handle传入的方法    f.handle(            new BiFunction<String, Throwable, Integer>() {              @Override              public Integer apply(String s, Throwable throwable) {                if (throwable != null) {                  return -1;                }                return Integer.valueOf(s);              }            })        .thenAccept(            new Consumer<>() {              @Override              public void accept(Integer integer) {                System.out.println(integer);              }            });      // f.complete("1"); // 输出1    f.completeExceptionally(new RuntimeException()); // 输出-1  }

8. 写个尽量完整的例子,看下各个方法是如何结合在一起使用的。

import java.util.concurrent.*;  import java.util.concurrent.atomic.AtomicInteger;  import java.util.function.BiConsumer;  import java.util.function.Consumer;    public class DataLoader {    private static final AtomicInteger idGen = new AtomicInteger();    private final ConcurrentMap<Integer, CompletableFuture<byte[]>> futureMap;    private RemoteDataLoader loader;      public DataLoader() {      this.futureMap = new ConcurrentHashMap<>();    }      public void load(int dataID, Consumer<byte[]> dataConsumer, Executor executor) {      int futureID = idGen.incrementAndGet();      CompletableFuture<byte[]> f = new CompletableFuture<>();        // 当future完成后,做些清理工作,然后将数据传给用户      f.whenCompleteAsync(          new BiConsumer<>() {            @Override            public void accept(byte[] data, Throwable throwable) {              futureMap.remove(futureID);              dataConsumer.accept(data);            }          },          executor);        // 3秒内没结果就返回null      f.completeOnTimeout("null".getBytes(), 3, TimeUnit.SECONDS);        // 将future放入map中      futureMap.put(futureID, f);        // 通知remote加载数据并将结果以回调remoteDataLoaded方法的形式返回      loader.load(dataID, futureID);    }      public void remoteDataLoaded(int futureID, byte[] data) {      CompletableFuture<byte[]> f = futureMap.get(futureID);      if (f != null) {        f.complete(data);      }    }      public interface RemoteDataLoader {      void load(int dataID, int futureID);    }      public static void main(String[] args) throws InterruptedException {      DataLoader loader = new DataLoader();      loader.loader =          new RemoteDataLoader() {            @Override            public void load(int dataID, int futureID) {              if (dataID > 0) {                loader.remoteDataLoaded(futureID, "hello".getBytes());              }              // 如果dataID非法,则不返回数据,DataLoader里就会报timeout异常            }          };        // 所有返回的数据都用该Executor执行输出操作      ExecutorService exec = Executors.newCachedThreadPool();        // 正常数据加载,输出hello      loader.load(          1,          new Consumer<>() {            @Override            public void accept(byte[] data) {              System.out.println(new String(data));            }          },          exec);        // 异常数据加载,发生timeout,输出null      loader.load(          0,          new Consumer<>() {            @Override            public void accept(byte[] data) {              System.out.println(new String(data));            }          },          exec);        // 等待timeout发生      Thread.sleep(4000);        // 关闭ExecutorService      exec.shutdown();    }  }

例子中的逻辑不是非常完善,但基本上可以展示CompletableFuture在项目中如何使用,当然,CompletableFuture还有更加复杂和强大的用法,这里就不一一介绍了,感兴趣的朋友可以点击阅读原文,查看其完整的api。

希望对你有所帮助。

完。