[享学Netflix] 二十、Hystrix跨线程传递数据解决方案:HystrixRequestContext
- 2020 年 3 月 18 日
- 笔记
不要老想着做不顺就放弃,哪个团队都有问题,哪个团队都有优点 代码下载地址:https://github.com/f641385712/netflix-learning
目录
前言
说到线程间的数据传递,你肯定会联想到ThreadLocal
。但若是跨线程传递数据呢?阅读本文之前,我个人建议你已经能够非常熟练的使用ThreadLocal
并了解其基本原理,至少看过下面两篇文章表述的内容:
在Hystrix
里,它支持两种隔离模式:线程池隔离和信号量隔离。前者是默认选项:每个命令都在线程池里隔离执行,因此必然会涉及到存在跨线程传递数据的问题,这是Hystrix需要解决的(信号量隔离不存在此问题~)。
正文
关于线程内、线程间传递数据的进阶三部曲:ThreadLocal -> InheritableThreadLocal -> TransmittableThreadLocal
,提供的能力越来越强。强两者由JDK源生提供,最多能支持到父线程向子线程传递数据,但无法解决线程池执行问题。
TransmittableThreadLocal
是阿里巴巴开源的TTL工具,专用于解决线程间数据传递问题,支持线程池。但是很显然,Hystrix不可能依赖于阿里巴巴的实现(阿里的年纪比它还小呢),所以它拥有自己的实现方式,核心API为:HystrixRequestContext/HystrixRequestVariableDefault/HystrixContextRunnable
。
HystrixRequestContext
它是请求级别的上下文,也就是说同一个请求内,前面放进去的内容,只要在请求生命周期内,任何地方均可以获取到。
说明:这里所指的请求级别,你可以理解为一个Http请求一样,只是针对一个请求:请求内的任务可用多个线程或者使用线程池去处理,所以需要解决线程池内的数据获取问题
public class HystrixRequestContext implements Closeable { // 利用ThreadLocal, 每个线程各有一份HystrixRequestContext // 当然,前提是调用了initializeContext()进行初始化 private static ThreadLocal<HystrixRequestContext> requestVariables = new ThreadLocal<>(); public static HystrixRequestContext initializeContext() { HystrixRequestContext state = new HystrixRequestContext(); requestVariables.set(state); // 每个线程都持有一份当前上下文state return state; } // 当然喽,它也允许你自己重新设置绑定 // 为当前线程设置一个已存在的HystrixRequestContext public static void setContextOnCurrentThread(HystrixRequestContext state) { requestVariables.set(state); } // 得到当前线程的Hystrix请求上下文 从ThreadLocal里拿 public static HystrixRequestContext getContextForCurrentThread() { HystrixRequestContext context = requestVariables.get(); if (context != null && context.state != null) { return context; } else { return null; } } // 判断当前线程的上下文是否已经初始化~~~~ 就是判断get() == null而已嘛 public static boolean isCurrentThreadInitialized() { HystrixRequestContext context = requestVariables.get(); return context != null && context.state != null; } }
这段代码看似做的不多:给当前线程绑定一个HystrixRequestContext
上下文对象,当然喽,这个上下文对象可以init初始化,也可来自于一个已经存在的上下文。
如何理解已经存在的上下文,比如父线程已经存在一个上下文,子线程想绑定时,是可以使用父上下文的,这样父子不就共用一个上下文了么?这样数据就打通了嘛~
使用示例一:
@Test public void fun1() { HystrixRequestContext.initializeContext(); ... // 处理你的业务逻辑 HystrixRequestContext contextForCurrentThread = HystrixRequestContext.getContextForCurrentThread(); System.out.println(contextForCurrentThread.getClass()); // contextForCurrentThread.close(); contextForCurrentThread.shutdown(); }
运行程序控制台打印:
class com.netflix.hystrix.strategy.concurrency.HystrixRequestContext
是的,单独使用HystrixRequestContext
的话,你就只能这么简单用啦。若配上Servlet的Filter,会更有意义些:
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { chain.doFilter(request, response); } finally { context.shutdown(); } }
官方特别强调:如果你调用了
initializeContext()
方法,请务必确保shutdown()
方法也会被调用,否则会造成可能的内存泄漏。
这样子在Http请求的任意地方,便可轻松的拿到当前线程的HystrixRequestContext
对象,从而获取相应数据。
但是,请继续看如下示例:
@Test public void fun2() throws InterruptedException { HystrixRequestContext.initializeContext(); // 启动子线程完成具体业务逻辑 new Thread(() -> { // 子线程里需要拿到请求上下文处理逻辑 HystrixRequestContext contextForCurrentThread = HystrixRequestContext.getContextForCurrentThread(); // ... // 处理业务逻辑 System.out.println("当前Hystrix请求上下文是:" + contextForCurrentThread); }).start(); HystrixRequestContext.getContextForCurrentThread().close(); TimeUnit.SECONDS.sleep(1); }
控制台打印:
当前Hystrix请求上下文是:null
我相信你在看了文首推荐的两篇文章后,对这个结果并不会感到惊讶。有的人会说使用InheritableThreadLocal
能解决向子线程传递数据的问题,那么问题是如果异步任务交给线程池执行呢?难道你还得借助阿里巴巴
的TTL来实现吗?
提出疑问
我们知道Hystrix
默认的隔离方式是线程池:每个command/方法均会在线程池内执行,所以传递数据使用InheritableThreadLocal
是不能从根本上解决问题的,当然若你说借助阿里巴巴
的TTL是可以解决问题,但让堂堂Hystrix依赖阿里巴巴的库,是不是也太不实际了呢?况且Hystrix的年龄可比TTL大多了~
因此,对于以上示例至少可以提出以下疑问:
- 光秃秃的在线程内传递一个
HystrixRequestContext
貌似没有任何意义,上下文里面可以装数据吗? - 对于子线程、线程池内获取上下文
HystrixRequestContext
,Hystrix是如何解决的???
HystrixRequestContext
的Javadoc里有说,它还负责一件事:管理着HystrixRequestVariable
的生命周期(存储和销毁),而一个HystrixRequestVariable
接口便代表着请求变量:请求本地变量。
HystrixRequestVariable
它是一个接口,用于存储一个变量,和上下文相关。
第一个问题:HystrixRequestContext
能装东西吗?那是必然呢,不然怎么好意思叫上下文呢?
HystrixRequestContext: // 上下文所有的数据都使用这个Map来存储 // state的访问权限是default级别:只能同包访问 ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state = new ConcurrentHashMap<>();
这是实际的存储结构。每个线程绑定了一个HystrixRequestContext
,而每个HystrixRequestContext
有个Map结构存储数据,key就HystrixRequestVariableDefault
。
HystrixRequestContext
并没有提供直接访问state
的相关方法,因此字段并非private,它建议外部直接使用字段进行访问(添加数据、移除数据等)。而state
字段的唯一访问处是HystrixRequestVariableDefault
。
// 它的效果特别像ThreadLocal:用于传递请求级别的数据 // HystrixRequestVariableLifecycle方法:initialValue/shutdown(value) public interface HystrixRequestVariable<T> extends HystrixRequestVariableLifecycle<T> { // 获取**当前请求**的值 public T get(); }
它有一个默认实现HystrixRequestVariableDefault
(可以认为是唯一实现)。
HystrixRequestVariableDefault
它用于储存用户请求级别的变量数据,而实际存储依赖于HystrixRequestContext.state
,和请求上下文实例绑定,从而和线程绑定。
public class HystrixRequestVariableDefault<T> implements HystrixRequestVariable<T> { // 获取和当前请求(注意:不是当前线程哦,因为自己可能是子线程或者) public T get() { ... // 必须确保HystrixRequestContext#init工作已做 否则抛出异常 ConcurrentHashMap<HystrixRequestVariableDefault<?>, LazyInitializer<?>> variableMap = HystrixRequestContext.getContextForCurrentThread().state; LazyInitializer<?> v = variableMap.get(this); if (v != null) { return (T) v.get(); } ... } // 初始值默认是null public T initialValue() { return null; } // 设置数据:直接调用put方法即可。使用new一个新的LazyInitializer装起来 public void set(T value) { HystrixRequestContext.getContextForCurrentThread().state.put(this, new LazyInitializer<T>(this, value)); } public void remove() { ... } ... }
它看起来和ThreadLocal
效果类似,但是他俩有如下明显区别:
- 使用它之前,必须调用
HystrixRequestContext#initializeContext
完成初始化才行(原因是HystrixRequestContext.state
才是底层存储) - 它的清除动作是交给
HystrixRequestContext#shutdown
完成,所以请求结束后请你务必调用此方法
另外,它也说了,要想达到向子线程、线程池都可以传递数据的效果,你得使用Hystrix包装后的HystrixContextCallable/HystrixContextRunnable
去初始化任务,才能达到预期的传播效果。
HystrixContextRunnable
Hystrix 的思路是包装Runnable,在执行实际的任务之前,先拿当前线程的HystrixRequestContext
初始化实际执行任务的线程的HystrixRequestContext
,这样达到了同一个HystrixRequestContext
向下传递的目的了。
public class HystrixContextRunnable implements Runnable { private final Callable<Void> actual; // 父线程的上下文 private final HystrixRequestContext parentThreadState; public HystrixContextRunnable(Runnable actual) { this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual); } // 这里使用了HystrixConcurrencyStrategy,它是一个SPI哦 // 你还可以通过该SPI,自定义你的执行机制,非常靠谱有木有 public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) { this.actual = concurrencyStrategy.wrapCallable(() -> { actual.run(); // 实际执行的任务 return null; }); // 父线程奶你**构造时**所处在的线程 this.parentThreadState = HystrixRequestContext.getContextForCurrentThread(); } @Override public void run() { HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread(); try { HystrixRequestContext.setContextOnCurrentThread(parentThreadState); try { actual.call(); } catch (Exception e) { throw new RuntimeException(e); } } finally { HystrixRequestContext.setContextOnCurrentThread(existingState); } } }
处理思想和阿里巴巴的TTL如出一辙:执行前把父上下文设置进去,目标任务执行完成后,释放父上下文。
另外:它还有个
HystrixContextCallable
用于包装Callable,逻辑一毛一样。
使用示例二:
@Test public void fun3() throws InterruptedException { HystrixRequestContext.initializeContext(); HystrixRequestContext mainContext = HystrixRequestContext.getContextForCurrentThread(); // 设置变量:让其支持传递到子线程 or 线程池 NAME_VARIABLE.set("YoutBatman"); // 子线程的Runnable任务,必须使用`HystrixContextRunnable`才能得到上面设置的值哦 new Thread(new HystrixContextRunnable(() -> { HystrixRequestContext contextForCurrentThread = HystrixRequestContext.getContextForCurrentThread(); System.out.println(contextForCurrentThread == mainContext); System.out.println("当前线程绑定的变量值是:" + NAME_VARIABLE.get()); })).start(); TimeUnit.SECONDS.sleep(1); HystrixRequestContext.getContextForCurrentThread().close(); }
运行程序,控制台输出:
true 当前线程绑定的变量值是:YoutBatman
请注意:这块代码并没有显示的将 YourBatman 从 main线程传递到子线程,也没有利用InheritableThreadLocal
哦。它的执行步骤可描述如下:
- main初始化
HystrixRequestContext
,并且和此Context上下文完成绑定 NAME_VARIABLE.set("YoutBatman")
设置变量值,请注意:此变量值是处在HystrixRequestContext
这个上下文里的哦,属于main线程的内容- main线程初始化任务:使用的
HystrixContextRunnable
,所以该任务是和main线程的上下文绑定的 - 执行任务时,先用main线程的Context来初始化上下文(所以它绑定的上下文和main线程的是同一个上下文)
- 任务里使用
NAME_VARIABLE.get()
实际上是从main线程的上下文里拿数据,那必然可以取到呀
这就能解释了:为何子线程(甚至是线程池里的线程)都能拿到父线程里设置的变量了,因为是共用的同一个context上下文嘛。
HystrixConcurrencyStrategy通用方案解决跨线程传值
HystrixConcurrencyStrategy: // 给我们一个机会:装饰callable执行 public <T> Callable<T> wrapCallable(Callable<T> callable) { return callable; }
也就是说我们可以自定义该插件的实现,可以在目标任务执行前后加入自己的逻辑。下面举一个常见例子,获取是你可能遇到的坑
DemoController: @GetMapping("/demo") public String getDemo() { // 简单的说:就是向Request域里放一个值 RequestContextHolder.currentRequestAttributes().setAttribute("name", "YourBatman", SCOPE_REQUEST); return demoService.getInfo(); } DemoService: public String getInfo() { Srting name = RequestContextHolder.currentRequestAttributes().getAttribute("name", SCOPE_REQUEST); return name; }
本来这一切是可以正常work的,但是如果现在突然说DemoService#getInfo()
要加入Hystrix熔断降级:
DemoService: @HystrixCommand( ... ) public String getInfo() { Srting name = RequestContextHolder.currentRequestAttributes().getAttribute("name", SCOPE_REQUEST); return name; }
再次运行,what a fuck,返回null,直接导致逻辑错误,这非常致命。如果只有一个接口到时简单,你可以采用方法参数传递方式fix,但若有多个呢???下面接针对性的介绍一种通用解决方案:自定义HystrixConcurrencyStrategy
/** * 此类能够保证:RequestContext请求上下文,也就是RequestAttributes能够在线程池里自动有效 */ public class RequestContextHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { // 这个时候还在主线程了,所以通过RequestContextHolder.getRequestAttributes()是能拿到上下文的 // 拿到后hold住,等到run执行的时候再绑定即可 @Override public <T> Callable<T> wrapCallable(Callable<T> callable) { return new RequestAttributeAwareCallable<>(callable, RequestContextHolder.getRequestAttributes()); } static class RequestAttributeAwareCallable<T> implements Callable<T> { private final Callable<T> delegate; private final RequestAttributes requestAttributes; public RequestAttributeAwareCallable(Callable<T> callable, RequestAttributes requestAttributes) { this.delegate = callable; this.requestAttributes = requestAttributes; } // 执行之前绑定上下文,执行完成后释放 @Override public T call() throws Exception { try { RequestContextHolder.setRequestAttributes(requestAttributes); return delegate.call(); } finally { RequestContextHolder.resetRequestAttributes(); } } } }
使用API方式注册此插件:
HystrixPlugins.getInstance().registerConcurrencyStrategy(new RequestContextHystrixConcurrencyStrategy());
你也可以采用配置的方式,写在plugin.properties/config.properties
里:
hystrix.plugin.HystrixConcurrencyStrategy.implementation=com.yourbatman.hystrix.RequestContextHystrixConcurrencyStrategy
这样做后,妈妈就再也不用担心你获取不到请求上下文啦。另外本处只是以RequestContext
为例,当然还有像MDC传值MDC.setContextMap(contextMap)
等等方案都是一样的做。
总结
本文介绍了Netflix Hystrix
它自己的解决跨线程传递数据的结局方式,并且也介绍了HystrixConcurrencyStrategy
的扩展使用方式,该接口的扩展使用在Spring Cloud里也会有所体现。
总的来说,这篇文章内容和ThreadLocal
的思想是相同的,或者说和阿里巴巴的TTL更是相似。Hystrix
的隔离方式默认是线程池方式,所以加熔断后是有何能影响你的正常逻辑的,请务必小心谨慎使用,特别在全链路追踪时,很可能某些链路就失效了,影响你的全链路压测逻辑…