Spring Cloud Hystrix原理篇(十一)
一、Hystrix處理流程
Hystrix流程圖如下:
Hystrix整個工作流如下:
- 構造一個 HystrixCommand或HystrixObservableCommand對象,用於封裝請求,並在構造方法配置請求被執行需要的參數;
- 執行命令,Hystrix提供了4種執行命令的方法,後面詳述;
- 判斷是否使用快取響應請求,若啟用了快取,且快取可用,直接使用快取響應請求。Hystrix支援請求快取,但需要用戶自定義啟動;
- 判斷熔斷器是否打開,如果打開,跳到第8步;
- 判斷執行緒池/隊列/訊號量是否已滿,已滿則跳到第8步;
- 執行HystrixObservableCommand.construct()或HystrixCommand.run(),如果執行失敗或者超時,跳到第8步;否則,跳到第9步;
- 統計熔斷器監控指標;
- 走Fallback備用邏輯
- 返回請求響應
從流程圖上可知道,第5步執行緒池/隊列/訊號量已滿時,還會執行第7步邏輯,更新熔斷器統計資訊,而第6步無論成功與否,都會更新熔斷器統計資訊。
二、Hystrix的核心原理
hystrix在服務降級熔斷的過程中有幾個步驟他是必須要去完成的
- 可配置化的降級策略(根據不同的服務降級類型配置不同的降級策略方案):
- 三種方式:訊號量/執行緒 、超時(默認1s)、熔斷(錯誤率)
- 在HystrixCommandProperty類中通過相關屬性去配置改變他的默認策略(上篇中有說明過)
- 可以識別的降級邊界:
- @HystrixCommand(Spring AOP通過註解標註一個介面的資源,去表示說明這個介面需要通過Hystrix來接管這個請求,如果達到註解內的配置要求就熔斷)
- 自己去繼承HystrixCommand 抽象類,等下演示下,這玩意還挺好玩的
- 數據採集:
- 如何觸發熔斷(上篇幅也說過10s 內20個請求 ,錯誤率達到50),這裡引出的問題是如何採集數據,如何統計數據.
- SEMAPHORE,最大並發數量 (它底層其實就是個AQS 統計次數tryAcquire(), acquire())
- 行為干預: 觸發降級/熔斷之後,對正常業務產生影響
- 結果乾預: 通過fallback()返回數據
- 自動恢復(處於熔斷狀態下,會每隔5s嘗試去恢復)
2.1、通過HystrixCommand 接管我們定義的請求
上一篇幅我是通過註解的方式來進行服務熔錯的,這次不通過註解換一種方式,首先在spring-cloud-user服務中寫以下內容
然後啟動服務訪問瀏覽器,結果如果我想的一樣
2.2、Hystrix是如何工作的
下面演示個帶超時降級的Hystrix註解
然後用AOP寫自己的攔截規則
/** *這裡面用到的是AOP的知識點,如果不了解可以先自行補下,後面我有空把Spring的AOP原理也寫下,這樣回頭看這個就沒這麼難了 */ @Component @Aspect //切入 public class GhyHystrixAspect { //通過執行緒池去請求 ExecutorService executorService= Executors.newFixedThreadPool(10); //定義切點針對GhyHystrix進行切入 @Pointcut(value = "@annotation(GhyHystrix)") public void pointCut(){} //切入後執行的方法 @Around(value = "pointCut()&&@annotation(hystrixCommand)") public Object doPointCut(ProceedingJoinPoint joinPoint, GhyHystrix hystrixCommand) throws InterruptedException, ExecutionException, TimeoutException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { //定義超時降級 int timeout=hystrixCommand.timeout(); //前置的判斷邏輯 Future future=executorService.submit(()->{ try { return joinPoint.proceed(); //執行目標方法 } catch (Throwable throwable) { throwable.printStackTrace(); } return null; }); Object result; try { //得到開始和結束時間判斷是否超時,如果超時就降級 result=future.get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); //超時了就取消請求 future.cancel(true); // 先判斷是否為空如果空就把異常拋出去 if(StringUtils.isBlank(hystrixCommand.fallback())){ throw e; } //調用fallback result=invokeFallback(joinPoint,hystrixCommand.fallback()); } return result; } //反射調用 private Object invokeFallback(ProceedingJoinPoint joinPoint,String fallback) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { MethodSignature signature=(MethodSignature)joinPoint.getSignature(); //拿到方法的資訊 Method method=signature.getMethod(); //得到參數類型 Class<?>[] parameterTypes=method.getParameterTypes(); //以上是獲取被代理的方法的參數和Method //得到fallback方法 try { Method fallbackMethod=joinPoint.getTarget().getClass().getMethod(fallback,parameterTypes); fallbackMethod.setAccessible(true); //完成反射調用 return fallbackMethod.invoke(joinPoint.getTarget(),joinPoint.getArgs()); } catch (Exception e) { e.printStackTrace(); throw e; } } }
然後再寫個調用邏輯,用自己定義的註解
瀏覽器訪問,返回的不是我們剛剛定義的降級內容,其實這也挺好想的,我用的是之前的項目,之前在spring-cloud-api工程中定義了熔斷規則,改一下就好
將這此內容改下就好,還有配置文件隱藏下,這裡就不搞了
三、Hystrix的熔斷的原理以及請求代理的原理
當請求過來時,如果請求失敗,先判斷請求次數是否達到了最小請求次數,再判斷錯誤率是否達到了閾值,如果沒達到就繼續請求,這個錯誤率的統計時間默認是10S;如果達到了閾值就要打開斷路器,打開斷 路器後有5秒的時間是熔斷狀態,5秒後,如果有請求過來,就會試著把請求發送到遠程服務,如果成功,斷路器就關閉;如果失敗斷路器繼續開啟;這個流程就引出第一個概念,那就是滑動窗口
3.1、滑動窗口
在 hystrix 里,大量使用了 RxJava 這個響應式函數編程框架,滑動窗口的實現也是使用了 RxJava 框架。它其實就是一個 流量控制技術;竟然提到了滑動窗口,那就必須要提兩上東西,一個是計數器,另一個就是滑動窗口;為了更通俗的理解計數器和滑動窗口關係,就以一個例子說明;假如有一個場景:要做一個請求限制,限制要求一分鐘內最多只能有60個請求通過,這時最通用的做方就是用個計數器,計數一分鐘內請求的次數,在這一分鐘內每來一個請求計數器就加1;一分鐘過後進入下一個一分鐘時計數器就把計數歸零重新計數;所以說如果要限流判斷就只用判斷這一分鐘內的計數量就可以了,但這種做法在每個1分鐘的臨界值時是有問題的,問題是啥呢,假如說在0到58S時都沒有請求,但是突然在第59S時一下子來了60個請求,在60S時再來60個請求,這個時候發生的情況是在相鄰兩秒內一下子來了120個請求,此時因為59S在第一個時間段;60S在第二個時間段,所以沒有滿足觸發熔斷條件,這就導至了相鄰兩秒間的請求量過了閾值,系統很可能炸了,為此引出了另一個玩意,那就是滑動窗口;滑動窗口把一分鐘分成6個窗口,每個窗口是10S,紅色框代表可以滑動的滑動窗口,黑色的窗口代表10S的統計數值,第一個10S統計 完成後紅色滑動窗口會向前滑動一格,改成滑動窗口後他統計的就是紅色滑動窗口內的訪問量總和了
hystrix是通過滑動窗口統計的,他一共有10個窗口,每個窗口代表1S,所以他統計的是他10S內的數據
上圖的每個小矩形代表一個桶,可以看到,每個桶都記錄著1秒內的四個指標數據:成功量、失敗量、超時量和拒絕量,這裡的拒絕量指的就是上面流程圖中【訊號量/執行緒池資源檢查】中被拒絕的流量。10個桶合起來是一個完整的滑動窗口,所以計算一個滑動窗口的總數據需要將10個桶的數據加起來。
四、Hystrix熔斷的源碼分析
Hystrix熔斷的@HystrixCommand註解,是通過HystrixCommandAspect這個切面來處理的。其中關注@Around註解聲明的方法,它針對於請求合併,以及降級的註解進行代理。這裡重點針對HystrixCommand這個註解進行詳細分析。
- getMethodFromTarget 獲取目標方法資訊
- MetaHolder metaHolder = metaHolderFactory.create(joinPoint); 獲取元數據,比如調用方法,HystrixProperty註解數據、方法參數等
- HystrixCommandFactory.getInstance().create 獲取調用者,它持有一個命令對象,並且可以在合適的時候通過這個命令對象完成具體的業務邏輯
- execute,執行命令
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()") public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable { Method method = getMethodFromTarget(joinPoint); //省略程式碼... MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method)); MetaHolder metaHolder = metaHolderFactory.create(joinPoint); //如果是非同步,則創建GenericObservableCommand, 否則,則創建GenericCommand HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType(); Object result; try { if (!metaHolder.isObservable()) { //是否是響應式的(由於我們這些都是同步的會走 這個邏輯)
//默認是走這裡面,用命令執行器去執行 result = CommandExecutor.execute(invokable, executionType, metaHolder); } else { result = executeObservable(invokable, executionType, metaHolder); } } catch (HystrixBadRequestException e) { throw e.getCause(); } catch (HystrixRuntimeException e) { throw hystrixRuntimeExceptionToThrowable(metaHolder, e); } return result; }
點擊進入 CommandExecutor類的execute方法,這個方法主要用來執行命令,從程式碼中可以看出這裡有三個執行類型,分別是同步、非同步、以及響應式。其中,響應式又分為Cold Observable(observable.toObservable()) 和 HotObservable(observable.observe())
默認的executionType=SYNCHRONOUS ,同步請求。
- execute():同步執行,返回一個單一的對象結果,發生錯誤時拋出異常。
- queue():非同步執行,返回一個 Future 對象,包含著執行結束後返回的單一結果。
- observe():這個方法返回一個 Observable 對象,它代表操作的多個結果,但是已經被訂閱者消費掉了。
- toObservable():這個方法返回一個 Observable 對象,它代表操作的多個結果,需要咱們自己手動訂閱並消費掉。
需要注意的是,Hystrix用到了RxJava這個框架,它是一個響應式編程框架,在Android裡面用得比較多
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException { Validate.notNull(invokable); Validate.notNull(metaHolder); switch (executionType) {
case SYNCHRONOUS: { return castToExecutable(invokable, executionType).execute(); } case ASYNCHRONOUS: { HystrixExecutable executable = castToExecutable(invokable, executionType); if (metaHolder.hasFallbackMethodCommand() && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) { return new FutureDecorator(executable.queue()); } return executable.queue(); } case OBSERVABLE: { HystrixObservable observable = castToObservable(invokable); return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable(); } default: throw new RuntimeException("unsupported execution type: " + executionType); } }
因為是走默認的,所以進入HystrixCommand類的execute()方法;這個方法中,首先調用queue(),這個方法會返回一個future對象。
public R execute() { try { return queue().get(); } catch (Exception e) { throw Exceptions.sneakyThrow(decomposeException(e)); } }
queue這個方法中,返回了一個Future對象,這個future對象的實現是f,f是以匿名內部類,它是Java.util.concurrent中定一個的一個非同步帶返回值對象。當調用queue().get()方法時,最終是委派給了delegate.get 方法。
public Future<R> queue() { /* * The Future returned by Observable.toBlocking().toFuture() does not implement the * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true; * thus, to comply with the contract of Future, we must wrap around it. */ final Future<R> delegate = toObservable().toBlocking().toFuture(); final Future<R> f = new Future<R>() { @Override public boolean cancel(boolean mayInterruptIfRunning) { if (delegate.isCancelled()) { return false; } if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCa ncel().get()) { /* * The only valid transition here is false -> true. If there are two futures, say f1 and f2, created by this command * (which is super-weird, but has never been prohibited), and calls to f1.cancel(true) and to f2.cancel(false) are * issued by different threads, it's unclear about what value would be used by the time mayInterruptOnCancel is checked. * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption, * than that interruption request cannot be taken back. */ interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning); } final boolean res = delegate.cancel(interruptOnFutureCancel.get()); if (!isExecutionComplete() && interruptOnFutureCancel.get()) { final Thread t = executionThread.get(); if (t != null && !t.equals(Thread.currentThread())) { t.interrupt(); } } return res; } @Override public boolean isCancelled() { return delegate.isCancelled(); } @Override public boolean isDone() { return delegate.isDone(); }
//最終會調用此方法 @Override public R get() throws InterruptedException, ExecutionException { return delegate.get(); } @Override public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return delegate.get(timeout, unit); } }; /* special handling of error states that throw immediately */ if (f.isDone()) { try { f.get(); return f; } catch (Exception e) { Throwable t = decomposeException(e); if (t instanceof HystrixBadRequestException) { return f; } else if (t instanceof HystrixRuntimeException) { HystrixRuntimeException hre = (HystrixRuntimeException) t; switch (hre.getFailureType()) { case COMMAND_EXCEPTION: case TIMEOUT: // we don't throw these types from queue() only from queue().get() as they are execution errors return f; default: // these are errors we throw from queue() as they as rejection type errors throw hre; } } else { throw Exceptions.sneakyThrow(t); } } } return f; }
因為最終是委派給了delegate.get 方法執行,而delegate在開頭final Future<R> delegate = toObservable().toBlocking().toFuture();中,所以進入toObservable()方法中,在RxJava中,分為幾種角色
- Observable(被觀察者),它的主要作用是產生事件
- Observer(觀察者),它的作用是接收事件並作出相應
- Subscribe(訂閱),它用來連接被觀察者和觀察者
- Event(事件),被觀察者、觀察者、溝通的載體
在queue中,調用toObservable()方法創建一個被觀察者。通過Observable定義一個被觀察者,這個被觀察者會被toObservable().toBlocking().toFuture() ,實際上就是返回可獲得 run() 抽象方法執行結果的Future 。 run() 方法由子類實現,執行正常的業務邏輯。在下面這段程式碼中,當存在subscriber時,便會調用Func0#call() 方法,而這個subscriber是在 toBlocking() 中被訂閱的。
- 調用 isRequestCachingEnabled(); 判斷請求結果快取功能是否開啟,如果開啟並且命中了快取,則會以Observable形式返回一個快取結果
- 創建執行命令的Observable: hystrixObservable
- 當快取處於開啟狀態並且沒有命中快取時,則創建一個「訂閱了執行命令的Observable」:HystrixCommandResponseFromCache
- 創建存儲到快取的Observable: HystrixCachedObservable
- 將toCache添加到快取中,返回獲取快取的Observable:fromCache
- 如果添加失敗: fromCache!=null, 則調用 toCache.unsubscribe() 方法,取消HystrixCachedObservable 的訂閱
- 如果添加成功,則調用 toCache.toObservable(); 獲得快取Observable
- 當快取特性沒有開啟時,則返回執行命令的Observable。
public Observable<R> toObservable() { final AbstractCommand<R> _cmd = this; //doOnCompleted handler already did all of the SUCCESS work //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work final Action0 terminateCommandCleanup = new Action0() { @Override public void call() { if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) { handleCommandEnd(false); //user code never ran } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) { handleCommandEnd(true); //user code did run } } }; //mark the command as CANCELLED and store the latency (in addition to standard cleanup) final Action0 unsubscribeCommandCleanup = new Action0() { @Override public void call() { if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) { if (!_cmd.executionResult.containsTerminalEvent()) { _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey); try { executionHook.onUnsubscribe(_cmd); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx); } _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED); } handleCommandEnd(false); //user code never ran } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) { if (!_cmd.executionResult.containsTerminalEvent()) { _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey); try { executionHook.onUnsubscribe(_cmd); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx); } _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED); } handleCommandEnd(true); //user code did run } } }; final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { @Override public Observable<R> call() { if (commandState.get().equals(CommandState.UNSUBSCRIBED)) { return Observable.never(); } return applyHystrixSemantics(_cmd); } }; final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() { @Override public R call(R r) { R afterFirstApplication = r; try { afterFirstApplication = executionHook.onComplete(_cmd, r); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx); } try { return executionHook.onEmit(_cmd, afterFirstApplication); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx); return afterFirstApplication; } } }; final Action0 fireOnCompletedHook = new Action0() { @Override public void call() { try { executionHook.onSuccess(_cmd); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx); } } }; return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { /* this is a stateful object so can only be used once */ /* CAS保證命令只執行一次 */ if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); //TODO make a new error type for this throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null); } // 命令開始時間戳 commandStartTimestamp = System.currentTimeMillis(); // 列印日誌 if (properties.requestLogEnabled().get()) { // log this command execution regardless of what happened if (currentRequestLog != null) { currentRequestLog.addExecutedCommand(_cmd); } } // 快取開關,快取KEY(這個是Hystrix中請求快取功能,hystrix支援將一個請求結果快取起 來,下一個具有相同key的請求將直接從快取中取出結果,減少請求開銷) final boolean requestCacheEnabled = isRequestCachingEnabled(); final String cacheKey = getCacheKey(); /* try from cache first */ if (requestCacheEnabled) {//如果開啟了快取機制,則從快取中獲取結果 HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } // 聲明執行命令的Observable Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks); Observable<R> afterCache; //保存請求結果到快取中 if (requestCacheEnabled && cacheKey != null) { // wrap it for caching HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); if (fromCache != null) { // another thread beat us so we'll use the cached value instead toCache.unsubscribe(); isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } else { // we just created an ObservableCommand so we cast and return it afterCache = toCache.toObservable(); } } else { afterCache = hystrixObservable; } return afterCache .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once .doOnCompleted(fireOnCompletedHook); } });
執行命令的Observable的定義如下,通過defer定義了一個 applyHystrixSemantics 的事件。
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { @Override public Observable<R> call() { // 當commandState處於UNSUBSCRIBED時,不執行命令 if (commandState.get().equals(CommandState.UNSUBSCRIBED)) { return Observable.never(); } //返回執行命令的Observable return applyHystrixSemantics(_cmd); } }; Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks);
applyHystrixSemantics方法;假設快取特性未開啟或者未命中快取,那麼程式碼將執行 applyHystrixSemantics 。
- 傳入的_cmd是一個GenericCommand,最終執行這個command中的run方法,本質就是完成對queryOrder方法的代理
- circuitBreaker.allowRequest() 如果為true,表示當前不處於熔斷狀態,正常執行,否則,調用 handleShortCircuitViaFallback 實現服務降級,如果我們配置了fallback方法,則會獲得我們配置的fallback執行
- 如果當前hystrix處於未熔斷狀態,則
-
- getExecutionSemaphore 判斷當前策略是否為訊號量(TryableSemaphoreNoOp/TryableSemaphoreActual),如果是,則調用 tryAcquire 來獲取訊號量。如果當前訊號量滿了,則調用 handleSemaphoreRejectionViaFallback 方法。
- 調用 executeCommandAndObserve 獲取命令執行Observable。
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // mark that we're starting execution on the ExecutionHook // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent executionHook.onStart(_cmd); /* determine if we're allowed to execute */ if (circuitBreaker.allowRequest()) { final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } }; final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
//跟進 return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); } } else { return handleShortCircuitViaFallback(); } }
executeCommandAndObserve