體驗RxJava

  • 2019 年 10 月 3 日
  • 筆記

RxJava是 ReactiveX在 Java上的開源的實現,簡單概括,它就是一個實現非同步操作的庫,使用時最直觀的感受就是在使用一個觀察者模式的框架來完成我們的業務需求;
其實java已經有了現成的觀察者模式實現:java.util.Observable和java.util.Observer,那麼為何還要RxJava呢?

java.util.Observable是典型的觀察者模式實現,而RxJava主要功能如下:

  1. 生產者加工數據,然後發布給觀察者;
  2. 觀察者處理數據;
  3. 從生產者生產數據到觀察者處理數據,這之間傳遞的數據可以被處理;
  4. 執行緒切換,生產者發布數據和觀察者處理數據可以在指定執行緒中處理;

RxJava還有個特點就是支援鏈式編碼,再配合lambda,可以保持簡潔和清晰的邏輯(注意是邏輯簡潔,程式碼是否簡潔只能取決於實際業務);

看得出,除了實現觀察者模式,RxJava還提供了更豐富的能力,純文字太枯燥了,我們來編碼實戰吧!

源碼下載

如果您不打算寫程式碼,也可以從GitHub上下載本次實戰的源碼,地址和鏈接資訊如下表所示:

名稱 鏈接 備註
項目主頁 https://github.com/zq2599/blog_demos 該項目在GitHub上的主頁
git倉庫地址(https) https://github.com/zq2599/blog_demos.git 該項目源碼的倉庫地址,https協議
git倉庫地址(ssh) [email protected]:zq2599/blog_demos.git 該項目源碼的倉庫地址,ssh協議

這個git項目中有多個文件夾,本章的應用在rxdemo文件夾下,如下圖紅框所示:

這裡寫圖片描述

源碼僅用來參考,建議自己把程式碼寫出來,才能印象深刻;

準備工作之一:日誌

本次詩函通過列印日誌來觀察程式碼執行情況,會列印時間和執行執行緒,這裡用的是slf4j+log4j的方式;

工程創建完畢後,結構如下:

這裡寫圖片描述

  • log4j.propertieds文件的位置請注意,需要放在上圖紅框位置;
  • 為了在日誌中列印當前執行緒,log4j的配置如上圖綠框所示, %t表示當前執行緒, %r表示程式已經執行的時間;
  • 在pom文件中,對日誌的依賴為:
<dependency>        <groupId>org.slf4j</groupId>        <artifactId>slf4j-log4j12</artifactId>        <version>1.8.0-alpha2</version>      </dependency>

準備工作之二:單元測試

驗證程式碼是通過單元測試實現的,pom.xml文件中,對單元測試的依賴為:

<dependency>        <groupId>junit</groupId>        <artifactId>junit</artifactId>        <version>3.8.1</version>        <scope>test</scope>      </dependency>

單元測試程式碼在如下圖紅框位置:

這裡寫圖片描述

準備工作之三:支援lambda

支援lambda表達式表現在maven支援和intellij idea工具支援兩個方面,具體設置請參照《設置Intellij idea和maven,支援lambda表達式》

準備工作結束,可以正式開發了

RxJava的依賴庫

依賴庫選用1.0.10版本,如下:

<dependency>        <groupId>io.reactivex</groupId>        <artifactId>rxjava</artifactId>        <version>1.0.10</version>      </dependency>

最簡單的觀察者模式實現

第一個例子,我們實踐最簡單的用法:

  1. 創建App.java類,聲明日誌服務:
public class App  {      private static final Logger logger = LoggerFactory.getLogger(App.class);
  1. 開發doExecute方法實現基於Rxjava的觀察者模式:
public void doExecute(){          logger.debug("start doExecute");            //聲明一個觀察者,用來響應被觀察者發布的事件          Observer<String> observer = new Observer<String>() {              /**               * 被觀察者發布結束事件的時候,該方法會被調用               */              public void onCompleted() {                  logger.debug("start onCompleted");              }                /**               * 被觀察者發布事件期間,和觀察者處理事件期間,發生異常的時候,該方法都會被調用               */              public void onError(Throwable throwable) {                  logger.debug("start onError : " + throwable);              }                /**               * 被觀察者發布事件後,該方法會被調用               * @param s               */              public void onNext(String s) {                  logger.debug("start onNext [" + s + "]");              }          };            Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {              public void call(Subscriber<? super String> subscriber) {                  //向觀察者發布事件                  subscriber.onNext("Hello");                  //再次向觀察者發布事件                  subscriber.onNext("world");                  //通知觀察者,訂閱結束                  subscriber.onCompleted();              }          });            logger.debug("try subscribe");            //執行訂閱          observable.subscribe(observer);            logger.debug("finish doExecute");      }

程式碼的邏輯很簡單,定義觀察者(observer),被觀察者(observable),執行訂閱;

  1. 本次測試用junit來執行,在test目錄下創建一個AppTest類,具體的目錄和內容如下圖:

這裡寫圖片描述

打開控制台,在pom.xml文件所在目錄下執行mvn test,即可看到日誌:

2017-06-10 10:02:02  [ main:0 ] - [ DEBUG ]  start doExecute  2017-06-10 10:02:02  [ main:19 ] - [ DEBUG ]  try subscribe  2017-06-10 10:02:02  [ main:22 ] - [ DEBUG ]  start onNext [Hello]  2017-06-10 10:02:02  [ main:22 ] - [ DEBUG ]  start onNext [world]  2017-06-10 10:02:02  [ main:22 ] - [ DEBUG ]  start onCompleted  2017-06-10 10:02:02  [ main:23 ] - [ DEBUG ]  finish doExecute

執行的程式碼是observable.subscribe,此程式碼執行後,觀察者的onNext和onCompleted被回調;

簡化的觀察者

在上面的doExecute方法中,我們創建的被觀察者實現了onNext,onError,onCompleted這三個方法,有的場景下我們只關注onNext,對onError和onCompleted都不關心,此時我們可以使用Action1對象來替代Observer,程式碼如下:

public void doAction(){          logger.debug("start doAction");            Action1<String> onNextAction = new Action1<String>() {              public void call(String s) {                  logger.debug("start Action1 onNextAction [" + s + "]");              }          };            Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {              public void call(Subscriber<? super String> subscriber) {                  subscriber.onNext("Hello");                  subscriber.onNext("world");                  subscriber.onCompleted();              }          });            logger.debug("try subscribe");            observable.subscribe(onNextAction);            logger.debug("finish doAction");      }

可以看到,只要一個Action1對象即可;

另外,對於錯誤回調也可以用Action1來實現,事件完成的回調用Action0,Action0的特點是方法沒有返回,對於的這些Action,observable.subscribe方法提供了各種重載,我們可以按照自己需要來決定使用哪種,傳入哪些Action;

簡化的被觀察者

在上面的doExecute方法中,被觀察者發布了兩個事件:onNext("Hello")和onNext("world"),我們創建被觀察者是通過Observable.create,然後在call方法中寫入onNext("Hello"),onNext("world")最後在寫上subscriber.onCompleted(),對於這種發布確定的對象事件的場景,rxjava已經做了簡化,直接上程式碼:

public void doFromChain(){          logger.debug("start doFromChain");              //聲明一個觀察者,用來響應被觀察者發布的事件          Action1<String> observer = new Action1<String>() {              /**               * 被觀察者發布事件後,該方法會被調用               * @param s               */              public void call(String s) {                  logger.debug("start onNext [" + s + "]");              }          };              String[] array = {"Hello", "world"};            //from方法可以直接創建被觀察者,並且發布array中的元素對應的事件          Observable.from(array).subscribe(observer);            logger.debug("finish doFromChain");      }

如上程式碼,之前我們創建被觀察者,並且在call方法中依次執行onNext的操作,這些事情都被Observable.from(array)簡化了;

進一步簡化的被觀察者

Observable.from接受的是一個數組,而Observable.just可以直接接受多個元素,我們連創建數組的步驟都省略掉了,再把Action1簡化為lambda,可以得到更加簡化的程式碼:

public void doJustChain(){          logger.debug("start doJustChain");            Observable.just("Hello", "world")                  .subscribe(s -> logger.debug("start onNext [" + s + "]"));            logger.debug("finish doJustChain");      }

經歷了以上的實戰,我們對Rxjava的基本能力有了了解,下面了解一些更複雜的用法;

基本變換

試想,如果被觀察者發布的事件是int型,但是觀察者是處理String型事件的,那麼此觀察者如何才能處理被觀察者發布的事件呢,除了修改觀察者或者被觀察者的程式碼,我們還可以使用Rxjava的變換方法-map:

public void doMap(){          logger.debug("start doMap");            Observable.just(1001, 1002)          .map(intValue -> "int[" + intValue + "]")          .subscribe(s -> logger.debug("Action1 call invoked [" + s + "]"));              logger.debug("finish doMap");      }

程式碼中可以看到,map方法接受的是Func1介面的實現,由於此介面只聲明了一個方法,所以這裡被簡化成了lambda表達式,lambda表達式的入參由just的入參類型推斷而來,是int型,返回的是字元串,後面的程式碼就可以直接用String型的消費者來處理事件了;

更自由的變換

map方法提供了一對一的映射,但是實際場景中未必是一對一的,例如一個int數字要發起兩個String事件,map就不合適了,RxJava還有個flatMap方法,可以提供這種能力,此處沒用lambda來簡化,可以看的更清楚:

public void doFlatMap(){          logger.debug("start doFlatMap");            Observable.just(101, 102, 103)                  .flatMap(new Func1<Integer, Observable<String>>() {                      public Observable<String> call(final Integer integer) {                          return Observable.create(new Observable.OnSubscribe<String>() {                              public void call(Subscriber<? super String> subscriber) {                                  subscriber.onNext("after flatMap (" + integer + ")");                                  subscriber.onNext("after flatMap (" + (integer+1000) + ")");                              }                          });                      }                  })                  .subscribe(s -> logger.debug("Action1 call invoked [" + s + "]"));            logger.debug("finish doFlatMap");      }

可以看到,被觀察者發布了三個int事件:101, 102, 103,在flatMap中訂閱了這三個事件,每個事件都可以新建一個被觀察者,這個被觀察者拿到了101,102,103,然後可以按實際需求,選擇發布一個或者多個String事件,甚至不發布,這裡發布的事件,都會被觀察者收到;

執行緒調度

Rxjava可以指定被觀察者發布事件的執行緒,也可以制定觀察者處理事件的執行緒:

public void doSchedule(){          logger.debug("start doSchedule");            Observable.create(subscriber -> {              logger.debug("enter subscribe");              subscriber.onNext("Hello");              subscriber.onCompleted();          })          .subscribeOn(Schedulers.io())          .observeOn(Schedulers.newThread())          .flatMap(str -> {              logger.debug("enter flatMap");              return Observable.create(                      subscriber -> subscriber.onNext("after flatMap (" + str + ")")              );              }          )          .observeOn(Schedulers.newThread())          .subscribe(s -> logger.debug("Observer's onNext invoked [" + s + "]"));          logger.debug("finish doSchedule");      }

subscribeOn()方法指定了被觀察者發布事件的時候使用io類型的執行緒處理,參數Schedulers.io()表示指定的執行緒來自內部實現的一個無數量上限的執行緒池,可以重用空閑的執行緒,適合處理io相關的業務,特點是等待時間長,cup佔用低;

observeOn()方法表示觀察者處理事件的時候使用新執行緒處理,Schedulers.newThread()表示總是啟用新執行緒,並在新執行緒執行操作;
上面程式碼用了兩次observeOn,分別用來指定flatMap中處理事件以及觀察者中處理事件的執行緒;

執行程式碼的結果:

2017-06-10 12:15:42  [ main:0 ] - [ DEBUG ]  start doSchedule  2017-06-10 12:15:42  [ RxCachedThreadScheduler-1:156 ] - [ DEBUG ]  enter subscribe  2017-06-10 12:15:42  [ main:156 ] - [ DEBUG ]  finish doSchedule  2017-06-10 12:15:42  [ RxNewThreadScheduler-2:157 ] - [ DEBUG ]  enter flatMap  2017-06-10 12:15:42  [ RxNewThreadScheduler-1:164 ] - [ DEBUG ]  Observer's onNext invoked [after flatMap (Hello)]

RxCachedThreadScheduler-1:156表示來自執行緒池的快取執行緒;
RxNewThreadScheduler-2:157和RxNewThreadScheduler-1:164表示新的執行緒;

常用的參數類型還有:
Schedulers.immediate(): 直接在當前執行緒運行,相當於不指定執行緒;
Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的執行緒池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。

以上就是Rxjava基礎入門的實戰,希望大家一起實踐並用到日常工作中,簡化邏輯,提升效率;

歡迎關注我的公眾號

在這裡插入圖片描述