使用Reactor響應式編程

介紹

響應式編程

響應式編程不同於我們熟悉的命令式編程,我們熟悉的命令式編程即程式碼就是一行接一行的指令,按照它們的順序一次一條地出現。一個任務被執行,程式就需要等到它執行完了,才能執行下一個任務。每一步,數據都需要完全獲取到了才能被處理,因此它需要作為一個整體來處理。但是所謂的響應式編程是函數式和聲明式的。響應式流處理數據時只要數據是可用的就進行處理,而不是需要將數據作為一個整體進行提供。事實上,輸入數據可以是無窮的(例如,一個地點的實時溫度數據的恆定流)。如下通過一個例子來描述響應式編程和命令式編程的差別:

🌰:某地發生火災,附近有一個水池,我們需要利用水池中的水來滅火。

首先我們將這一系列步驟進行任務抽象:

  1. 取到水池中的水。
  2. 把水運送到火災地進行滅火。

那麼命令式編程,我們把一池水都看成一個整體,那個首先我們需要將一池子的水全部放入救火車中,全部放完後才能拉著這一池子水趕往火災地進行滅火。這也符合上面對命令式編程的描述。一個任務被執行,程式就需要等到它執行完了,才能執行下一個任務。每一步,數據都需要完全獲取到了才能被處理,因此它需要作為一個整體來處理

但是響應式編程就不一樣了,響應式編程並不要求我們把一池子水看成一個整體,而是一系列(無窮的水滴),我們的做法就像拉一根很長的水管,一端連著水池,一端在火災地。我們使用抽水機把水源源不斷的輸送到火災地進行滅火,而不需要命令式編程那樣必須一個任務一個任務串列。即:響應式流處理數據時只要數據是可用的就進行處理,而不是需要將數據作為一個整體進行提供。事實上,輸入數據可以是無窮的

通過上述的例子,可以清晰的分辨響應式編程和傳統的命令式編程。

Reactor

Reactor是基於響應式流的第四代響應式庫規範,用於在JVM上構建非阻塞應用程式。Reactor 工程實現了響應式流的規範,它提供由響應式流組成的函數式 API。正如你將在後面看到的,Reactor 是 Spring 5 響應式編程模型的基礎。關於響應式流在這裡簡要介紹下:

響應式流的規範可以通過四個介面定義來概括:Publisher,Subscriber,Subscription 和 Processor。

  • Publisher:數據生產者
  • Subscriber:數據訂閱者
  • Subscription:數據載體
  • Processor:對生產者的數據進行特定處理,並傳給Subscriber。

關於響應式流的具體規範可以看這裡

回頭看Reactor中,存在兩個核心概念:Mono和Flux。

Flux 表示零個、一個或多個(可能是無限個)數據項的管道。

Mono 特定用於已知的數據返回項不多於一個的響應式類型。

使用彈珠圖來描述二者:

Flux:

Mono:


Spring Boot中使用Reactor

添加依賴

<!--Reactor中的核心庫-->
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>
<!--Reactor測試庫-->
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

Reactor使用示例

Flux和Mono的操作方法有很多,我們大致的將他們的所有操作分為四類:

  • 創建操作
  • 聯合操作
  • 傳輸操作
  • 邏輯處理操作

創建操作

使用just()方法並傳入元素來創建Flux:

@Test
public void 創建一個Flux並且輸出() {
  Flux<String> flux = Flux.just("1", "2", "3", "4", "5");
  flux.subscribe(f -> System.out.println("Here's some number: " + f));
}

我們可以傳入數組,集合,Stream類來創建Flux:

@Test
public void 從數組中創建一個集合() {
    String[] strs = {"1", "2", "3"};
    Flux<String> flux = Flux.fromArray(strs);
    StepVerifier.create(flux)
      .expectNext("1")
      .expectNext("2")
      .expectNext("3")
      .verifyComplete();

    List<String> strList = new ArrayList<>();
    strList.add("1");
    strList.add("2");
    strList.add("3");
    Flux.fromIterable(strList);

    Flux.fromStream(strList.stream());
}

指定一個範圍來創建Flux:

@Test
public void 提供範圍生成一個Flux() {
    Flux<Integer> flux = Flux.range(0, 3);
    StepVerifier.create(flux)
      .expectNext(0)
      .expectNext(1)
      .expectNext(2)
      .verifyComplete();

  	//🌞來個附加操作:interval方法設置Flux發送數據的頻率,這裡設置每一秒發送一次。
  	//🌛take方法表示限制條目數量,在這裡我們設定Flux最多發送三條數據。
    Flux<Long> flux1 = Flux.interval(Duration.ofSeconds(1L)).take(3L);
    StepVerifier.create(flux1)
      .expectNext(0L)
      .expectNext(1L)
      .expectNext(2L)
      .verifyComplete();
}

聯合操作

Flux提供了多種聯合操作,來結合多個Flux流進行操作:

merge操作:

@Test
public void merge多個Flux() {
    Flux<Integer> flux = Flux.range(0, 3).delayElements(Duration.ofMillis(500));
    Flux<Integer> flux1 = Flux.range(3,2).delaySubscription(Duration.ofMillis(250))
      .delayElements(Duration.ofMillis(500));
  	//👋使用mergeWith方法來結合兩個Flux流,mergeWith方法不能保證合併後的流中元素的順序
  	//👋所以上面操作我們使用delaySubscription和delayElements來保證元素的順序
  	//delaySubscription:指定時間延遲發送  delayElements:發送元素的時間間隔 
    Flux<Integer> flux2 = flux.mergeWith(flux1);
    flux2.subscribe(f -> System.out.println("Here's some number: " + f));
    StepVerifier.create(flux2)
      .expectNext(0)
      .expectNext(3)
      .expectNext(1)
      .expectNext(4)
      .expectNext(2)
      .verifyComplete();
}

圖解上述程式碼:

zip操作:

@Test
public void 合併多個Flux() {
  Flux<Integer> flux = Flux.range(0, 3).delayElements(Duration.ofMillis(500));
  Flux<Integer> flux1 = Flux.range(3, 2).delaySubscription(Duration.ofMillis(250))
    .delayElements(Duration.ofMillis(500));
	//👋zip操作將合併兩個Flux流,並且生成一個Tuple2對象,Tuple2中包含兩個流中同順序的元素各一個。
  Flux<Tuple2<Integer, Integer>> flux3 = Flux.zip(flux, flux1);
  flux3.take(3).subscribe(f -> System.out.println(f.toString()));
  StepVerifier.create(flux3)
    .expectNextMatches(t -> t.getT1() == 0 && t.getT2() == 3)
    .expectNextMatches(t -> t.getT1() == 1 && t.getT2() == 4)
    .verifyComplete();
}

圖解上述程式碼:

zip配合指定邏輯操作:

@Test
    public void 合併多個Flux() {
        Flux<Integer> flux = Flux.range(0, 3).delayElements(Duration.ofMillis(500));
        Flux<Integer> flux1 = Flux.range(3, 2).delaySubscription(Duration.ofMillis(250))
          .delayElements(Duration.ofMillis(500));
				//👋在zip操作中傳入指定的邏輯操作,返回一個操作結果Flux
        Flux<Integer> flux4 = Flux.zip(flux, flux1, (x, y) -> x + y);
        flux4.take(3).subscribe(f -> System.out.println(f.toString()));
        StepVerifier.create(flux4)
                .expectNext(3)
                .expectNext(5)
                .verifyComplete();
    }

圖解上述程式碼:

first操作:

@Test
    public void 只獲取最先發布的Flux() {
        Flux<Integer> flux = Flux.range(0, 3).delayElements(Duration.ofMillis(500));
        Flux<Integer> flux1 = Flux.range(3, 2).delaySubscription(Duration.ofMillis(250))
          .delayElements(Duration.ofMillis(500));
      	//first操作只會使用最先發布元素的那個流
        Flux<Integer> flux2 = Flux.first(flux, flux1);
        StepVerifier.create(flux2)
                .expectNext(0)
                .expectNext(1)
                .expectNext(2)
                .verifyComplete();
    }

圖解上述操作:

轉換&過濾操作

skip操作

@Test
public void 過濾Flux中的數據() {
  //👋skip操作,跳過指定數量的元素
  Flux<Integer> flux = Flux.range(0, 10).skip(8);
  StepVerifier.create(flux)
    .expectNext(8)
    .expectNext(9)
    .verifyComplete();
}

圖解上述操作:

@Test
public void 過濾Flux中的數據() {
  //👋在skip方法中傳入是個時間段,表示跳過這個時間段內輸出的元素
  //👋搭配delayElements方法,每個100毫秒輸出一次
  //👋所以這個測試只會得到7,8,9
  Flux<Integer> flux1 = Flux.range(0, 10).delayElements(Duration.ofMillis(100))
  	.skip(Duration.ofMillis(800));
  StepVerifier.create(flux1)
    .expectNext(7)
    .expectNext(8)
    .expectNext(9)
    .verifyComplete();
}

圖解上述方法:

take操作

@Test
public void 過濾Flux中的數據() {
  //👋take操作與skip相反,表示獲取指定數量的前幾個元素
  Flux<Integer> flux2 = Flux.range(0, 10).delayElements(Duration.ofMillis(100))
    .take(Duration.ofMillis(350));
  StepVerifier.create(flux2)
    .expectNext(0)
    .expectNext(1)
    .expectNext(2)
    .verifyComplete();
}

圖解上述方法:

@Test
public void take() {
  	//👋take方法支援傳入一個時間段,表示只取這個時間段內發布的元素
  	//👋下面操作中我們規定一秒發布一個元素,取3.5秒內的元素
  	//👋所以最後只能得到前三個元素
    Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon","Zion", "Grand Teton")
        .delayElements(Duration.ofSeconds(1))
        .take(Duration.ofMillis(3500));
    
    StepVerifier.create(nationalParkFlux)
        .expectNext("Yellowstone", "Yosemite", "Grand Canyon")
        .verifyComplete();
}

圖解上述方法:

filter操作

@Test
public void 過濾Flux中的數據() {
  //👋filter方法規定一個條件,只拿取符合條件的元素
  //👋下面操作中,我們只拿取小於2的元素
  Flux<Integer> flux3 = Flux.range(0, 10).filter(n -> n < 2);
  StepVerifier.create(flux3)
    .expectNext(0)
    .expectNext(1)
    .verifyComplete();
}

圖解上述方法:

distinct操作

ja@Test
public void 過濾Flux中的數據() {
  //👋distinct方法用於元素去重
  Flux<Integer> flux4 = Flux.just(1, 2, 3, 3, 4, 5, 5).distinct();
  StepVerifier.create(flux4)
    .expectNext(1)
    .expectNext(2)
    .expectNext(3)
    .expectNext(4)
    .expectNext(5)
    .verifyComplete();
}

圖解上述方法:

map操作

@Test
public void 映射Flux() {
	//👋map方法,將元素轉換成指定的另一種數據
  //👋下面操作中我們傳入一個匿名的轉換類,指定了我們將字元串轉換為數字
  Flux<Integer> flux = Flux.just("1", "2", "3")
  	.map(Integer::valueOf);
  StepVerifier.create(flux)
    .expectNext(1)
    .expectNext(2)
    .expectNext(3)
    .verifyComplete();
}

圖解如上方法:

flatMap操作

flatMap() 將每個對象映射到一個新的 Mono 或 Flux,最後這些新的Mono或者Flux會被壓成(合成)一個新的Flux。

@Test
public void 映射Flux() {
  //👋如下的flatMap方法將傳入的每個元素都轉成一個Mono
  //👋隨後在Mono裡面傳入一個map轉換邏輯(String->Integer)
  //👋使用subscribeOn來做了一個非同步處理
  //👋最終會形成一個新的Flux,包含來轉換後的元素,但是由於非同步,不能保證順序
  Flux<Integer> flux1 = Flux.just("1", "2", "3", "4")
    .flatMap(m -> Mono.just(m).map(c -> Integer.valueOf(c))
             .subscribeOn(Schedulers.parallel()));
  List<Integer> list = Stream.of(1, 2, 3, 4).collect(Collectors.toList());
  StepVerifier.create(flux1)
    .expectNextMatches(list::contains)
    .expectNextMatches(list::contains)
    .expectNextMatches(list::contains)
    .expectNextMatches(list::contains)
    .verifyComplete();
}

圖解上述程式碼:

buffer操作

@Test
public void 緩衝Flux() {
  Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6);
  //👋buffer方法起到一個緩衝的作用
  //👋我們在buffer中指定一個數字,只有buffer被充滿時或者沒有剩餘元素時,才會發布出去
  //👋因為你有了快取,所以發布出去的是一個元素集合
  Flux<List<Integer>> listFlux = flux.buffer(3);
  StepVerifier.create(listFlux)
    .expectNext(Arrays.asList(1, 2, 3))
    .expectNext(Arrays.asList(4, 5, 6))
    .verifyComplete();

  //👋運行下面的程式碼,查看buffer是如何工作的
  Flux.just("apple", "orange", "banana", "kiwi", "strawberry")
    .buffer(3)
    .flatMap(x ->
             Flux.fromIterable(x)
             .map(y -> y.toUpperCase())
             .subscribeOn(Schedulers.parallel())
             .log()
            ).subscribe();
}

圖解上述方法:

collectList操作

@Test
public void 緩衝Flux() {
  Flux<Integer> flux1 = Flux.range(1, 6);
  //👋collectList方法用於將含有多個元素的Flux轉換為含有一個元素列表的Mono
  Mono<List<Integer>> mono2 = flux1.collectList();
  StepVerifier.create(mono2)
    .expectNext(Arrays.asList(1, 2, 3, 4, 5, 6))
    .verifyComplete();
}

圖解上述方法:

collectMap操作

@Test
public void 緩衝Flux() {
  	//👋collectMap方法用於將含有多個元素的Flux轉換為含有一個Map的Mono
  	//👋collectMap方法中傳入的是生成鍵的邏輯
    Flux<Integer> flux2 = Flux.range(1, 6);
    Mono<Map<Object, Integer>> mapMono = flux2.collectMap(f -> String.valueOf(f + "i"));
    StepVerifier.create(mapMono)
      .expectNextMatches(m -> m.size() == 6
                         && m.get("1i").equals(1)
                         && m.get("2i").equals(2)
                         && m.get("3i").equals(3))
      .verifyComplete();
}

圖解上述方法:

邏輯操作

@Test
public void Flux的邏輯操作() {
  	//👋有時你只需要知道 Mono 或 Flux 發布的條目是否符合某些條件。all() 和 any() 操作將執行這樣的邏輯。
    Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6);
  	//👋any方法,只要任何一個元素符合要求,即返回true
    Mono<Boolean> mono = flux.any(f -> f < 0);
    StepVerifier.create(mono)
            .expectNext(false)
            .verifyComplete();
		//👋all方法,所有元素符合要求,即返回true
    Mono<Boolean> mono1 = flux.all(f -> f > 0);
    StepVerifier.create(mono1)
            .expectNext(true)
            .verifyComplete();
}

圖解上述方法:


總結

本文主要介紹了響應式編程的基本概念,並用一個例子來說明響應式編程和命令式編程的差別。介紹了響應式流模型的實現庫Reactor,並且解釋了Reactor中的一些響應式流概念。使用SpringBoot引入Reactor庫來進行Reactor開發,最後演示了Reactor的一些常見操作。

本文示例程式碼地址://gitee.com/jeker8chen/react-demo.git


關注筆者公眾號,推送各類原創/優質技術文章 ⬇️

WechatIMG6