Reactor詳解之:異常處理

簡介

不管是在響應式編程還是普通的程式設計中,異常處理都是一個非常重要的方面。今天將會給大家介紹Reactor中異常的處理流程。

Reactor的異常一般處理方法

先舉一個例子,我們創建一個Flux,在這個Flux中,我們產生一個異常,看看是什麼情況:

Flux flux2= Flux.just(1, 2, 0)
                .map(i -> "100 / " + i + " = " + (100 / i));
        flux2.subscribe(System.out::println);

我們會得到一個異常ErrorCallbackNotImplemented:

100 / 1 = 100
100 / 2 = 50

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero

那怎麼處理這個異常呢?

有兩種方式,第一種方式就是我們之前文章講過的,在subscribe的時候指定onError方法:

Flux flux2= Flux.just(1, 2, 0)
                .map(i -> "100 / " + i + " = " + (100 / i));

        flux2.subscribe(System.out::println,
                error -> System.err.println("Error: " + error));

還是剛才的程式碼,但是這次我們在subscribe的時候,添加了onError處理器,看下運行結果:

Divided by zero :(
100 / 1 = 100
100 / 2 = 50
Error: java.lang.ArithmeticException: / by zero

可以看到異常已經被我們捕獲了,並且進行了合適的處理。

除了在subscribe中進行處理,我們還可以在publish的時候,就指定異常的處理模式,這就是我們要介紹的第二種方法:

        Flux flux= Flux.just(1, 2, 0)
                .map(i -> "100 / " + i + " = " + (100 / i))
                .onErrorReturn("Divided by zero :(");
        flux.subscribe(System.out::println);

上面的例子中,在創建Flux的時候,手動指定了其onErrorReturn方法,我們看下輸出結果:

100 / 1 = 100
100 / 2 = 50
Divided by zero :(

注意,對於Flux或者Mono來說,所有的異常都是一個終止的操作,即使你使用了異常處理,原生成序列也不會繼續。

但是如果你對異常進行了處理,那麼它會將oneError訊號轉換成為新的序列的開始,並將替換掉之前上游產生的序列。

各種異常處理方式詳解

在一般的程式中,我們的異常應該怎麼處理呢?大家很容易想到的是try catch。而Reactor中subscribe的onError方法,就是try catch的一個具體應用:

Flux flux2= Flux.just(1, 2, 0)
                .map(i -> "100 / " + i + " = " + (100 / i));

        flux2.subscribe(System.out::println,
                error -> System.err.println("Error: " + error));

還是上的例子,我們在onError方法中,對異常進行了處理。

如果轉換成為常規程式碼,應該是下面的樣子:

    public void normalErrorHandle(){
        try{
            Arrays.asList(1,2,0).stream().map(i -> "100 / " + i + " = " + (100 / i)).forEach(System.out::println);
        }catch (Exception e){
            System.err.println("Error: " + e);
        }
    }

除了這種最基本的異常處理方法之外,Reactor還提供了很多種不同的異常處理方法,下面我們來一一介紹一下。

Static Fallback Value

Static Fallback Value的意思是,在遇到異常的時候會fallback到一個靜態的默認值。比如我們之前講到的onErrorReturn。

Flux flux= Flux.just(1, 2, 0)
                .map(i -> "100 / " + i + " = " + (100 / i))
                .onErrorReturn("Divided by zero :(");

當然onErrorReturn還支援一個Predicate參數,用來判斷要falback的異常是否滿足條件。

public final Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue) 

Fallback Method

除了fallback Value之外,還支援Fallback Method。也就是說如果你想在捕獲異常之後調用其他的方法,就可以使用Fallback Method。

這裡Fallback Method是用onErrorResume來表示的。

    public void useFallbackMethod(){
        Flux flux= Flux.just(1, 2, 0)
                .map(i -> "100 / " + i + " = " + (100 / i))
                .onErrorResume(e -> System.out::println);
        flux.subscribe(System.out::println);
    }

Dynamic Fallback Value

所謂的動態Fallback Value就是根據你拋出的異常進行判斷,通過定位不同的Error從而fallback到不同的值:

    public void useDynamicFallback(){
        Flux flux= Flux.just(1, 2, 0)
                .map(i -> "100 / " + i + " = " + (100 / i))
                .onErrorResume(error -> Mono.just(
                        MyWrapper.fromError(error)));
    }

    public static class MyWrapper{
        public static String fromError(Throwable error){
            return "That is a new Error";
        }
    }

Catch and Rethrow

同樣的,我們可以在捕獲異常之後進行rethrow:

Flux flux= Flux.just(1, 2, 0)
                .map(i -> "100 / " + i + " = " + (100 / i))
                .onErrorResume(error -> Flux.error(
                        new RuntimeException("oops, ArithmeticException!", error)));

        Flux flux2= Flux.just(1, 2, 0)
                .map(i -> "100 / " + i + " = " + (100 / i))
                .onErrorMap(error -> new RuntimeException("oops, ArithmeticException!", error));

有兩種方式,第一種就是在onErrorResume中使用Flux.error構建一個新的Flux,另外一種就是直接在onErrorMap中進行處理。

Log or React on the Side

有時候你只是想記錄一下異常資訊,並不想破壞原來的React結構,那麼可以試著使用doOnError。

    public void useDoOnError(){
        Flux flux= Flux.just(1, 2, 0)
                .map(i -> "100 / " + i + " = " + (100 / i))
                .doOnError(error -> System.out.println("we got the error: "+ error));
    }

Finally Block

如果我們在程式碼中使用了某些資源,一般情況下我們需要在finally中對其進行關閉,或者使用JDK7中引入的 try-with-resource 。

舉個例子,下面的是使用finally的方式:

Stats stats = new Stats();
stats.startTimer();
try {
  doSomethingDangerous();
}
finally {
  stats.stopTimerAndRecordTiming();
}

下面是使用try-with-resource的方式:

try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
  return disposableInstance.toString();
}

那麼在Reactor中,我們也有兩種方式和其對應。

第一種就是doFinally方法:

Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();

Flux<String> flux =
Flux.just("foo", "bar")
    .doOnSubscribe(s -> stats.startTimer())
    .doFinally(type -> { 
        stats.stopTimerAndRecordTiming();
        if (type == SignalType.CANCEL) 
          statsCancel.increment();
    })
    .take(1); 

上面的例子中,doFinally實際上做的就是finally block做的事情。

第二種是使用using,我們先看一個using的定義:


	public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends
			Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)

可以看到using支援三個參數,resourceSupplier是一個生成器,用來在subscribe的時候生成要發送的resource對象。

sourceSupplier是一個生成Publisher的工廠,接收resourceSupplier傳過來的resource,然後生成Publisher對象。

resourceCleanup用來對resource進行收尾操作。

那麼我們怎麼用呢?

舉個例子:

    public void useUsing(){
        AtomicBoolean isDisposed = new AtomicBoolean();
        Disposable disposableInstance = new Disposable() {
            @Override
            public void dispose() {
                isDisposed.set(true);
            }

            @Override
            public String toString() {
                return "DISPOSABLE";
            }
        };

        Flux<String> flux =
                Flux.using(
                        () -> disposableInstance,
                        disposable -> Flux.just(disposable.toString()),
                        Disposable::dispose);
    }

上面的例子中,我們創建了一個Disposable對象,作為resource,然後對這個resource進行加工,返回一個Flux對象,最後通過調用Disposable::dispose方法,對resource進行銷毀。

Retrying

有時候我們遇到了異常,可能需要重試幾次,Reactor為我們提供了retry方法,先看一個例子:

    public void testRetry(){
        Flux.interval(Duration.ofMillis(250))
                .map(input -> {
                    if (input < 3){
                       return "tick " + input;
                    } 
                    throw new RuntimeException("boom");
                })
                .retry(1)
                .elapsed()
                .subscribe(System.out::println, System.err::println);

        try {
            Thread.sleep(2100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

看下輸出結果:

[264,tick 0]
[255,tick 1]
[241,tick 2]
[506,tick 0]
[252,tick 1]
[253,tick 2]
java.lang.RuntimeException: boom

retry的作用就是當遇到異常的時候,重啟一個新的序列。

elapsed是用來展示產生的value時間之間的duration。

從結果我們可以看到,retry之前是不會產生異常資訊的。

本文的例子learn-reactive

本文作者:flydean程式那些事

本文鏈接://www.flydean.com/reactor-handle-errors/

本文來源:flydean的部落格

歡迎關注我的公眾號:「程式那些事」最通俗的解讀,最深刻的乾貨,最簡潔的教程,眾多你不知道的小技巧等你來發現!