通過transmittable-thread-local源碼理解執行緒池執行緒本地變數傳遞的原理
前提
最近一兩個月花了很大的功夫做UCloud
服務和中間件遷移到阿里雲的工作,沒什麼空閑時間擼文。想起很早之前寫過ThreadLocal
的源碼分析相關文章,裡面提到了ThreadLocal
存在一個不能向預先創建的執行緒中進行變數傳遞的局限性,剛好有一位HSBC
的技術大牛前同事提到了團隊引入了transmittable-thread-local解決了此問題。借著這個契機,順便clone
了transmittable-thread-local
源碼進行分析,這篇文章會把ThreadLocal
和InheritableThreadLocal
的局限性分析完畢,並且從一些基本原理以及設計模式的運用分析transmittable-thread-local
(下文簡稱為TTL
)整套框架的實現。
如果對執行緒池和ThreadLocal
不熟悉的話,可以先參看一下前置文章:
- 《JUC同步器框架AbstractQueuedSynchronizer源碼圖文分析》
- 《JUC執行緒池ThreadPoolExecutor源碼分析》
- 《ThreadLocal源碼分析-黃金分割數的使用》
這篇文章前後花了兩周時間編寫,行文比價干硬,文字比較多(接近5W
字),希望帶著耐心閱讀。
父子執行緒的變數傳遞
在Java
中沒有明確給出一個API
可以基於子執行緒實例獲取其父執行緒實例,有一個相對可行的方案就是在創建子執行緒Thread
實例的時候獲取當前執行緒的實例,用到的API
是Thread#currentThread()
:
public class Thread implements Runnable {
// 省略其他程式碼
@HotSpotIntrinsicCandidate
public static native Thread currentThread();
// 省略其他程式碼
}
Thread#currentThread()
方法是一個靜態本地方法,它是由JVM
實現,這是在JDK
中唯一可以獲取父執行緒實例的API
。一般而言,如果想在子執行緒實例中得到它的父執行緒實例,那麼需要像如下這樣操作:
public class InheritableThread {
public static void main(String[] args) throws Exception{
// 父執行緒就是main執行緒
Thread parentThread = Thread.currentThread();
Thread childThread = new Thread(()-> {
System.out.println("Parent thread is:" + parentThread.getName());
},"childThread");
childThread.start();
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
}
// 輸出結果:
Parent thread is:main
類似地,如果我們想把一個父子執行緒共享的變數實例傳遞,也可以這樣做:
public class InheritableVars {
public static void main(String[] args) throws Exception {
// 父執行緒就是main執行緒
Thread parentThread = Thread.currentThread();
final Var var = new Var();
var.setValue1("var1");
var.setValue2("var2");
Thread childThread = new Thread(() -> {
System.out.println("Parent thread is:" + parentThread.getName());
methodFrame1(var);
}, "childThread");
childThread.start();
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
private static void methodFrame1(Var var) {
methodFrame2(var);
}
private static void methodFrame2(Var var) {
}
@Data
private static class Var {
private Object value1;
private Object value2;
}
}
這種做法其實是可行的,子執行緒調用的方法棧中的所有方法都必須顯示傳入需要從父執行緒傳遞過來的參數引用Var
實例,這樣就會產生硬編碼問題,既不靈活也導致方法不能復用,所以才衍生出執行緒本地變數Thread Local
,具體的實現有ThreadLocal
和InheritableThreadLocal
。它們兩者的基本原理是類似的,實際上所有的變數實例是快取在執行緒實例的變數ThreadLocal.ThreadLocalMap
中,執行緒本地變數實例都只是執行緒實例獲取ThreadLocal.ThreadLocalMap
的一道橋樑:
public class Thread implements Runnable {
// 省略其他程式碼
// KEY為ThreadLocal實例,VALUE為具體的值
ThreadLocal.ThreadLocalMap threadLocals = null;
// KEY為InheritableThreadLocal實例,VALUE為具體的值
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
// 省略其他程式碼
}
ThreadLocal
和InheritableThreadLocal
之間的區別可以結合源碼分析一下(見下一小節)。前面的分析聽起來如果覺得抽象的話,可以自己寫幾個類推敲一下,假如執行緒其實叫ThrowableThread
,而執行緒本地變數叫ThrowableThreadLocal
,那麼它們之間的關係如下:
public class Actor {
static ThrowableThreadLocal THREAD_LOCAL = new ThrowableThreadLocal();
public static void main(String[] args) throws Exception {
ThrowableThread throwableThread = new ThrowableThread() {
@Override
public void run() {
methodFrame1();
}
};
throwableThread.start();
}
private static void methodFrame1() {
THREAD_LOCAL.set("throwable");
methodFrame2();
}
private static void methodFrame2() {
System.out.println(THREAD_LOCAL.get());
}
/**
* 這個類暫且認為是java.lang.Thread
*/
private static class ThrowableThread implements Runnable {
ThrowableThreadLocal.ThrowableThreadLocalMap threadLocalMap;
@Override
public void run() {
}
// 這裡模擬VM的實現,返回ThrowableThread自身,大家先認為不是返回NULL
public static ThrowableThread getCurrentThread() {
// return new ThrowableThread();
return null; // <--- 假設這裡在VM的實現裡面返回的不是NULL而是當前的ThrowableThread
}
public void start() {
run();
}
}
private static class ThrowableThreadLocal {
public ThrowableThreadLocal() {
}
public void set(Object value) {
ThrowableThread currentThread = ThrowableThread.getCurrentThread();
assert null != currentThread;
ThrowableThreadLocalMap threadLocalMap = currentThread.threadLocalMap;
if (null == threadLocalMap) {
threadLocalMap = currentThread.threadLocalMap = new ThrowableThreadLocalMap();
}
threadLocalMap.put(this, value);
}
public Object get() {
ThrowableThread currentThread = ThrowableThread.getCurrentThread();
assert null != currentThread;
ThrowableThreadLocalMap threadLocalMap = currentThread.threadLocalMap;
if (null == threadLocalMap) {
return null;
}
return threadLocalMap.get(this);
}
// 這裡其實在ThreadLocal中用的是WeakHashMap
public static class ThrowableThreadLocalMap extends HashMap<ThrowableThreadLocal, Object> {
}
}
}
上面的程式碼不能運行,只是通過一個自定義的實現說明一下其中的原理和關係。
ThreadLocal和InheritableThreadLocal的局限性
InheritableThreadLocal
是ThreadLocal
的子類,它們之間的聯繫是:兩者都是執行緒Thread
實例獲取ThreadLocal.ThreadLocalMap
的一個中間變數。區別是:兩者控制ThreadLocal.ThreadLocalMap
創建的時機和通過Thread
實例獲取ThreadLocal.ThreadLocalMap
在Thread
實例中對應的屬性並不一樣,導致兩者的功能有一點差別。通俗來說兩者的功能聯繫和區別是:
ThreadLocal
:單個執行緒生命周期強綁定,只能在某個執行緒的生命周期內對ThreadLocal
進行存取,不能跨執行緒存取。
public class ThreadLocalMain {
private static ThreadLocal<String> TL = new ThreadLocal<>();
public static void main(String[] args) throws Exception {
new Thread(() -> {
methodFrame1();
}, "childThread").start();
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
private static void methodFrame1() {
TL.set("throwable");
methodFrame2();
}
private static void methodFrame2() {
System.out.println(TL.get());
}
}
// 輸出結果:
throwable
InheritableThreadLocal
:(1)可以無感知替代ThreadLocal
的功能,當成ThreadLocal
使用。(2)明確父-子執行緒關係的前提下,繼承(拷貝)父執行緒的執行緒本地變數快取過的變數,而這個拷貝的時機是子執行緒Thread
實例化時候進行的,也就是子執行緒實例化完畢後已經完成了InheritableThreadLocal
變數的拷貝,這是一個變數傳遞的過程。
public class InheritableThreadLocalMain {
// 此處可以嘗試替換為ThreadLocal,最後會輸出null
static InheritableThreadLocal<String> ITL = new InheritableThreadLocal<>();
public static void main(String[] args) throws Exception {
new Thread(() -> {
// 在父執行緒中設置變數
ITL.set("throwable");
new Thread(() -> {
methodFrame1();
}, "childThread").start();
}, "parentThread").start();
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
private static void methodFrame1() {
methodFrame2();
}
private static void methodFrame2() {
System.out.println(ITL.get());
}
}
// 輸出結果:
throwable
上面提到的兩點可以具體參看ThreadLocal
、InheritableThreadLocal
和Thread
三個類的源碼,這裡筆者把一些必要的注釋和源碼段貼出:
// --> java.lang.Thread類的源碼片段
public class Thread implements Runnable {
// 省略其他程式碼
// 這是Thread最基本的構造函數
private Thread(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
// 省略其他程式碼
Thread parent = currentThread();
this.group = g;
this.daemon = parent.isDaemon();
this.priority = parent.getPriority();
if (security == null || isCCLOverridden(parent.getClass()))
this.contextClassLoader = parent.getContextClassLoader();
else
this.contextClassLoader = parent.contextClassLoader;
this.inheritedAccessControlContext =
acc != null ? acc : AccessController.getContext();
this.target = target;
setPriority(priority);
// inheritThreadLocals一般情況下為true
// 當前子執行緒實例拷貝父執行緒的inheritableThreadLocals屬性,創建一個新的ThreadLocal.ThreadLocalMap實例賦值到自身的inheritableThreadLocals屬性
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
this.stackSize = stackSize;
this.tid = nextThreadID();
}
// 省略其他程式碼
}
// --> java.lang.ThreadLocal源碼片段
public class ThreadLocal<T> {
// 省略其他程式碼
public void set(T value) {
Thread t = Thread.currentThread();
// 通過當前執行緒獲取執行緒實例中的threadLocals
ThreadLocalMap map = getMap(t);
// 執行緒實例中的threadLocals為NULL,實例則創建一個ThreadLocal.ThreadLocalMap實例添加當前ThreadLocal->VALUE到ThreadLocalMap中,如果已經存在ThreadLocalMap則進行覆蓋對應的Entry
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
}
// 通過執行緒實例獲取該執行緒的threadLocals實例,其實是ThreadLocal.ThreadLocalMap類型的屬性
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
public T get() {
Thread t = Thread.currentThread();
// 通過當前執行緒獲取執行緒實例中的threadLocals,再獲取ThreadLocal.ThreadLocalMap中匹配上KEY為當前ThreadLocal實例的Entry對應的VALUE
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 找不到則嘗試初始化ThreadLocal.ThreadLocalMap
return setInitialValue();
}
// 如果不存在ThreadLocal.ThreadLocalMap,則通過初始化initialValue()方法的返回值,構造一個ThreadLocal.ThreadLocalMap
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
// 省略其他程式碼
}
// --> java.lang.InheritableThreadLocal源碼 - 太簡單,全量貼出
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
// 這個方法使用在執行緒Thread的構造函數裡面ThreadLocal.createInheritedMap(),基於父執行緒InheritableThreadLocal的屬性創建子執行緒的InheritableThreadLocal屬性,它的返回值決定了拷貝父執行緒的屬性時候傳入子執行緒的值
protected T childValue(T parentValue) {
return parentValue;
}
// 覆蓋獲取執行緒實例中的綁定的ThreadLocalMap為Thread#inheritableThreadLocals,這個方法其實是覆蓋了ThreadLocal中對應的方法,應該加@Override註解
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
// 覆蓋創建ThreadLocalMap的邏輯,賦值到執行緒實例中的inheritableThreadLocals,而不是threadLocals,這個方法其實是覆蓋了ThreadLocal中對應的方法,應該加@Override註解
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}
一定要注意,這裡的setInitialValue()
方法很重要,一個新的執行緒Thread
實例在初始化(對於InheritableThreadLocal
而言繼承父執行緒的執行緒本地變數)或者是首次調用ThreadLocal#set()
,會通過此setInitialValue()
方法去構造一個全新的ThreadLocal.ThreadLocalMap
,會直接使用createMap()
方法。
以前面提到的兩個例子,貼一個圖加深理解:
Example-1
:
Example-2
:
ThreadLocal
、InheritableThreadLocal
的最大局限性就是:無法為預先創建好(未投入使用)的執行緒實例傳遞變數(準確來說是首次傳遞某些場景是可行的,而後面由於執行緒池中的執行緒是復用的,無法進行更新或者修改變數的傳遞值),泛執行緒池Executor
體系、TimerTask
和ForkJoinPool
等一般會預先創建(核心)執行緒,也就它們都是無法在執行緒池中由預創建的子執行緒執行的Runnable
任務實例中使用。例如下面的方式會導致參數傳遞失敗:
public class InheritableThreadForExecutor {
static final InheritableThreadLocal<String> ITL = new InheritableThreadLocal<>();
static final Executor EXECUTOR = Executors.newFixedThreadPool(1);
public static void main(String[] args) throws Exception {
ITL.set("throwable");
EXECUTOR.execute(() -> {
System.out.println(ITL.get());
});
ITL.set("doge");
EXECUTOR.execute(() -> {
System.out.println(ITL.get());
});
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
}
// 輸出結果:
throwable
throwable # <--- 可見此處參數傳遞出現異常
首次變數傳遞成功是因為執行緒池中的所有子執行緒都是派生自main
執行緒。
TTL的簡單使用
TTL
的使用方式在它的項目README.md
或者項目中的單元測試有十分詳細的介紹,先引入依賴com.alibaba:transmittable-thread-local:2.11.4
,這裡演示一個例子:
// 父-子執行緒
public class TtlSample1 {
static TransmittableThreadLocal<String> TTL = new TransmittableThreadLocal<>();
public static void main(String[] args) throws Exception {
new Thread(() -> {
// 在父執行緒中設置變數
TTL.set("throwable");
new Thread(TtlRunnable.get(() -> {
methodFrame1();
}), "childThread").start();
}, "parentThread").start();
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
private static void methodFrame1() {
methodFrame2();
}
private static void methodFrame2() {
System.out.println(TTL.get());
}
}
// 輸出:
throwable
// 執行緒池
public class TtlSample2 {
static TransmittableThreadLocal<String> TTL = new TransmittableThreadLocal<>();
static final Executor EXECUTOR = Executors.newFixedThreadPool(1);
public static void main(String[] args) throws Exception {
TTL.set("throwable");
EXECUTOR.execute(TtlRunnable.get(() -> {
System.out.println(TTL.get());
}));
TTL.set("doge");
EXECUTOR.execute(TtlRunnable.get(() -> {
System.out.println(TTL.get());
}));
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
}
// 輸出:
throwable
doge
TTL實現的基本原理
TTL
設計上使用了大量的委託(Delegate
),委託是C#
裡面的說法,對標Java
的設計模式就是代理模式。舉個簡單的例子:
@Slf4j
public class StaticDelegate {
public static void main(String[] args) throws Exception {
new RunnableDelegate(() -> log.info("Hello World!")).run();
}
@Slf4j
@RequiredArgsConstructor
private static final class RunnableDelegate implements Runnable {
private final Runnable runnable;
@Override
public void run() {
try {
log.info("Before run...");
runnable.run();
log.info("After run...");
} finally {
log.info("Finally run...");
}
}
}
}
// 輸出結果:
23:45:27.763 [main] INFO club.throwable.juc.StaticDelegate$RunnableDelegate - Before run...
23:45:27.766 [main] INFO club.throwable.juc.StaticDelegate - Hello World!
23:45:27.766 [main] INFO club.throwable.juc.StaticDelegate$RunnableDelegate - After run...
23:45:27.766 [main] INFO club.throwable.juc.StaticDelegate$RunnableDelegate - Finally run...
委託如果使用純熟的話,可以做出很多十分有用的功能,例如可以基於Micrometer
去統計任務的執行時間,上報到Prometheus
,然後用Grafana
做監控和展示:
// 需要引入io.micrometer:micrometer-core:${version}
@Slf4j
public class MeterDelegate {
public static void main(String[] args) throws Exception {
Executor executor = Executors.newFixedThreadPool(1);
Runnable task = () -> {
try {
// 模擬耗時
Thread.sleep(1000);
} catch (Exception ignore) {
}
};
Map<String, String> tags = new HashMap<>(8);
tags.put("_class", "MeterDelegate");
executor.execute(new MicrometerDelegate(task, "test-task", tags));
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
@Slf4j
@RequiredArgsConstructor
private static final class MicrometerDelegate implements Runnable {
private final Runnable runnable;
private final String taskType;
private final Map<String, String> tags;
@Override
public void run() {
long start = System.currentTimeMillis();
try {
runnable.run();
} finally {
long end = System.currentTimeMillis();
List<Tag> tagsList = Lists.newArrayList();
Optional.ofNullable(tags).ifPresent(x -> x.forEach((k, v) -> {
tagsList.add(Tag.of(k, v));
}));
Metrics.summary(taskType, tagsList).record(end - start);
}
}
}
}
委託理論上只要不執行緒棧溢出,可以無限層級地包裝,有點像洋蔥的結構,原始的目標方法會被包裹在最裡面並且最後執行:
public static void main(String[] args) throws Exception {
Runnable target = () -> log.info("target");
Delegate level1 = new Delegate(target);
Delegate level2 = new Delegate(level1);
Delegate level3 = new Delegate(level2);
// ......
}
@RequiredArgsConstructor
static class Delegate implements Runnable{
private final Runnable runnable;
@Override
public void run() {
runnable.run();
}
}
當然,委託的層級越多,程式碼結構就會越複雜,不利於理解和維護。多層級委託這個洋蔥結構,再配合Java
反射API
剝離對具體方法調用的依賴,就是Java
中切面編程的普遍原理,spring-aop
就是這樣實現的。委託如果再結合Agent
和位元組碼增強(使用ASM
、Javassist
等),可以實現類載入時期替換對應的Runnable
、Callable
或者一般介面的實現,這樣就能無感知完成了增強功能。此外,TTL
中還使用了模板方法模式,如:
@Slf4j
public class TemplateMethod {
public static void main(String[] args) throws Exception {
Runnable runnable = () -> log.info("Hello World!");
Template template = new Template(runnable) {
@Override
protected void beforeExecute() {
log.info("BeforeExecute...");
}
@Override
protected void afterExecute() {
log.info("AfterExecute...");
}
};
template.run();
}
@RequiredArgsConstructor
static abstract class Template implements Runnable {
private final Runnable runnable;
protected void beforeExecute() {
}
@Override
public void run() {
beforeExecute();
runnable.run();
afterExecute();
}
protected void afterExecute() {
}
}
}
// 輸出結果:
00:25:32.862 [main] INFO club.throwable.juc.TemplateMethod - BeforeExecute...
00:25:32.865 [main] INFO club.throwable.juc.TemplateMethod - Hello World!
00:25:32.865 [main] INFO club.throwable.juc.TemplateMethod - AfterExecute...
分析了兩種設計模式,下面簡單理解一下TTL
實現的偽程式碼:
# TTL extends InheritableThreadLocal
# Holder of TTL -> InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> [? => NULL]
(1)創建一個全局的Holder,用於保存父執行緒(或者明確了父執行緒的子執行緒)的TTL對象,這裡注意,是TTL對象,Holder是當作Set使用
(2)(父)執行緒A中使用了TTL,則所有設置的變數會被TTL捕獲
(3)(子)執行緒B使用了TtlRunnable(Runnable的TTL實現,使用了前面提到的委託,像Callable的實現是TtlCallable),會重放所有存儲在TTL中的,來自於執行緒A的存儲變數
(4)執行緒B重放完畢後,清理執行緒B獨立產生的ThreadLocal變數,歸還變TTL的變數
主要就是這幾步,裡面的話術有點抽象,後面一節分析源碼的時候會詳細講解。
TTL的源碼分析
主要分析:
- 框架的骨架。
- 核心類
TransmittableThreadLocal
。 - 發射器
Transmitter
。 - 捕獲、重放和復原。
Agent
模組。
TTL框架骨架
TTL
是一個十分精悍的框架,它依賴少量的類實現了比較強大的功能,除了提供給用戶使用的API
,還提供了基於Agent
和位元組碼增強實現了無感知增強泛執行緒池對應類的功能,這一點是比較驚艷的。這裡先分析編程式的API
,再簡單分析Agent
部分的實現。筆者閱讀TTL
框架的時間是2020
年五一勞動節前後,當前的最新發行版本為2.11.4
。TTL
的項目結構很簡單:
- transmittable-thread-local
- com.alibaba.ttl
- spi SPI介面和一些實現
- threadpool 執行緒池增強,包括ThreadFactory和執行緒池的Wrapper等
- agent 執行緒池的Agent實現相關
最外層的包有一些Wrapper的實現和TTL
先看spi
包:
- spi
TtlAttachments
TtlAttachmentsDelegate
TtlEnhanced
TtlWrapper
TtlEnhanced
是TTL
的標識介面(空介面),標識具體的組件被TTL
增強:
public interface TtlEnhanced {
}
通過instanceof
關鍵字就可以判斷具體的實現是否TTL
增強過的組件。TtlWrapper
介面繼承自介面TtlEnhanced
,用於標記實現類可以解包裝獲得原始實例:
public interface TtlWrapper<T> extends TtlEnhanced {
// 返回解包裝實例,實際是就是原始實例
@NonNull
T unwrap();
}
TtlAttachments
介面也是繼承自介面TtlEnhanced
,用於為TTL
添加K-V
結構的附件,TtlAttachmentsDelegate
是其實現類,K-V
的存儲實際上是委託給ConcurrentHashMap
:
public interface TtlAttachments extends TtlEnhanced {
// 添加K-V附件
void setTtlAttachment(@NonNull String key, Object value);
// 通過KEY獲取值
<T> T getTtlAttachment(@NonNull String key);
// 標識自動包裝的KEY,Agent模式會使用自動包裝,這個時候會傳入一個附件的K-V,其中KEY就是KEY_IS_AUTO_WRAPPER
String KEY_IS_AUTO_WRAPPER = "ttl.is.auto.wrapper";
}
// TtlAttachmentsDelegate
public class TtlAttachmentsDelegate implements TtlAttachments {
private final ConcurrentMap<String, Object> attachments = new ConcurrentHashMap<String, Object>();
@Override
public void setTtlAttachment(@NonNull String key, Object value) {
attachments.put(key, value);
}
@Override
@SuppressWarnings("unchecked")
public <T> T getTtlAttachment(@NonNull String key) {
return (T) attachments.get(key);
}
}
因為TTL
的實現覆蓋了泛執行緒池Executor
、ExecutorService
、ScheduledExecutorService
、ForkJoinPool
和TimerTask
(在TTL
中組件已經標記為過期,推薦使用ScheduledExecutorService
),範圍比較廣,短篇幅無法分析所有的源碼,而且它們的實現思路是基本一致的,筆者下文只會挑選Executor
的實現路線進行分析。
核心類TransmittableThreadLocal
TransmittableThreadLocal
是TTL
的核心類,TTL
框架就是用這個類來命名的。先看它的構造函數和關鍵屬性:
// 函數式介面,TTL拷貝器
@FunctionalInterface
public interface TtlCopier<T> {
// 拷貝父屬性
T copy(T parentValue);
}
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> {
// 日誌句柄,使用的不是SLF4J的介面,而是java.util.logging的實現
private static final Logger logger = Logger.getLogger(TransmittableThreadLocal.class.getName());
// 是否禁用忽略NULL值的語義
private final boolean disableIgnoreNullValueSemantics;
// 默認是false,也就是不禁用忽略NULL值的語義,也就是忽略NULL值,也就是默認的話,NULL值傳入不會覆蓋原來已經存在的值
public TransmittableThreadLocal() {
this(false);
}
// 可以通過手動設置,去覆蓋IgnoreNullValue的語義,如果設置為true,則是支援NULL值的設置,設置為true的時候,與ThreadLocal的語義一致
public TransmittableThreadLocal(boolean disableIgnoreNullValueSemantics) {
this.disableIgnoreNullValueSemantics = disableIgnoreNullValueSemantics;
}
// 先忽略其他程式碼
}
disableIgnoreNullValueSemantics
屬性相關可以查看Issue157,下文分析方法的時候也會說明具體的場景。TransmittableThreadLocal
繼承自InheritableThreadLocal
,本質就是ThreadLocal
,那它到底怎麼樣保證變數可以在執行緒池中的執行緒傳遞?接著分析其他所有方法:
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> {
// 拷貝器的拷貝方法實現
public T copy(T parentValue) {
return parentValue;
}
// 模板方法,留給子類實現,在TtlRunnable或者TtlCallable執行前回調
protected void beforeExecute() {
}
// 模板方法,留給子類實現,在TtlRunnable或者TtlCallable執行後回調
protected void afterExecute() {
}
// 獲取值,直接從InheritableThreadLocal#get()獲取
@Override
public final T get() {
T value = super.get();
// 如果值不為NULL 或者 禁用了忽略空值的語義(也就是和ThreadLocal語義一致),則重新添加TTL實例自身到存儲器
if (disableIgnoreNullValueSemantics || null != value) addThisToHolder();
return value;
}
@Override
public final void set(T value) {
// 如果不禁用忽略空值的語義,也就是需要忽略空值,並且設置的入參值為空,則做一次徹底的移除,包括從存儲器移除TTL自身實例,TTL(ThrealLocalMap)中也移除對應的值
if (!disableIgnoreNullValueSemantics && null == value) {
// may set null to remove value
remove();
} else {
// TTL(ThrealLocalMap)中設置對應的值
super.set(value);
// 添加TTL實例自身到存儲器
addThisToHolder();
}
}
// 從存儲器移除TTL自身實例,從TTL(ThrealLocalMap)中移除對應的值
@Override
public final void remove() {
removeThisFromHolder();
super.remove();
}
// 從TTL(ThrealLocalMap)中移除對應的值
private void superRemove() {
super.remove();
}
// 拷貝值,主要是拷貝get()的返回值
private T copyValue() {
return copy(get());
}
// 存儲器,本身就是一個InheritableThreadLocal(ThreadLocal)
// 它的存放對象是WeakHashMap<TransmittableThreadLocal<Object>, ?>類型,而WeakHashMap的VALUE總是為NULL,這裡當做Set容器使用,WeakHashMap支援NULL值
private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
}
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
// 注意這裡的WeakHashMap總是拷貝父執行緒的值
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
}
};
// 添加TTL自身實例到存儲器,不存在則添加策略
@SuppressWarnings("unchecked")
private void addThisToHolder() {
if (!holder.get().containsKey(this)) {
holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value.
}
}
// 從存儲器移除TTL自身的實例
private void removeThisFromHolder() {
holder.get().remove(this);
}
// 執行目標方法,isBefore決定回調beforeExecute還是afterExecute,注意此回調方法會吞掉所有的異常只列印日誌
private static void doExecuteCallback(boolean isBefore) {
for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
try {
if (isBefore) threadLocal.beforeExecute();
else threadLocal.afterExecute();
} catch (Throwable t) {
if (logger.isLoggable(Level.WARNING)) {
logger.log(Level.WARNING, "TTL exception when " + (isBefore ? "beforeExecute" : "afterExecute") + ", cause: " + t.toString(), t);
}
}
}
}
// DEBUG模式下列印TTL裡面的所有值
static void dump(@Nullable String title) {
if (title != null && title.length() > 0) {
System.out.printf("Start TransmittableThreadLocal[%s] Dump...%n", title);
} else {
System.out.println("Start TransmittableThreadLocal Dump...");
}
for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
System.out.println(threadLocal.get());
}
System.out.println("TransmittableThreadLocal Dump end!");
}
// DEBUG模式下列印TTL裡面的所有值
static void dump() {
dump(null);
}
// 省略靜態類Transmitter的實現程式碼
}
這裡一定要記住holder
是全局靜態的,並且它自身也是一個InheritableThreadLocal
(get()
方法也是執行緒隔離的),它實際上就是父執行緒管理所有TransmittableThreadLocal
的橋樑。這裡可以考慮一個單執行緒的例子來說明TransmittableThreadLocal
的存儲架構:
public class TtlSample3 {
static TransmittableThreadLocal<String> TTL1 = new TransmittableThreadLocal<>();
static TransmittableThreadLocal<String> TTL2 = new TransmittableThreadLocal<>();
static TransmittableThreadLocal<String> TTL3 = new TransmittableThreadLocal<>();
public static void main(String[] args) throws Exception {
TTL1.set("VALUE-1");
TTL2.set("VALUE-2");
TTL3.set("VALUE-3");
}
}
這裡簡化了例子,只演示了單執行緒的場景,圖中的一些對象的哈希碼有可能每次啟動JVM
實例都不一樣,這裡只是做示例:
注釋裡面也提到,holder
裡面的WeakHashMap
是當成Set
容器使用,映射的值都是NULL
,每次遍歷它的所有KEY
就能獲取holder
裡面的所有的TransmittableThreadLocal
實例,它是一個全局的存儲器,但是本身是一個InheritableThreadLocal
,多執行緒共享後的映射關係會相對複雜:
再聊一下disableIgnoreNullValueSemantics
的作用,默認情況下disableIgnoreNullValueSemantics=false
,TTL
如果設置NULL
值,會直接從holder
移除對應的TTL
實例,在TTL#get()
方法被調用的時候,如果原來持有的屬性不為NULL
,該TTL
實例會重新加到holder
。如果設置disableIgnoreNullValueSemantics=true
,則set(null)
的語義和ThreadLocal
一致。見下面的例子:
public class TtlSample4 {
static TransmittableThreadLocal<Integer> TL1 = new TransmittableThreadLocal<Integer>(false) {
@Override
protected Integer initialValue() {
return 5;
}
@Override
protected Integer childValue(Integer parentValue) {
return 10;
}
};
static TransmittableThreadLocal<Integer> TL2 = new TransmittableThreadLocal<Integer>(true) {
@Override
protected Integer initialValue() {
return 5;
}
@Override
protected Integer childValue(Integer parentValue) {
return 10;
}
};
public static void main(String[] args) throws Exception {
TL1.set(null);
TL2.set(null);
Thread t1 = new Thread(TtlRunnable.get(() -> {
System.out.println(String.format("Thread:%s,value:%s", Thread.currentThread().getName(), TL1.get()));
}), "T1");
Thread t2 = new Thread(TtlRunnable.get(() -> {
System.out.println(String.format("Thread:%s,value:%s", Thread.currentThread().getName(), TL2.get()));
}), "T2");
t1.start();
t2.start();
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
}
// 輸出結果:
Thread:T2,value:null
Thread:T1,value:5
這是因為框架的設計者不想把NULL
作為有狀態的值,如果真的有需要保持和ThreadLocal
一致的用法,可以在構造TransmittableThreadLocal
實例的時候傳入true
。
發射器Transmitter
發射器Transmitter
是TransmittableThreadLocal
的一個公有靜態類,它的核心功能是傳輸所有的TransmittableThreadLocal
實例和提供靜態方法註冊當前執行緒的變數到其他執行緒。按照筆者閱讀源碼的習慣,先看構造函數和關鍵屬性:
// # TransmittableThreadLocal#Transmitter
public static class Transmitter {
// 保存手動註冊的ThreadLocal->TtlCopier映射,這裡是因為部分API提供了TtlCopier給用戶實現
private static volatile WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> threadLocalHolder = new WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>>();
// threadLocalHolder更變時候的監視器
private static final Object threadLocalHolderUpdateLock = new Object();
// 標記WeakHashMap中的ThreadLocal的對應值為NULL的屬性,便於後面清理
private static final Object threadLocalClearMark = new Object();
// 默認的拷貝器,影子拷貝,直接返回父值
private static final TtlCopier<Object> shadowCopier = new TtlCopier<Object>() {
@Override
public Object copy(Object parentValue) {
return parentValue;
}
};
// 私有構造,說明只能通過靜態方法提供外部調用
private Transmitter() {
throw new InstantiationError("Must not instantiate this class");
}
// 私有靜態類,快照,保存從holder中捕獲的所有TransmittableThreadLocal和外部手動註冊保存在threadLocalHolder的ThreadLocal的K-V映射快照
private static class Snapshot {
final WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value;
final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value;
private Snapshot(WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value, WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value) {
this.ttl2Value = ttl2Value;
this.threadLocal2Value = threadLocal2Value;
}
}
}
Transmitter
在設計上是一個典型的工具類,外部只能調用其公有靜態方法。接著看其他靜態方法:
// # TransmittableThreadLocal#Transmitter
public static class Transmitter {
//######################################### 捕獲 ###########################################################
// 捕獲當前執行緒綁定的所有的TransmittableThreadLocal和已經註冊的ThreadLocal的值 - 使用了用時拷貝快照的策略
// 筆者註:它一般在構造任務實例的時候被調用,因此當前執行緒相對於子執行緒或者執行緒池的任務就是父執行緒,其實本質是捕獲父執行緒的所有執行緒本地變數的值
@NonNull
public static Object capture() {
return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}
// 新建一個WeakHashMap,遍歷TransmittableThreadLocal#holder中的所有TransmittableThreadLocal的Entry,獲取K-V,存放到這個新的WeakHashMap返回
private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
ttl2Value.put(threadLocal, threadLocal.copyValue());
}
return ttl2Value;
}
// 新建一個WeakHashMap,遍歷threadLocalHolder中的所有ThreadLocal的Entry,獲取K-V,存放到這個新的WeakHashMap返回
private static WeakHashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value = new WeakHashMap<ThreadLocal<Object>, Object>();
for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {
final ThreadLocal<Object> threadLocal = entry.getKey();
final TtlCopier<Object> copier = entry.getValue();
threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
}
return threadLocal2Value;
}
//######################################### 重放 ###########################################################
// 重放capture()方法中捕獲的TransmittableThreadLocal和手動註冊的ThreadLocal中的值,本質是重新拷貝holder中的所有變數,生成新的快照
// 筆者註:重放操作一般會在子執行緒或者執行緒池中的執行緒的任務執行的時候調用,因此此時的holder#get()拿到的是子執行緒的原來就存在的本地執行緒變數,重放操作就是把這些子執行緒原有的本地執行緒變數備份
@NonNull
public static Object replay(@NonNull Object captured) {
final Snapshot capturedSnapshot = (Snapshot) captured;
return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
}
// 重放所有的TTL的值
@NonNull
private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) {
// 新建一個新的備份WeakHashMap,其實也是一個快照
WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
// 這裡的循環針對的是子執行緒,用於獲取的是子執行緒的所有執行緒本地變數
for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal<Object> threadLocal = iterator.next();
// 拷貝holder當前執行緒(子執行緒)綁定的所有TransmittableThreadLocal的K-V結構到備份中
backup.put(threadLocal, threadLocal.get());
// 清理所有的非捕獲快照中的TTL變數,以防有中間過程引入的額外的TTL變數(除了父執行緒的本地變數)影響了任務執行後的重放操作
// 簡單來說就是:移除所有子執行緒的不包含在父執行緒捕獲的執行緒本地變數集合的中所有子執行緒本地變數和對應的值
/**
* 這個問題可以舉個簡單的例子:
* static TransmittableThreadLocal<Integer> TTL = new TransmittableThreadLocal<>();
*
* 執行緒池中的子執行緒C中原來初始化的時候,在執行緒C中綁定了TTL的值為10087,C執行緒是核心執行緒不會主動銷毀。
*
* 父執行緒P在沒有設置TTL值的前提下,調用了執行緒C去執行任務,那麼在C執行緒的Runnable包裝類中通過TTL#get()就會獲取到10087,顯然是不符合預期的
*
* 所以,在C執行緒的Runnable包裝類之前之前,要從C執行緒的執行緒本地變數,移除掉不包含在父執行緒P中的所有執行緒本地變數,確保Runnable包裝類執行期間只能拿到父執行緒中捕獲到的執行緒本地變數
*
* 下面這個判斷和移除做的就是這個工作
*/
if (!captured.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// 重新設置TTL的值到捕獲的快照中
// 其實真實的意圖是:把從父執行緒中捕獲的所有執行緒本地變數重寫設置到TTL中,本質上,子執行緒holder裡面的TTL綁定的值會被刷新
setTtlValuesTo(captured);
// 回調模板方法beforeExecute
doExecuteCallback(true);
return backup;
}
// 提取WeakHashMap中的KeySet,遍歷所有的TransmittableThreadLocal,重新設置VALUE
private static void setTtlValuesTo(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {
for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) {
TransmittableThreadLocal<Object> threadLocal = entry.getKey();
// 重新設置TTL值,本質上,當前執行緒(子執行緒)holder裡面的TTL綁定的值會被刷新
threadLocal.set(entry.getValue());
}
}
// 重放所有的手動註冊的ThreadLocal的值
private static WeakHashMap<ThreadLocal<Object>, Object> replayThreadLocalValues(@NonNull WeakHashMap<ThreadLocal<Object>, Object> captured) {
// 新建備份
final WeakHashMap<ThreadLocal<Object>, Object> backup = new WeakHashMap<ThreadLocal<Object>, Object>();
// 注意這裡是遍歷捕獲的快照中的ThreadLocal
for (Map.Entry<ThreadLocal<Object>, Object> entry : captured.entrySet()) {
final ThreadLocal<Object> threadLocal = entry.getKey();
// 添加到備份中
backup.put(threadLocal, threadLocal.get());
final Object value = entry.getValue();
// 如果值為清除標記則綁定在當前執行緒的變數進行remove,否則設置值覆蓋
if (value == threadLocalClearMark) threadLocal.remove();
else threadLocal.set(value);
}
return backup;
}
// 從relay()或者clear()方法中恢復TransmittableThreadLocal和手工註冊的ThreadLocal的值對應的備份
// 筆者註:恢復操作一般會在子執行緒或者執行緒池中的執行緒的任務執行的時候調用
public static void restore(@NonNull Object backup) {
final Snapshot backupSnapshot = (Snapshot) backup;
restoreTtlValues(backupSnapshot.ttl2Value);
restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
}
private static void restoreTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> backup) {
// 回調模板方法afterExecute
doExecuteCallback(false);
// 這裡的循環針對的是子執行緒,用於獲取的是子執行緒的所有執行緒本地變數
for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal<Object> threadLocal = iterator.next();
// 如果子執行緒原來就綁定的執行緒本地變數的值,如果不包含某個父執行緒傳來的對象,那麼就刪除
// 這一步可以結合前面reply操作裡面的方法段一起思考,如果不刪除的話,就相當於子執行緒的原來存在的執行緒本地變數綁定值被父執行緒對應的值污染了
if (!backup.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// 重新設置TTL的值到捕獲的快照中
// 其實真實的意圖是:把子執行緒的執行緒本地變數恢復到reply()的備份(前面的循環已經做了父執行緒捕獲變數的判斷),本質上,等於把holder中綁定於子執行緒本地變數的部分恢復到reply操作之前的狀態
setTtlValuesTo(backup);
}
// 恢復所有的手動註冊的ThreadLocal的值
private static void restoreThreadLocalValues(@NonNull WeakHashMap<ThreadLocal<Object>, Object> backup) {
for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) {
final ThreadLocal<Object> threadLocal = entry.getKey();
threadLocal.set(entry.getValue());
}
}
}
這裡三個核心方法,看起來比較抽象,要結合多執行緒的場景和一些空間想像進行推敲才能比較容易地理解:
capture()
:捕獲操作,父執行緒原來就存在的執行緒本地變數映射和手動註冊的執行緒本地變數映射捕獲,得到捕獲的快照值captured
。reply()
:重放操作,子執行緒原來就存在的執行緒本地變數映射和手動註冊的執行緒本地變數生成備份backup
,刷新captured
的所有值到子執行緒在全局存儲器holder
中綁定的值。restore()
:復原操作,子執行緒原來就存在的執行緒本地變數映射和手動註冊的執行緒本地變數恢復成backup
。
setTtlValuesTo()
這個方法比較隱蔽,要特別要結合多執行緒和空間思維去思考,例如當入參是captured
,本質是從父執行緒捕獲到的綁定在父執行緒的所有執行緒本地變數,調用的時機在reply()
和restore()
,這兩個方法只會在子執行緒中調用,setTtlValuesTo()
裡面拿到的TransmittableThreadLocal
實例調用set()
方法相當於把綁定在父執行緒的所有執行緒本地變數的值全部刷新到子執行緒當前綁定的TTL
中的執行緒本地變數的值,更深層次地想,是基於外部的傳入值刷新了子執行緒綁定在全局存儲器holder
裡面綁定到該子執行緒的執行緒本地變數的值。
Transmitter
還有不少靜態工具方法,這裡不做展開,可以參考項目裡面的測試demo
和README.md
進行調試。
捕獲、重放和復原
其實上面一節已經介紹了Transmitter
提供的捕獲、重放和復原的API
,這一節主要結合分析TtlRunnable
中的相關邏輯。TtlRunnable
的源碼如下:
public final class TtlRunnable implements Runnable, TtlWrapper<Runnable>, TtlEnhanced, TtlAttachments {
// 存放從父執行緒捕獲得到的執行緒本地變數映射的備份
private final AtomicReference<Object> capturedRef;
// 原始的Runable實例
private final Runnable runnable;
// 執行之後是否釋放TTL值引用
private final boolean releaseTtlValueReferenceAfterRun;
private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
// 這裡關鍵點:TtlRunnable實例化的時候就已經進行了執行緒本地變數的捕獲,所以一定是針對父執行緒的,因為此時任務還沒提交到執行緒池
this.capturedRef = new AtomicReference<Object>(capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
@Override
public void run() {
// 獲取父執行緒捕獲到的執行緒本地變數映射的備份,做一些前置判斷
Object captured = capturedRef.get();
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
// 重放操作
Object backup = replay(captured);
try {
// 真正的Runnable調用
runnable.run();
} finally {
// 復原操作
restore(backup);
}
}
@Nullable
public static TtlRunnable get(@Nullable Runnable runnable) {
return get(runnable, false, false);
}
@Nullable
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
if (null == runnable) return null;
if (runnable instanceof TtlEnhanced) {
// avoid redundant decoration, and ensure idempotency
if (idempotent) return (TtlRunnable) runnable;
else throw new IllegalStateException("Already TtlRunnable!");
}
return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}
// 省略其他不太重要的方法
}
其實關注點只需要放在構造函數、run()
方法,其他都是基於此做修飾或者擴展。構造函數的源碼說明,capture()
在TtlRunnable
實例化的時候已經被調用,實例化它的一般就是父執行緒,所以整體的執行流程如下:
Agent模組
啟用Agent
功能,需要在Java
的啟動參數添加:-javaagent:path/to/transmittable-thread-local-x.yzx.jar
。原理是通過Instrumentation
回調激發ClassFileTransformer
實現目標類的位元組碼增強,使用到javassist
,被增強的類主要是泛執行緒池的類:
Executor
體系:主要包括ThreadPoolExecutor
和ScheduledThreadPoolExecutor
,對應的位元組碼增強類實現是TtlExecutorTransformlet
。ForkJoinPool
:對應的位元組碼增強類實現是TtlForkJoinTransformlet
。TimerTask
:對應的位元組碼增強類實現是TtlTimerTaskTransformlet
。
Agent
的入口類是TtlAgent
,這裡查看對應的源碼:
public final class TtlAgent {
public static void premain(String agentArgs, @NonNull Instrumentation inst) {
kvs = splitCommaColonStringToKV(agentArgs);
Logger.setLoggerImplType(getLogImplTypeFromAgentArgs(kvs));
final Logger logger = Logger.getLogger(TtlAgent.class);
try {
logger.info("[TtlAgent.premain] begin, agentArgs: " + agentArgs + ", Instrumentation: " + inst);
final boolean disableInheritableForThreadPool = isDisableInheritableForThreadPool();
// 裝載所有的JavassistTransformlet
final List<JavassistTransformlet> transformletList = new ArrayList<JavassistTransformlet>();
transformletList.add(new TtlExecutorTransformlet(disableInheritableForThreadPool));
transformletList.add(new TtlForkJoinTransformlet(disableInheritableForThreadPool));
if (isEnableTimerTask()) transformletList.add(new TtlTimerTaskTransformlet());
final ClassFileTransformer transformer = new TtlTransformer(transformletList);
inst.addTransformer(transformer, true);
logger.info("[TtlAgent.premain] addTransformer " + transformer.getClass() + " success");
logger.info("[TtlAgent.premain] end");
ttlAgentLoaded = true;
} catch (Exception e) {
String msg = "Fail to load TtlAgent , cause: " + e.toString();
logger.log(Level.SEVERE, msg, e);
throw new IllegalStateException(msg, e);
}
}
}
List<JavassistTransformlet>
作為參數傳入ClassFileTransformer
的實現類TtlTransformer
中,其中的轉換方法為:
public class TtlTransformer implements ClassFileTransformer {
private final List<JavassistTransformlet> transformletList = new ArrayList<JavassistTransformlet>();
TtlTransformer(List<? extends JavassistTransformlet> transformletList) {
for (JavassistTransformlet transformlet : transformletList) {
this.transformletList.add(transformlet);
logger.info("[TtlTransformer] add Transformlet " + transformlet.getClass() + " success");
}
}
@Override
public final byte[] transform(@Nullable final ClassLoader loader, @Nullable final String classFile, final Class<?> classBeingRedefined,
final ProtectionDomain protectionDomain, @NonNull final byte[] classFileBuffer) {
try {
// Lambda has no class file, no need to transform, just return.
if (classFile == null) return NO_TRANSFORM;
final String className = toClassName(classFile);
ClassInfo classInfo = new ClassInfo(className, classFileBuffer, loader);
// 這裡做變數,如果位元組碼被修改,則跳出循環返回
for (JavassistTransformlet transformlet : transformletList) {
transformlet.doTransform(classInfo);
if (classInfo.isModified()) return classInfo.getCtClass().toBytecode();
}
} catch (Throwable t) {
String msg = "Fail to transform class " + classFile + ", cause: " + t.toString();
logger.log(Level.SEVERE, msg, t);
throw new IllegalStateException(msg, t);
}
return NO_TRANSFORM;
}
}
這裡挑選TtlExecutorTransformlet
的部分方法來看:
@Override
public void doTransform(@NonNull final ClassInfo classInfo) throws IOException, NotFoundException, CannotCompileException {
// 如果當前載入的類包含java.util.concurrent.ThreadPoolExecutor或者java.util.concurrent.ScheduledThreadPoolExecutor
if (EXECUTOR_CLASS_NAMES.contains(classInfo.getClassName())) {
final CtClass clazz = classInfo.getCtClass();
// 遍歷所有的方法進行增強
for (CtMethod method : clazz.getDeclaredMethods()) {
updateSubmitMethodsOfExecutorClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(method);
}
// 省略其他程式碼
}
// 省略其他程式碼
}
private void updateSubmitMethodsOfExecutorClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(@NonNull final CtMethod method) throws NotFoundException, CannotCompileException {
final int modifiers = method.getModifiers();
if (!Modifier.isPublic(modifiers) || Modifier.isStatic(modifiers)) return;
// 這裡主要在java.lang.Runnable構造時候調用com.alibaba.ttl.TtlRunnable#get()包裝為com.alibaba.ttl.TtlRunnable
// 在java.util.concurrent.Callable構造時候調用com.alibaba.ttl.TtlCallable#get()包裝為com.alibaba.ttl.TtlCallable
// 並且設置附件K-V為ttl.is.auto.wrapper=true
CtClass[] parameterTypes = method.getParameterTypes();
StringBuilder insertCode = new StringBuilder();
for (int i = 0; i < parameterTypes.length; i++) {
final String paramTypeName = parameterTypes[i].getName();
if (PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.containsKey(paramTypeName)) {
String code = String.format(
// decorate to TTL wrapper,
// and then set AutoWrapper attachment/Tag
"$%d = %s.get($%d, false, true);"
+ "\ncom.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.setAutoWrapperAttachment($%<d);",
i + 1, PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.get(paramTypeName), i + 1);
logger.info("insert code before method " + signatureOfMethod(method) + " of class " + method.getDeclaringClass().getName() + ": " + code);
insertCode.append(code);
}
}
if (insertCode.length() > 0) method.insertBefore(insertCode.toString());
}
上面分析的方法的功能,就是讓java.util.concurrent.ThreadPoolExecutor
和java.util.concurrent.ScheduledThreadPoolExecutor
的位元組碼被增強,提交的java.lang.Runnable
類型的任務會被包裝為TtlRunnable
,提交的java.util.concurrent.Callable
類型的任務會被包裝為TtlCallable
,實現了無入侵無感知地嵌入TTL
的功能。
小結
TTL
在使用執行緒池等會池化復用執行緒的執行組件情況下,提供ThreadLocal
值的傳遞功能,解決非同步執行時上下文傳遞的問題。 它是一個Java標準庫,為框架/中間件設施開發提供的標配能力,項目程式碼精悍,只依賴了javassist
做位元組碼增強,實現Agent
模式下的近乎無入侵提供TTL
功能的特性。TTL
能在業務程式碼中實現透明/自動完成所有非同步執行上下文的可訂製、規範化的捕捉/傳遞,如果恰好碰到非同步執行時上下文傳遞的問題,建議可以嘗試此庫。
參考資料:
JDK11
相關源碼- TTL源碼
個人部落格
(本文完 c-14-d e-a-20200502)