[享學Netflix] 十六、Hystrix斷路器:初體驗及RxJava簡介

  • 2020 年 3 月 18 日
  • 筆記

減少與單點的交互,是存在單點的系統的核心優化方向。常見的方法有批量寫、客戶端快取等等

程式碼下載地址:https://github.com/f641385712/netflix-learning

目錄

前言

Hystrix也是Netflix OSS的一部分,它是一個斷路器,用於保護你的微服務。隨著微服務的流行,熔斷作為其中一項很重要的技術也廣為人知。當微服務的運行品質低於某個臨界值時(靜態閾值的實現方式),啟動熔斷機制,暫停微服務調用一段時間,以保障後端的微服務不會因為持續過負荷而宕機(熔斷、限流)。

在分散式系統中,單個應用通常會有多個不同類型的外部依賴服務,內部通常依賴於各種RPC服務(當然也可能是Http實現),外部則依賴於各種HTTP服務。這些依賴服務不可避免的會出現調用失敗,比如超時、異常等情況,如何在外部依賴出問題的情況,仍然保證自身應用的穩定,就是Hystrix這類服務保障框架的工作了,這便是隔離的概念,當然還有防止雪崩等功能。

簡單說:某一個功能不可用,是永遠優於全部不可用的。這個特點就像每家每戶的保險絲:某家用大功率電器導致停電了,並不會影響到其它住戶。


正文

源碼地址:https://github.com/Netflix/Hystrix hystrix中文釋義:豪豬。它大概長這樣(豪豬身上有很多刺,這些刺可以很好的保護自己,估計開發團隊給取這個名字也是這個意思):

Hystrix是一個延遲和容錯庫,旨在隔離對遠程系統,服務和第三方庫的訪問點,停止級聯故障,並在不可避免發生故障的複雜分散式系統中實現彈性。

Hystrix的目標就是能夠在1個或多個依賴出現問題時,系統依然可以穩定的運行,其手段包括隔離、限流和降級等。順道複習一下高可用常用的7種手段:

  1. 隔離
  2. 限流
    1. 限流:即限制流量的最大值,是流控的一種方式
  3. 降級fallback
  4. 負載均衡
  5. 超時與重試
  6. 回滾
  7. 壓測與預案

如果做一個簡單的限流功能,那是很容易的,但如果想做更精準的控制、處理後的細分和快速恢復,還有大量的工作需要做。很多RPC框架一般都自帶流控、熔斷的能力,但一般都不夠強大,離自動化還有距離,這就是為何這塊要專門拿出來做的原因(因為很很很很重要)。


Netflix Hystrix

<dependency>      <groupId>com.netflix.hystrix</groupId>      <artifactId>hystrix-core</artifactId>      <version>1.5.18</version>  </dependency>

Hystrix(1.5.18版,2018.11發布)已經足夠穩定,可以滿足Netflix現有應用程式的需求。

官方認為,接下來它們的重心是要轉向對應用程式的實時性能做出反應的自適應性實現,而不是預先配置的設置。也就是說限流使用動態、彈性值,而非事先設定好的閾值來實現。針對於此,若你想有這樣的訴求,官方推薦你使用resilience4j來做。

說明:resilience4j是受Hystrix啟發而做的熔斷器,通過管理遠程調用的容錯處理來幫助實現一個健壯的系統。resilience4j提供了更好用的API,並且提供了很多其他功能比如Rate Limiter(限流器),Bulkhead(艙壁隔離)。

現狀

Hystrix不再處於主動開發階段,目前處於維護模式。Netflix Hystrix現在正式處於維護模式,它已經於2018.11發布了最後一個版本1.5.18,後期也不會再介面社區的pull request,簡單的說就是不會再升級了。

當然,若你覺得有能力扛起這面大旗,你可以給它們團隊發郵件:[email protected],讓它重新回到活動狀態~


hystrix-core的依賴項截圖如下:

這裡有值的一說的兩個核心依賴項:

  • Archaius:配置管理庫。這不就是該系列前十幾篇文章講述的重點麽,這裡就用到了,很激動有木有
  • rxjava:響應式編程庫。這個也是Netflix開源出來的一套非同步編程庫,問下會有介紹

HdrHistogram這個依賴,用於分析取樣的數據,方便畫直方圖等等,和它的metric有關,一般不用太關注


有何用?

說了這麼多,你可能還不知道Hystrix有何用,這裡羅列它的作用如下:

  • 延遲和容錯:停止級聯故障。fallabck和優雅的降級,Fail fast和快速恢復。使用circuit breakers通過執行緒 或者 訊號量隔離。
  • 實時操作:實時監控和配置更改,可以讓屬性被修改後能夠立刻生效(很顯然,這種能力由archauis提供支援)。得到提醒,做出決定,影響改變,並在幾秒鐘內看到結果。
  • 並發:並行執行。支援並發的請求快取。自動批處理(通過請求合併)。

斷路器:HystrixCircuitBreaker是整個Hystrix里一個很重要的抽象,後面也會當作重點詳細說明。 Hystrix包含限流、熔斷等功能的庫類,它能給系統提供快速失敗快速恢復的能力,讓其更具「彈性」。

說明:流控、熔斷和快速恢復是現在大型分散式系統中各個服務節點應該具備的基本抗災容錯能力


工作流程

Hystrix相較於其它組件,屬於比較複雜的。官網裡有一張描述其工作流程的圖示,因為過於複雜本人決定不引用(容易懵逼),而引用一大神的自繪圖,個人覺得把核心、關鍵節點均圈出來了,供以參考:

每個請求都會被包裝成一個Command對象來執行,該圖示展示的一個請求執行的關鍵流程。


快速示例

public class CommandHelloWorld extends HystrixCommand<String> {      private final String name;        // 指定一個HystrixCommandGroupKey,這樣熔斷策略會按照此組執行      public CommandHelloWorld(String name) {          super(HystrixCommandGroupKey.Factory.asKey("MyAppGroup"));          this.name = name;      }        @Override      protected String run() {          if(name == null){              throw new NullPointerException();          }          return "Hello " + name + "!";      }        @Override      protected String getFallback() {          // super.getFallback():No fallback available.          return "this is fallback msg";      }  }

測試程式碼:

@Test  public void fun2() {      // 三種執行方式:        // 1、普通方式      String s = new CommandHelloWorld("Bob").execute();      System.out.println(s); // Hello Bob!        String fallbackValue = new CommandHelloWorld(null).execute();      // 說明:若你沒有提供fallback函數,那結果是:      // com.netflix.hystrix.exception.HystrixRuntimeException: CommandHelloWorld failed and no fallback available.      System.out.println(fallbackValue); // "this is fallback msg"        // 2、非同步方式。什麼時候需要時候什麼時候get      // Future<String> s = new CommandHelloWorld("Bob").queue();      // System.out.println(s.get()); // Hello Bob!        // 3、RxJava方式。吞吐量更高,但對程式設計師的要求更高      // Observable<String> s = new CommandHelloWorld("Bob").observe();      // s.subscribe(d -> System.out.println(d)); // Hello Bob!  }

實例中使用三種方式來執行,均是可以的,各位可自行選擇。


RxJava有話說

由於hystrixy-core依賴於RxJava構建,因此需要做個簡單了解

那麼什麼是RxJava呢?

<dependency>      <groupId>io.reactivex</groupId>      <artifactId>rxjava</artifactId>      <version>1.3.8</version>  </dependency>

這是RxJava的1.x版本(1.x現已停更,於2018.5發布發布最後一版1.3.8

<dependency>      <groupId>io.reactivex.rxjava2</groupId>      <artifactId>rxjava</artifactId>      <version>2.2.18</version>  </dependency>

說明:1.x和2.x的GAV、包名均不同,所以可以和平共處

現行流行版本為2.x分支,若你想單獨使用,推薦使用2.x。但是,但是,但是1.x還存在較大的存量市場,Netflix套件依賴均為1.x,所以依舊存在學習的價值,不容忽視。

當年的Netflix也是為了增加伺服器的性能和吞吐量來編寫RxJava並開源,簡單的說它是一個對響應式編程提供支援的庫,在Android中使用得極多,但實際在Java Server端使用得很少。

RxJava的實質是一個非同步操作庫,用於簡化非同步開發。本文學習的Hystrix雖有涉及到,但並不會深究。


核心概念

注意:以下講解、示例均基於1.x版本

它的核心思想和Java的觀察者模式非常像:被觀察者和觀察者通過訂閱產生一種關係,當被觀察者發生一些改變,通知觀察者,觀察者對應做出相應的回應。

主要有三個關鍵詞需要記住:被觀察者(Observable),訂閱(subscribe),觀察者(Observer)。

  • Observable(被觀察者,也就是數據發射器):public class Observable<T>代表一個被觀察對象
  • Observer(觀察者,也就是數據接收器) :public interface Observer<T>實現此介面便是一個觀察者,有onCompleted/onError/onNext的監聽方法
  • subscribe(訂閱,也就是把發射器和接收器關聯起來):Observable#subscribe(action),訂閱此被觀察者。

執行緒調控Scheduler

RxJava很優勢的一個方面就是他的執行緒切換,基本是依靠ObserveOnSubscribeOn這兩個操作符來完成的。

  • subscribeOn:指定上游事件發射器所用的執行緒,若多次設定,則只有一次起作用
  • observeOn:指定下游操作所使用的執行緒,若多次指定則每次均起作用

Scheduler種類

  • Schedulers.io():用於IO密集型的操作,例如讀取SD卡文件、查詢資料庫、訪問網路等,具有執行緒快取機制
  • Schedulers.newThread():在每執行一次任務時創建一個新的執行緒,不具有執行緒快取機制,效率比Scheduler.io()低,並發性很高,一般不建議使用
  • Schedulers.computation():用於CPU密集型計算任務,即不會被I/O等操作限制性能的耗時操作,例如xml,json文件解析,Bitmap圖片壓縮取樣等。具有固定的執行緒池,大小為CPU的核數。不可以用於I/O操作,因為I/O操作的等待時間浪費CPU。
  • Schedulers.trampoline()在當前執行緒立即執行任務,如果當前執行緒有任務在這執行,則將其停止,等插入進來的任務執行完成之後,在將未執行完成的任務接著執行。
  • Schedulers.immediate():注意此類型在2.x版本已經被廢棄,效果同2.x中的Schedulers.trampoline()
  • Schedulers.from(@NonNull Executor executor):用戶自己指定一個執行緒調度器,由此調度器來控制任務的執行策略
  • Schedulers.test():用於你debug的時候使用

操作符

RxJava操作符:其實質是函數式編程中的高階函數(幫你定義好一些處理邏輯,無需自行實現),方便你操作數據流。

  • 創建:
    • create:使用OnSubscribe從頭創建一個Observable實例:Observable.create(new Observable.OnSubscribe<String>()
    • from:將一個Iterable, 一個Future, 或者一個數組,內部通過代理的方式轉換成一個實例:Observable.from(list)
    • just:將一個或多個對象變為一個實例:Observable.just(1, 2, 3, 4, 5, 6)
    • empty:創建一個什麼都不做直接通知完成的實例
    • error:創建一個什麼都不做直接通知錯誤的實例
    • never:創建一個什麼都不做的實例
    • timer:創建一個在給定的延時之後發射數據項為0的實例Observable<Long>
    • interval:按照給定的時間間隔發射從0開始的整數序列的Observable<Long>
    • range:
    • defer:
  • 過濾:
    • filter:過濾數據。
    • ofType:過濾指定類型的數據,與filter類似
    • take:只發射開始的N項數據或者一定時間內的數據
    • takeLast:
    • takeFirst:
    • firstOrDefault:
    • last/lastOrDefault:
    • skip:
    • skipLast:
    • elementAt/elementAtOrDefault:
    • ignoreElements:
    • distinct:
    • timeout:
    • distinctUntilChanged:
    • throttleFirst:
    • throttleWithTimeout/debounce:
  • 組合/合併:
    • concat:按順序連接多個Observables:Observable.concat(a,b)/a.concatWith(b)
    • startWith:在數據序列的開頭增加一項數據
    • merge:將多個Observable合併為一個。不同於concat,merge不是按照添加順序連接,而是按照時間線來連接。
    • zip:使用一個函數組合多個Observable發射的數據集合,然後再發射這個結果
    • combineLatest
  • 變換:map/flatMap/cast/flatMapIterable/groupBy
  • 聚合:reduce/collect/count/countLong
  • 轉換:toList/toSortedList/toMap/toMultiMap
  • 錯誤處理/重試機制:onErrorResumeNext/onExceptionResumeNext/onErrorReturn/retry/retryWhen…

操作符實在太多了,但是最為常用的不會太多,掌握即可。這裡就不用給使用示例了,因為對於已經能夠很熟練使用Java Stream API的你,這都是小意思~


背壓Backpressure

被壓策略有很多種,比如:

  • ERROR:生產比消費快,那就拋錯
  • DROP:生產比消費快,丟棄新生產的數據

使用示例

@Test  public void fun1() {      // 自定義一個執行緒池:用於處理消費者任務      ExecutorService myDiyThreadExe = Executors.newFixedThreadPool(1, r -> {          Thread thread = new Thread(r);          thread.setName("myDiyThread");          return thread;      });        // Observable.just(1, 2, 3, 4, 5, 6)      Observable.from(new Integer[]{1, 2, 3, 4, 5, 6})              .subscribeOn(Schedulers.io()) //(發送事件的執行緒所在地,只能生效一次)              .observeOn(Schedulers.immediate()) // 設置下面的Map操作,在當前執行緒立馬執行(可生效多次)              .map(i -> {                  System.out.println("map操作執行執行緒:" + Thread.currentThread().getName());                  return i * i;              })              .observeOn(Schedulers.newThread()) // 因為這是新執行緒,所以控制台的日誌列印換亂串~~~              .filter(i -> {                  System.out.println("filter操作執行執行緒:" + Thread.currentThread().getName());                  return i > 10;              })                .observeOn(Schedulers.from(myDiyThreadExe)) // 任務在自定義的執行緒池裡執行              // 處理事件:訂閱:使用Action處理              .subscribe(i -> System.out.printf("subscribe訂閱處理執行緒 %s,值為%s n", Thread.currentThread().getName(), i));        // hold主執行緒      while (true) {      }    }

運行程式,控制台輸出:

map操作執行執行緒:RxIoScheduler-2  map操作執行執行緒:RxIoScheduler-2  filter操作執行執行緒:RxNewThreadScheduler-1  filter操作執行執行緒:RxNewThreadScheduler-1  map操作執行執行緒:RxIoScheduler-2  map操作執行執行緒:RxIoScheduler-2  filter操作執行執行緒:RxNewThreadScheduler-1  map操作執行執行緒:RxIoScheduler-2  filter操作執行執行緒:RxNewThreadScheduler-1  map操作執行執行緒:RxIoScheduler-2  filter操作執行執行緒:RxNewThreadScheduler-1  filter操作執行執行緒:RxNewThreadScheduler-1  subscribe訂閱處理執行緒 myDiyThread,值為16  subscribe訂閱處理執行緒 myDiyThread,值為25  subscribe訂閱處理執行緒 myDiyThread,值為36 

說明:因為filter操作使用的是新執行緒RxNewThreadScheduler,所以它的日誌列印會亂串哦。

當你自己寫main/單測測試非同步程式的時候,請務必hold住主執行緒,否則你將看不到效果,這是初學者常犯的一個小錯誤,此處提醒你一下。


關於RxJava的介紹就先到這,這是一個極簡介紹而已,這裡我貼出幾篇文章,有興趣者可前往閱讀:

非同步、響應式編程從來都不是件容易的事,實操起來更是利弊共存,請大家在實際生產中酌情選型。


總結

關於Netflix Hystrix斷路器:初體驗及RxJava簡介就先介紹到這,通過本文能了解到如下兩部分知識:

  1. Hystrix是什麼,有何用,怎麼用?
  2. RxJava是什麼,有何用,怎麼用?

當然,怎麼用是個較大的話題,關於RxJava怎麼用就先止步於此,有興趣的小朋友自行研究。接下來的文章將重點分析Hystrix的使用方式,以及深度掌握其工作原理,這也能為使用、理解resilience4j奠定一個基礎。