使用Hystrix的插件機制,解決在使用執行緒隔離時,threadlocal的傳遞問題
背景
在我們的項目中,比較廣泛地使用了ThreadLocal,比如,在filter層,根據token,取到用戶資訊後,就會放到一個ThreadLocal變數中;在後續的業務處理中,就會直接從當前執行緒,來獲取該ThreadLocal變數,然後獲取到其中的用戶資訊,非常的方便。
但是,hystrix 這個組件一旦引入的話,如果使用執行緒隔離的方式,我們的業務邏輯就被分成了兩部分,如下:
public class SimpleHystrixCommand extends HystrixCommand<String> {
private TestService testService;
public SimpleHystrixCommand(TestService testService) {
super(setter());
this.testService = testService;
}
@Override
protected String run() throws Exception {
....
}
...
}
首先,我們定義了一個Command,這個Command,最終就會丟給hystrix的執行緒池中去運行。那,我們的controller層,會怎麼寫呢?
@RequestMapping("/")
public String hystrixOrder () {
SessionUtils.getSessionVOFromRedisAndPut2ThreadLocal();
// 1
SimpleHystrixCommand simpleHystrixCommand = new SimpleHystrixCommand(testService);
// 2
String res = simpleHystrixCommand.execute();
return res;
}
- 上面的1處,new了一個HystrixCommand,這一步,還是在當前執行緒執行的;
- 2處,在執行execute的過程中,最終就會把這個command,丟到執行緒池中,然後,command中的業務邏輯,就在執行緒池的執行緒中執行了。
所以,這中間,是有執行緒切換的,執行1時,當前執行緒里的ThreadLocal數據,在執行業務方法的時候,執行緒變了,也就取不到ThreadLocal數據了。
思路及實現
源碼
如果沒時間,可以直接看源碼:
//gitee.com/ckl111/all-simple-demo-in-work-1/tree/master/hystrix-thread-local-demo
從setter入手
一開始,我的思路是,看看能不能把hystrix的默認執行緒池給換掉,因為構建HystrixCommand時,支援使用Setter的方式去配置。
如下:
com.netflix.hystrix.HystrixCommand.Setter
final public static class Setter {
// 1
protected final HystrixCommandGroupKey groupKey;
// 2
protected HystrixCommandKey commandKey;
// 3
protected HystrixThreadPoolKey threadPoolKey;
// 4
protected HystrixCommandProperties.Setter commandPropertiesDefaults;
// 5
protected HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults;
}
-
1處,設置命令組
-
2處,設置命令的key
-
3處,設置執行緒池的key;hystrix會根據這個key,在一個map中,來查找對應的執行緒池,如果找不到,則創建一個,並放到map中。
com.netflix.hystrix.HystrixThreadPool.Factory final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
-
4處,命令的相關屬性,包括是否降級,是否熔斷,是否允許請求合併,命令執行的最大超時時長,以及metric等實時統計資訊
-
5處,執行緒池的相關屬性,比如核心執行緒數,最大執行緒數,隊列長度等
怎麼樣,可以設置的屬性很多,是吧,但是,並沒有讓我們控制執行緒池的創建相關的,也沒辦法替換其默認執行緒池。
ok,那不用setter的方式,行不行呢?
從構造器入手
HystrixCommand 的構造函數,看看能不能傳入自定義的執行緒池呢?
經過我一開始不仔細的觀察,發現有一個構造函數可以傳入HystrixThreadPool,ok,就是它了。但是,後面仔細一看,竟然是 package許可權,我的子類,和HystrixCommand當然不是一個package下的,所以,訪問不了這個構造器。
雖然,可以使用反射,但是,咱們還是守規矩點好了,再看看有沒有其他入口。
尋找擴展口
仔細觀察下,看看執行緒池什麼時候創建的?
入口在下圖,每次new一個HystrixCommand,最終都會調用父類的構造函數:
上圖所示處,initThreadPool裡面,會去創建執行緒池,需要注意的是,這裡的第一個實參,threadPool,是構造函數的第5個形參,目前來看,傳進來的都是null。為啥說這個,我們接著看:
private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
if (fromConstructor == null) {
//1 get the default implementation of HystrixThreadPool
return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
} else {
return fromConstructor;
}
}
上面我們說了,第一個實參,總是null,所以,會走這裡的1處。
com.netflix.hystrix.HystrixThreadPool.Factory#getInstance
static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
String key = threadPoolKey.name();
//1 this should find it for all but the first time
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}
//2 if we get here this is the first time so we need to initialize
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
// 3
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
}
- 1處,會查找快取,就是前面說的,去map中,根據執行緒池的key,查找對應的執行緒池
- 2處,沒找到,則進行創建
- 3處,new HystrixThreadPoolDefault,創建執行緒池
我們接著看3處:
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
// 1
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
// 2
HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
// 3
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties),
properties);
// 4
this.threadPool = this.metrics.getThreadPool();
...
}
-
1處,獲取執行緒池的默認配置,這個就和我們前面說的那個Setter里的類似
-
2處,從HystrixPlugins.getInstance()獲取一個HystrixConcurrencyStrategy類型的對象,保存到局部變數 concurrencyStrategy
-
3處,初始化metrics,這裡的第二個參數,是concurrencyStrategy.getThreadPool來獲取的,這個操作,實際上就會去創建執行緒池。
com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy#getThreadPool public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) { final ThreadFactory threadFactory = getThreadFactory(threadPoolKey); ... final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get(); final int maxQueueSize = threadPoolProperties.maxQueueSize().get(); ... // 1 return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } }
上面的1處,會去創建執行緒池。但是,這裡直接就是要了 jdk 的默認執行緒池類來創建,這還怎麼搞?類型都定死了。沒法擴展了。。。
發現hystrix的插件機制
但是,回過頭來,又仔細看了看,這個getThreadPool 是 HystrixConcurrencyStrategy類的一個方法,這個方法也是個實例方法。
方法不能改,那,實例能換嗎?再看看前面的程式碼:
ok,那接著分析:
public HystrixConcurrencyStrategy getConcurrencyStrategy() {
if (concurrencyStrategy.get() == null) {
//1 check for an implementation from Archaius first
Object impl = getPluginImplementation(HystrixConcurrencyStrategy.class);
concurrencyStrategy.compareAndSet(null, (HystrixConcurrencyStrategy) impl);
}
return concurrencyStrategy.get();
}
1處,根據這個類,獲取實現,感覺有點戲。
private <T> T getPluginImplementation(Class<T> pluginClass) {
// 1
T p = getPluginImplementationViaProperties(pluginClass, dynamicProperties);
if (p != null) return p;
// 2
return findService(pluginClass, classLoader);
}
-
1處,從一個動態屬性中獲取,後來經查,發現是如果集成了Netflix Archaius就可以動態獲取屬性,類似於一個配置中心
-
2處,如果前面沒找到,就是要 JDK 的SPI機制。
private static <T> T findService( Class<T> spi, ClassLoader classLoader) throws ServiceConfigurationError { ServiceLoader<T> sl = ServiceLoader.load(spi, classLoader); for (T s : sl) { if (s != null) return s; } return null; }
那就好說了。SPI ,我們自定義一個實現,就可以替換掉默認的了,hystrix做的還是不錯,擴展性可以。
現在知道可以自定義HystrixConcurrencyStrategy了,那要怎麼自定義呢?
這個類,是個抽象類,大體有如下幾個方法:
getThreadPool
getBlockingQueue(int maxQueueSize)
Callable<T> wrapCallable(Callable<T> callable)
getRequestVariable(final HystrixRequestVariableLifecycle<T> rv)
說是抽象類,但其實並沒有需要我們實現的方法,所有方法都有默認實現,我們只需要重寫需要覆蓋的方法即可。
我這裡,看重了第三個方法:
/**
* Provides an opportunity to wrap/decorate a {@code Callable<T>} before execution.
* <p>
* This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).
* <p>
* <b>Default Implementation</b>
* <p>
* Pass-thru that does no wrapping.
*
* @param callable
* {@code Callable<T>} to be executed via a {@link ThreadPoolExecutor}
* @return {@code Callable<T>} either as a pass-thru or wrapping the one given
*/
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return callable;
}
方法注釋如上,我簡單說下,在執行前,提供一個機會,讓你去wrap這個callable,即最終要丟到執行緒池執行的那個callable。
我們可以wrap一下原有的callable,在執行前,把當前執行緒的threadlocal變數存下來,即為A,然後設置到callable裡面去;在callable執行的時候,就可以使用我們的A中的threadlocal來替換掉worker執行緒中的。
多說無益,這裡直接看程式碼:
// 0
public class MyHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
/**
* 1 獲取當前執行緒的threadlocalmap
*/
Object currentThreadlocalMap = getCurrentThreadlocalMap();
Callable<T> finalCallable = new Callable<T>() {
// 2
private Object callerThreadlocalMap = currentThreadlocalMap;
// 3
private Callable<T> targetCallable = callable;
@Override
public T call() throws Exception {
/**
* 4 將工作執行緒的原有執行緒變數保存起來
*/
Object oldThreadlocalMapOfWorkThread = getCurrentThreadlocalMap();
/**
*5 將本執行緒的執行緒變數,設置為caller的執行緒變數
*/
setCurrentThreadlocalMap(callerThreadlocalMap);
try {
// 6
return targetCallable.call();
}finally {
// 7
setCurrentThreadlocalMap(oldThreadlocalMapOfWorkThread);
log.info("restore work thread's threadlocal");
}
}
};
return finalCallable;
}
- 0處,自定義了一個類,繼承HystrixConcurrencyStrategy,準備覆蓋其默認的wrap方法
- 1處,獲取外部執行緒的threadlocal
- 2處,3處,這裡已經是處於匿名內部類了,定義了2個field,分別存放1中的外部執行緒的threadlocal,以及要wrap的callable
- 4處,此時已經處於run方法的執行邏輯了:保存worker執行緒的自身的執行緒局部變數
- 5處,使用外部執行緒的threadlocal覆蓋自身的
- 6處,調用真正的業務邏輯
- 7處,恢復為執行緒自身的threadlocal
獲取執行緒的threadlocal的程式碼:
private Object getCurrentThreadlocalMap() {
Thread thread = Thread.currentThread();
try {
Field field = Thread.class.getDeclaredField("threadLocals");
field.setAccessible(true);
Object o = field.get(thread);
return o;
} catch (NoSuchFieldException | IllegalAccessException e) {
log.error("{}",e);
}
return null;
}
設置執行緒的threadlocal的程式碼:
private void setCurrentThreadlocalMap(Object newThreadLocalMap) {
Thread thread = Thread.currentThread();
try {
Field field = Thread.class.getDeclaredField("threadLocals");
field.setAccessible(true);
field.set(thread,newThreadLocalMap);
} catch (NoSuchFieldException | IllegalAccessException e) {
log.error("{}",e);
}
}
插件機制的相關資料
//github.com/Netflix/Hystrix/wiki/Plugins
運行效果
controller程式碼
@RequestMapping("/")
public String hystrixOrder () {
// 1
SessionUtils.getSessionVOFromRedisAndPut2ThreadLocal();
// 2
SimpleHystrixCommand simpleHystrixCommand = new SimpleHystrixCommand(testService);
String res = simpleHystrixCommand.execute();
return res;
}
-
1處,設置ThreadLocal變數
public static UserVO getSessionVOFromRedisAndPut2ThreadLocal() { UserVO userVO = new UserVO(); userVO.setUserName("test user"); RequestContextHolder.set(userVO); log.info("set thread local:{} to context",userVO); return userVO; }
-
2處,new了一個HystrixCommand,然後execute執行
command中程式碼
public class SimpleHystrixCommand extends HystrixCommand<String> {
private TestService testService;
public SimpleHystrixCommand(TestService testService) {
super(setter());
this.testService = testService;
}
@Override
protected String run() throws Exception {
// 1
String s = testService.getResult();
log.info("get thread local:{}",s);
/**
* 如果睡眠時間,超過2s,會降級
* {@link #getFallback()}
*/
int millis = new Random().nextInt(3000);
log.info("will sleep {} millis",millis);
Thread.sleep(millis);
return s;
}
重點看1處程式碼:
public String getResult() {
UserVO userVO = RequestContextHolder.get();
log.info("I am hystrix pool thread,try to get threadlocal:{}",userVO);
return userVO.toString();
}
如上所示,會去獲取ThreadLocal變數,並列印。
spi配置
在resources\META-INF\services目錄下,創建文件:
com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy
內容為下面一行:
com.learn.hystrix.utils.MyHystrixConcurrencyStrategy
執行效果
2020-05-09 17:26:11.134 INFO 7452 --- [nio-8080-exec-2] com.learn.hystrix.utils.SessionUtils : set thread local:UserVO(userName=test user) to context
2020-05-09 17:26:11.143 INFO 7452 --- [x-member-pool-2] com.learn.hystrix.service.TestService : I am hystrix pool thread,try to get threadlocal:UserVO(userName=test user)
2020-05-09 17:26:11.143 INFO 7452 --- [x-member-pool-2] c.l.h.command.SimpleHystrixCommand : get thread local:UserVO(userName=test user)
2020-05-09 17:26:11.144 INFO 7452 --- [x-member-pool-2] c.l.h.command.SimpleHystrixCommand : will sleep 126 millis
2020-05-09 17:26:11.281 INFO 7452 --- [x-member-pool-2] c.l.h.u.MyHystrixConcurrencyStrategy : restore work thread's threadlocal
可以看到,已經發生了執行緒切換,在worker執行緒也取到了。
大家如果發現日誌中出現了[ HystrixTimer-1] 執行緒的身影,不用擔心,那只是因為我們的執行緒超時了,所以timer執行緒檢測到了之後,去執行一個callable任務,那個runnable就是前面被我們包裝過的那個callable。(這塊超時的機制,todo吧,下次再講)
總結
hystrix的插件機制,不止可以擴展上面這一個類,還有幾個別的類也是可以的。大家直接參考:
//github.com/Netflix/Hystrix/wiki/Plugins
程式碼demo,我放在了:
//gitee.com/ckl111/all-simple-demo-in-work-1/tree/master/hystrix-thread-local-demo