Reactive(3)5分鐘理解 SpringBoot 響應式的核心-Reactor

  • 2019 年 10 月 3 日
  • 筆記

一、前言

關於 響應式 Reactive,前面的兩篇文章談了不少概念,基本都離不開下面兩點:

  • 響應式編程是面向流的、異步化的開發方式
  • 響應式是非常通用的概念,無論在前端領域、還是實時流、離線處理場景中都是適用的。

有興趣的朋友可以看看這兩篇文章:

Reactive(1) 從響應式編程到「好萊塢」
Reactive(2) 響應式流與制奶廠業務

這次,我們把目光轉向 SpringBoot,在SpringBoot 2.0版本之後,提供了對響應式編程的全面支持。
因此在升級到 2.x版本之後,便能方便的實現事件驅動模型的後端編程,這其中離不開 webflux這個模塊。
其同時也被 Spring 5 用作開發響應式 web 應用的核心基礎。 那麼, webflux 是一個怎樣的東西?

Webflux

Webflux 模塊的名稱是 spring-webflux,名稱中的 Flux 來源於 Reactor 中的類 Flux。
該模塊中包含了對 響應式 HTTP、服務器推送 和 WebSocket 的支持。

Webflux 支持兩種不同的編程模型:

  • 第一種是 Spring MVC 中使用的基於 Java 註解的方式,一個使用Reactive風格的Controller如下所示:
@RestController  public class EchoController {      @GetMapping("/echo")      public Mono<String> sayHelloWorld() {          return Mono.just("Echo!");      }  }
  • 第二種是 基於 Java 8 的 lambda 表達式的函數式編程模型。

這兩種編程模型只是在代碼編寫方式上存在不同,但底層的基礎模塊仍然是一樣的。
除此之外,Webflux 可以運行在支持 Servlet 3.1 非阻塞 IO API 的 Servlet 容器上,或是其他異步運行時環境,如 Netty 和 Undertow。

關於Webflux 與 SpringMVC 的區別,可以參考下圖:

SpringBoot、Webflux、Reactor 可以說是層層包含的關係,其中,響應式能力的核心仍然是來自 Reactor組件。
由此可見,掌握Reactor的用法 必然是熟練進行 Spring 響應式編程的重點。

二、 Mono 與 Flux

在理解響應式Web編程之前,我們需要對Reactor 兩個核心概念做一些澄清,一個是Mono,另一個是Flux。

Flux 表示的是包含 0 到 N 個元素的異步序列。在該序列中可以包含三種不同類型的消息通知:

  • 正常的包含元素的消息
  • 序列結束的消息
  • 序列出錯的消息

當消息通知產生時,訂閱者中對應的方法 onNext(), onComplete()和 onError()會被調用。

Mono 表示的是包含 0 或者 1 個元素的異步序列。該序列中同樣可以包含與 Flux 相同的三種類型的消息通知。
Flux 和 Mono 之間可以進行轉換,比如對一個 Flux 序列進行計數操作,得到的結果是一個 Mono對象,或者把兩個 Mono 序列合併在一起,得到的是一個 Flux 對象。

構造器

Reactor提供了非常方便的API來創建 Flux、Mono 對象,如下:

使用靜態工廠類創建Flux

Flux.just("Hello", "World").subscribe(System.out::println);  Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);  Flux.empty().subscribe(System.out::println);  Flux.range(1, 10).subscribe(System.out::println);  Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
  • just():可以指定序列中包含的全部元素。創建出來的 Flux 序列在發佈這些元素之後會自動結束。
  • fromArray():可以從一個數組、Iterable 對象或 Stream 對象中創建 Flux 對象。
  • empty():創建一個不包含任何元素,只發佈結束消息的序列。
  • range(int start, int count):創建包含從 start 起始的 count 個數量的 Integer 對象的序列。
  • interval(Duration period)和 interval(Duration delay, Duration period):創建一個包含了從 0 開始遞增的 Long 對象的序列。其中包含的元素按照指定的間隔來發佈。除了間隔時間之外,還可以指定起始元素發佈之前的延遲時間。

除了上述的方式之外,還可以使用 generate()、create()方法來自定義流數據的產生過程:

generate()

Flux.generate(sink -> {      sink.next("Echo");      sink.complete();  }).subscribe(System.out::println);

generate 只提供序列中單個消息的產生邏輯(同步通知),其中的 sink.next()最多只能調用一次,比如上面的代碼中,產生一個Echo消息後就結束了。

create()

Flux.create(sink -> {      for (char i = 'a'; i <= 'z'; i++) {          sink.next(i);      }      sink.complete();  }).subscribe(System.out::print);

create 提供的是整個序列的產生邏輯,sink.next()可以調用多次(異步通知),如上面的代碼將會產生a-z的小寫字母。

使用靜態工廠類創建Mono

Mono 的創建方式與 Flux 是很相似的。 除了Flux 所擁有的構造方式之外,還可以支持與Callable、Runnable、Supplier 等接口集成。

參考下面的代碼:

Mono.fromSupplier(() -> "Mono1").subscribe(System.out::println);  Mono.justOrEmpty(Optional.of("Mono2")).subscribe(System.out::println);  Mono.create(sink -> sink.success("Mono3")).subscribe(System.out::println);

三、 流計算

1. 緩衝

Reactive(1) 從響應式編程到「好萊塢」 一文中曾經提到過緩衝(buffer)的概念。
buffer 是流處理中非常常用的一種處理,意思就是將流的一段截停後再做處理。

比如下面的代碼:

Flux.range(1, 100).buffer(20).subscribe(System.out::println);  Flux.interval(Duration.of(0, ChronoUnit.SECONDS),                Duration.of(1, ChronoUnit.SECONDS))          .buffer(Duration.of(5, ChronoUnit.SECONDS)).          take(2).toStream().forEach(System.out::println);  Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);  Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);

第一個buffer(20)是指湊足20個數字後再進行處理,該語句會輸出5組數據(按20分組)
第二個buffer(Duration duration)是指湊足一段時間後的數據再近些處理,這裡是5秒鐘做一次處理
第三個bufferUtil(Predicate p)是指等到某個元素滿足斷言(條件)時進行收集處理,這裡將會輸出[1,2],[3,4]..這樣的奇偶數字對
第四個bufferWhile(Predicate p)則僅僅是收集滿足斷言(條件)的元素,這裡將會輸出2,4,6..這樣的偶數

與 buffer 類似的是window函數,後者的不同在於其在緩衝截停後並不會輸出一些元素列表,而是直接轉換為Flux對象,如下:

Flux.range(1, 100).window(20)        .subscribe(flux ->             flux.buffer(5).subscribe(System.out::println));

window(20)返回的結果是一個Flux類型的對象,我們進而對其進行了緩衝處理。
因此上面的代碼會按5個一組輸出:

[1, 2, 3, 4, 5]  [6, 7, 8, 9, 10]  [11, 12, 13, 14, 15]  ...

2. 過濾/提取

上面的bufferWhile 其實充當了過濾的作用,當然,對於流元素的過濾也可以使用filter函數來處理:

Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);

take 函數可以用來提取想要的元素,這與filter 過濾動作是恰恰相反的,來看看take的用法:

Flux.range(1, 10).take(2).subscribe(System.out::println);  Flux.range(1, 10).takeLast(2).subscribe(System.out::println);  Flux.range(1, 10).takeWhile(i -> i < 5).subscribe(System.out::println);  Flux.range(1, 10).takeUntil(i -> i == 6).subscribe(System.out::println);

第一個take(2)指提取前面的兩個元素;
第二個takeLast(2)指提取最後的兩個元素;
第三個takeWhile(Predicate p)指提取滿足條件的元素,這裡是1-4
第四個takeUtil(Predicate p)指一直提取直到滿足條件的元素出現為止,這裡是1-6

3. 轉換

使用map函數可以將流中的元素進行個體轉換,如下:

Flux.range(1, 10).map(x -> x*x).subscribe(System.out::println);

這裡的map使用的JDK8 所定義的 Function接口

4. 合併

某些情況下我們需要對兩個流中的元素進行合併處理,這與合併兩個數組有點相似,但結合流的特點又會有不同的需求。

使用zipWith函數可以實現簡單的流元素合併處理:

Flux.just("I", "You")          .zipWith(Flux.just("Win", "Lose"))          .subscribe(System.out::println);  Flux.just("I", "You")          .zipWith(Flux.just("Win", "Lose"),          (s1, s2) -> String.format("%s!%s!", s1, s2))          .subscribe(System.out::println);

上面的代碼輸出為:

[I,Win]  [You,Lose]  I!Win!  You!Lose!

第一個zipWith輸出的是Tuple對象(不可變的元祖),第二個zipWith增加了一個BiFunction來實現合併計算,輸出的是字符串。

注意到zipWith是分別按照元素在流中的順序進行兩兩合併的,合併後的流長度則最短的流為準,遵循最短對齊原則。

用於實現合併的還有 combineLastest函數,combinLastest 會動態的將流中新產生元素(末位)進行合併,注意是只要產生新元素都會觸發合併動作併產生一個結果元素,如下面的代碼:

Flux.combineLatest(          Arrays::toString,          Flux.interval(Duration.of(0, ChronoUnit.MILLIS),              Duration.of(100, ChronoUnit.MILLIS)).take(2),          Flux.interval(Duration.of(50, ChronoUnit.MILLIS),              Duration.of(100, ChronoUnit.MILLIS)).take(2)  ).toStream().forEach(System.out::println);

輸出為:

[0, 0]  [1, 0]  [1, 1]

5. 合流

與合併比較類似的處理概念是合流,合流的不同之處就在於元素之間不會產生合併,最終流的元素個數(長度)是兩個源的個數之和。
合流的計算可以使用 merge或mergeSequential 函數,這兩者的區別在於:

merge後的元素是按產生時間排序的,而mergeSequential 則是按整個流被訂閱的時間來排序,如下面的代碼:

Flux.merge(Flux.interval(              Duration.of(0, ChronoUnit.MILLIS),              Duration.of(100, ChronoUnit.MILLIS)).take(2),          Flux.interval(                  Duration.of(50, ChronoUnit.MILLIS),                  Duration.of(100, ChronoUnit.MILLIS)).take(2))          .toStream()          .forEach(System.out::println);  System.out.println("---");  Flux.mergeSequential(Flux.interval(          Duration.of(0, ChronoUnit.MILLIS),          Duration.of(100, ChronoUnit.MILLIS)).take(2),          Flux.interval(                  Duration.of(50, ChronoUnit.MILLIS),                  Duration.of(100, ChronoUnit.MILLIS)).take(2))          .toStream()          .forEach(System.out::println);  

輸出為:

0  0  1  1  ---  0  1  0  1

merge 是直接將Flux 元素進行合流之外,而flatMap則提供了更加高級的處理:
flatMap 函數會先將Flux中的元素轉換為 Flux(流),然後再新產生的Flux進行合流處理,如下:

Flux.just(1, 2)          .flatMap(x -> Flux.interval(Duration.of(x * 10, ChronoUnit.MILLIS),                  Duration.of(10, ChronoUnit.MILLIS)).take(x))          .toStream()          .forEach(System.out::println);

flatMap也存在flatMapSequential的一個兄弟版本,後者決定了合併流元素的順序是與流的訂閱順序一致的。

6. 累積

reduce 和 reduceWith 操作符對流中包含的所有元素進行累積操作,得到一個包含計算結果的 Mono 序列。累積操作是通過一個 BiFunction 來表示的。reduceWith 允許在在操作時指定一個起始值(與第一個元素進行運算)

如下面的代碼:

Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);  Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);

這裡通過reduce計算出1-100的累加結果(1+2+3+…100),結果輸出為:

5050  5150

四、異常處理

在前面所提及的這些功能基本都屬於正常的流處理,然而對於異常的捕獲以及採取一些修正手段也是同樣重要的。

利用Flux/Mono 框架可以很方便的做到這點。

將正常消息和錯誤消息分別打印

Flux.just(1, 2)          .concatWith(Mono.error(new IllegalStateException()))          .subscribe(System.out::println, System.err::println);

當產生錯誤時默認返回0

Flux.just(1, 2)          .concatWith(Mono.error(new IllegalStateException()))          .onErrorReturn(0)          .subscribe(System.out::println);

自定義異常時的處理

Flux.just(1, 2)          .concatWith(Mono.error(new IllegalArgumentException()))          .onErrorResume(e -> {              if (e instanceof IllegalStateException) {                  return Mono.just(0);              } else if (e instanceof IllegalArgumentException) {                  return Mono.just(-1);              }              return Mono.empty();          })          .subscribe(System.out::println);

當產生錯誤時重試

Flux.just(1, 2)          .concatWith(Mono.error(new IllegalStateException()))          .retry(1)          .subscribe(System.out::println);

這裡的retry(1)表示最多重試1次,而且重試將從訂閱的位置開始重新發送流事件

五、線程調度

我們說過,響應式是異步化的,那麼就會涉及到多線程的調度。

Reactor 提供了非常方便的調度器(Scheduler)工具方法,可以指定流的產生以及轉換(計算)發佈所採用的線程調度方式。
這些方式包括:

類別 描述
immediate 採用當前線程
single 單一可復用的線程
elastic 彈性可復用的線程池(IO型)
parallel 並行操作優化的線程池(CPU計算型)
timer 支持任務調度的線程池
fromExecutorService 自定義線程池

下面,以一個簡單的實例來演示不同的線程調度:

Flux.create(sink -> {      sink.next(Thread.currentThread().getName());      sink.complete();  })  .publishOn(Schedulers.single())  .map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))  .publishOn(Schedulers.elastic())  .map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))  .subscribeOn(Schedulers.parallel())  .toStream()  .forEach(System.out::println);  

在這段代碼中,使用publishOn指定了流發佈的調度器,subscribeOn則指定的是流產生的調度器。
首先是parallel調度器進行流數據的生成,接着使用一個single單線程調度器進行發佈,此時經過第一個map轉換為另一個Flux流,其中的消息疊加了當前線程的名稱。最後進入的是一個elastic彈性調度器,再次進行一次同樣的map轉換。

最終,經過多層轉換後的輸出如下:

[elastic-2] [single-1] parallel-1

小結

SpringBoot 2.x、Spring 5 對於響應式的Web編程(基於Reactor)都提供了全面的支持,藉助於框架的能力可以快速的完成一些簡單的響應式代碼開發。
本文提供了較多 Reactor API的代碼樣例,旨在幫助讀者能快速的理解 響應式編程的概念及方式。

對於習慣了傳統編程範式的開發人員來說,熟練使用 Reactor 仍然需要一些思維上的轉變。
就筆者的自身感覺來看,Reactor 存在一些學習和適應的成本,但一旦熟悉使用之後便能體會它的先進之處。 就如 JDK8 引入的Stream API之後,許多開發者則漸漸拋棄forEach的方式..

本身這就是一種生產效率的提升機會,何樂而不為? 更何況,從應用框架的發展前景來看,響應式的前景是明朗的。

參考閱讀

使用 Reactor 進行反應式編程
https://www.ibm.com/developerworks/cn/java/j-cn-with-reactor-response-encode/index.html

Spring 5 的 WebFlux 開發介紹
https://www.ibm.com/developerworks/cn/java/spring5-webflux-reactive/index.html