[享學Netflix] 二十、Hystrix跨線程傳遞數據解決方案:HystrixRequestContext

  • 2020 年 3 月 18 日
  • 筆記

不要老想着做不順就放棄,哪個團隊都有問題,哪個團隊都有優點 代碼下載地址:https://github.com/f641385712/netflix-learning

目錄

前言

說到線程間的數據傳遞,你肯定會聯想到ThreadLocal。但若是跨線程傳遞數據呢?閱讀本文之前,我個人建議你已經能夠非常熟練的使用ThreadLocal並了解其基本原理,至少看過下面兩篇文章表述的內容:

Hystrix里,它支持兩種隔離模式:線程池隔離和信號量隔離。前者是默認選項:每個命令都在線程池裡隔離執行,因此必然會涉及到存在跨線程傳遞數據的問題,這是Hystrix需要解決的(信號量隔離不存在此問題~)。


正文

關於線程內、線程間傳遞數據的進階三部曲:ThreadLocal -> InheritableThreadLocal -> TransmittableThreadLocal,提供的能力越來越強。強兩者由JDK源生提供,最多能支持到父線程向子線程傳遞數據,但無法解決線程池執行問題。

TransmittableThreadLocal是阿里巴巴開源的TTL工具,專用於解決線程間數據傳遞問題,支持線程池。但是很顯然,Hystrix不可能依賴於阿里巴巴的實現(阿里的年紀比它還小呢),所以它擁有自己的實現方式,核心API為:HystrixRequestContext/HystrixRequestVariableDefault/HystrixContextRunnable


HystrixRequestContext

它是請求級別的上下文,也就是說同一個請求內,前面放進去的內容,只要在請求生命周期內,任何地方均可以獲取到。

說明:這裡所指的請求級別,你可以理解為一個Http請求一樣,只是針對一個請求:請求內的任務可用多個線程或者使用線程池去處理,所以需要解決線程池內的數據獲取問題

public class HystrixRequestContext implements Closeable {    	// 利用ThreadLocal, 每個線程各有一份HystrixRequestContext  	// 當然,前提是調用了initializeContext()進行初始化  	private static ThreadLocal<HystrixRequestContext> requestVariables = new ThreadLocal<>();      public static HystrixRequestContext initializeContext() {          HystrixRequestContext state = new HystrixRequestContext();          requestVariables.set(state); // 每個線程都持有一份當前上下文state          return state;      }      // 當然嘍,它也允許你自己重新設置綁定      // 為當前線程設置一個已存在的HystrixRequestContext      public static void setContextOnCurrentThread(HystrixRequestContext state) {          requestVariables.set(state);      }      	// 得到當前線程的Hystrix請求上下文  從ThreadLocal里拿      public static HystrixRequestContext getContextForCurrentThread() {          HystrixRequestContext context = requestVariables.get();          if (context != null && context.state != null) {              return context;          } else {              return null;          }      }  	// 判斷當前線程的上下文是否已經初始化~~~~ 就是判斷get() == null而已嘛      public static boolean isCurrentThreadInitialized() {          HystrixRequestContext context = requestVariables.get();          return context != null && context.state != null;      }  }

這段代碼看似做的不多:給當前線程綁定一個HystrixRequestContext上下文對象,當然嘍,這個上下文對象可以init初始化,也可來自於一個已經存在的上下文

如何理解已經存在的上下文,比如父線程已經存在一個上下文,子線程想綁定時,是可以使用父上下文的,這樣父子不就共用一個上下文了麽?這樣數據就打通了嘛~


使用示例一:

@Test  public void fun1() {      HystrixRequestContext.initializeContext();    	... // 處理你的業務邏輯        HystrixRequestContext contextForCurrentThread = HystrixRequestContext.getContextForCurrentThread();        System.out.println(contextForCurrentThread.getClass());      // contextForCurrentThread.close();      contextForCurrentThread.shutdown();  }

運行程序控制台打印:

class com.netflix.hystrix.strategy.concurrency.HystrixRequestContext

是的,單獨使用HystrixRequestContext的話,你就只能這麼簡單用啦。若配上Servlet的Filter,會更有意義些:

public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {      HystrixRequestContext context = HystrixRequestContext.initializeContext();      try {           chain.doFilter(request, response);      } finally {           context.shutdown();      }  }

官方特彆強調:如果你調用了initializeContext()方法,請務必確保shutdown()方法也會被調用,否則會造成可能的內存泄漏。

這樣子在Http請求的任意地方,便可輕鬆的拿到當前線程的HystrixRequestContext對象,從而獲取相應數據。

但是,請繼續看如下示例:

@Test  public void fun2() throws InterruptedException {      HystrixRequestContext.initializeContext();          // 啟動子線程完成具體業務邏輯      new Thread(() -> {          // 子線程里需要拿到請求上下文處理邏輯          HystrixRequestContext contextForCurrentThread = HystrixRequestContext.getContextForCurrentThread();          // ... // 處理業務邏輯          System.out.println("當前Hystrix請求上下文是:" + contextForCurrentThread);      }).start();        HystrixRequestContext.getContextForCurrentThread().close();      TimeUnit.SECONDS.sleep(1);  }

控制台打印:

當前Hystrix請求上下文是:null

我相信你在看了文首推薦的兩篇文章後,對這個結果並不會感到驚訝。有的人會說使用InheritableThreadLocal能解決向子線程傳遞數據的問題,那麼問題是如果異步任務交給線程池執行呢?難道你還得藉助阿里巴巴的TTL來實現嗎?


提出疑問

我們知道Hystrix默認的隔離方式是線程池:每個command/方法均會在線程池內執行,所以傳遞數據使用InheritableThreadLocal是不能從根本上解決問題的,當然若你說藉助阿里巴巴的TTL是可以解決問題,但讓堂堂Hystrix依賴阿里巴巴的庫,是不是也太不實際了呢?況且Hystrix的年齡可比TTL大多了~

因此,對於以上示例至少可以提出以下疑問:

  1. 光禿禿的在線程內傳遞一個HystrixRequestContext貌似沒有任何意義,上下文裏面可以裝數據嗎?
  2. 對於子線程、線程池內獲取上下文HystrixRequestContext,Hystrix是如何解決的???

HystrixRequestContext的Javadoc里有說,它還負責一件事:管理着HystrixRequestVariable的生命周期(存儲和銷毀),而一個HystrixRequestVariable接口便代表着請求變量:請求本地變量。


HystrixRequestVariable

它是一個接口,用於存儲一個變量,和上下文相關。

第一個問題:HystrixRequestContext能裝東西嗎?那是必然呢,不然怎麼好意思叫上下文呢?

HystrixRequestContext:    	// 上下文所有的數據都使用這個Map來存儲  	// state的訪問權限是default級別:只能同包訪問  	ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state = new ConcurrentHashMap<>();

這是實際的存儲結構。每個線程綁定了一個HystrixRequestContext,而每個HystrixRequestContext有個Map結構存儲數據,key就HystrixRequestVariableDefault

HystrixRequestContext並沒有提供直接訪問state的相關方法,因此字段並非private,它建議外部直接使用字段進行訪問(添加數據、移除數據等)。而state字段的唯一訪問處是HystrixRequestVariableDefault

// 它的效果特別像ThreadLocal:用於傳遞請求級別的數據  // HystrixRequestVariableLifecycle方法:initialValue/shutdown(value)  public interface HystrixRequestVariable<T> extends HystrixRequestVariableLifecycle<T> {    	// 獲取**當前請求**的值  	public T get();  }

它有一個默認實現HystrixRequestVariableDefault(可以認為是唯一實現)。


HystrixRequestVariableDefault

它用於儲存用戶請求級別的變量數據,而實際存儲依賴於HystrixRequestContext.state,和請求上下文實例綁定,從而和線程綁定。

public class HystrixRequestVariableDefault<T> implements HystrixRequestVariable<T> {    	// 獲取和當前請求(注意:不是當前線程哦,因為自己可能是子線程或者)  	public T get() {  		... // 必須確保HystrixRequestContext#init工作已做 否則拋出異常  		ConcurrentHashMap<HystrixRequestVariableDefault<?>, LazyInitializer<?>> variableMap = HystrixRequestContext.getContextForCurrentThread().state;  		LazyInitializer<?> v = variableMap.get(this);          if (v != null) {              return (T) v.get();          }  		...  	}  	// 初始值默認是null      public T initialValue() {          return null;      }    	// 設置數據:直接調用put方法即可。使用new一個新的LazyInitializer裝起來      public void set(T value) {          HystrixRequestContext.getContextForCurrentThread().state.put(this, new LazyInitializer<T>(this, value));      }      public void remove() { ... }  	...  }

它看起來和ThreadLocal效果類似,但是他倆有如下明顯區別:

  1. 使用它之前,必須調用HystrixRequestContext#initializeContext完成初始化才行(原因是HystrixRequestContext.state才是底層存儲)
  2. 它的清除動作是交給HystrixRequestContext#shutdown完成,所以請求結束後請你務必調用此方法

另外,它也說了,要想達到向子線程、線程池都可以傳遞數據的效果,你得使用Hystrix包裝後的HystrixContextCallable/HystrixContextRunnable去初始化任務,才能達到預期的傳播效果。


HystrixContextRunnable

Hystrix 的思路是包裝Runnable,在執行實際的任務之前,先拿當前線程HystrixRequestContext初始化實際執行任務的線程HystrixRequestContext,這樣達到了同一個HystrixRequestContext向下傳遞的目的了。

public class HystrixContextRunnable implements Runnable {        private final Callable<Void> actual;      // 父線程的上下文      private final HystrixRequestContext parentThreadState;        public HystrixContextRunnable(Runnable actual) {          this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);      }  	// 這裡使用了HystrixConcurrencyStrategy,它是一個SPI哦  	// 你還可以通過該SPI,自定義你的執行機制,非常靠譜有木有      public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) {          this.actual = concurrencyStrategy.wrapCallable(() -> {          	actual.run(); // 實際執行的任務          	return null;          });          // 父線程奶你**構造時**所處在的線程          this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();      }          @Override      public void run() {          HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();          try {              HystrixRequestContext.setContextOnCurrentThread(parentThreadState);                try {                  actual.call();              } catch (Exception e) {                  throw new RuntimeException(e);              }            } finally {              HystrixRequestContext.setContextOnCurrentThread(existingState);          }      }  }

處理思想和阿里巴巴的TTL如出一轍:執行前把父上下文設置進去,目標任務執行完成後,釋放父上下文

另外:它還有個HystrixContextCallable用於包裝Callable,邏輯一毛一樣。


使用示例二:

@Test  public void fun3() throws InterruptedException {      HystrixRequestContext.initializeContext();      HystrixRequestContext mainContext = HystrixRequestContext.getContextForCurrentThread();      // 設置變量:讓其支持傳遞到子線程 or 線程池      NAME_VARIABLE.set("YoutBatman");        // 子線程的Runnable任務,必須使用`HystrixContextRunnable`才能得到上面設置的值哦      new Thread(new HystrixContextRunnable(() -> {          HystrixRequestContext contextForCurrentThread = HystrixRequestContext.getContextForCurrentThread();          System.out.println(contextForCurrentThread == mainContext);          System.out.println("當前線程綁定的變量值是:" + NAME_VARIABLE.get());      })).start();    	TimeUnit.SECONDS.sleep(1);      HystrixRequestContext.getContextForCurrentThread().close();    }

運行程序,控制台輸出:

true  當前線程綁定的變量值是:YoutBatman

請注意:這塊代碼並沒有顯示的將 YourBatman 從 main線程傳遞到子線程,也沒有利用InheritableThreadLocal哦。它的執行步驟可描述如下:

  1. main初始化HystrixRequestContext,並且和此Context上下文完成綁定
  2. NAME_VARIABLE.set("YoutBatman")設置變量值,請注意:此變量值是處在HystrixRequestContext這個上下文里的哦,屬於main線程的內容
  3. main線程初始化任務:使用的HystrixContextRunnable,所以該任務是和main線程的上下文綁定的
  4. 執行任務時,先用main線程的Context來初始化上下文(所以它綁定的上下文和main線程的是同一個上下文
  5. 任務里使用NAME_VARIABLE.get()實際上是從main線程的上下文里拿數據,那必然可以取到呀

這就能解釋了:為何子線程(甚至是線程池裡的線程)都能拿到父線程里設置的變量了,因為是共用的同一個context上下文嘛。


HystrixConcurrencyStrategy通用方案解決跨線程傳值

HystrixConcurrencyStrategy:    	// 給我們一個機會:裝飾callable執行      public <T> Callable<T> wrapCallable(Callable<T> callable) {          return callable;      }

也就是說我們可以自定義該插件的實現,可以在目標任務執行前後加入自己的邏輯。下面舉一個常見例子,獲取是你可能遇到的坑

DemoController:    	@GetMapping("/demo")  	public String getDemo() {  		// 簡單的說:就是向Request域里放一個值  	    RequestContextHolder.currentRequestAttributes().setAttribute("name", "YourBatman", SCOPE_REQUEST);  	    return demoService.getInfo();  	}    DemoService:    	public String getInfo() {  		Srting name = RequestContextHolder.currentRequestAttributes().getAttribute("name", SCOPE_REQUEST);  		return name;  	}

本來這一切是可以正常work的,但是如果現在突然說DemoService#getInfo()要加入Hystrix熔斷降級:

DemoService:    	@HystrixCommand( ... )  	public String getInfo() {  		Srting name = RequestContextHolder.currentRequestAttributes().getAttribute("name", SCOPE_REQUEST);  		return name;  	}

再次運行,what a fuck,返回null,直接導致邏輯錯誤,這非常致命。如果只有一個接口到時簡單,你可以採用方法參數傳遞方式fix,但若有多個呢???下面接針對性的介紹一種通用解決方案:自定義HystrixConcurrencyStrategy

/**   * 此類能夠保證:RequestContext請求上下文,也就是RequestAttributes能夠在線程池裡自動有效   */  public class RequestContextHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {        // 這個時候還在主線程了,所以通過RequestContextHolder.getRequestAttributes()是能拿到上下文的      // 拿到後hold住,等到run執行的時候再綁定即可      @Override      public <T> Callable<T> wrapCallable(Callable<T> callable) {          return new RequestAttributeAwareCallable<>(callable, RequestContextHolder.getRequestAttributes());      }        static class RequestAttributeAwareCallable<T> implements Callable<T> {            private final Callable<T> delegate;          private final RequestAttributes requestAttributes;            public RequestAttributeAwareCallable(Callable<T> callable, RequestAttributes requestAttributes) {              this.delegate = callable;              this.requestAttributes = requestAttributes;          }            // 執行之前綁定上下文,執行完成後釋放          @Override          public T call() throws Exception {              try {                  RequestContextHolder.setRequestAttributes(requestAttributes);                  return delegate.call();              } finally {                  RequestContextHolder.resetRequestAttributes();              }          }      }  }

使用API方式註冊此插件:

HystrixPlugins.getInstance().registerConcurrencyStrategy(new RequestContextHystrixConcurrencyStrategy());

你也可以採用配置的方式,寫在plugin.properties/config.properties里:

hystrix.plugin.HystrixConcurrencyStrategy.implementation=com.yourbatman.hystrix.RequestContextHystrixConcurrencyStrategy

這樣做後,媽媽就再也不用擔心你獲取不到請求上下文啦。另外本處只是以RequestContext為例,當然還有像MDC傳值MDC.setContextMap(contextMap)等等方案都是一樣的做。


總結

本文介紹了Netflix Hystrix它自己的解決跨線程傳遞數據的結局方式,並且也介紹了HystrixConcurrencyStrategy的擴展使用方式,該接口的擴展使用在Spring Cloud里也會有所體現。

總的來說,這篇文章內容和ThreadLocal的思想是相同的,或者說和阿里巴巴的TTL更是相似。Hystrix的隔離方式默認是線程池方式,所以加熔斷後是有何能影響你的正常邏輯的,請務必小心謹慎使用,特別在全鏈路追蹤時,很可能某些鏈路就失效了,影響你的全鏈路壓測邏輯