Spring Cloud Gateway 沒有鏈路資訊,我 TM 人傻了(下)
- 2021 年 9 月 26 日
- 筆記
- Spring Cloud, Spring Cloud 全解
本系列是 我TM人傻了 系列第五期[捂臉],往期精彩回顧:
本篇文章涉及底層設計以及原理,以及問題定位和可能的問題點,非常深入,篇幅較長,所以拆分成上中下三篇:
- 上:問題簡單描述以及 Spring Cloud Gateway 基本結構和流程以及底層原理
- 中:Spring Cloud Sleuth 如何在 Spring Cloud Gateway 加入的鏈路追蹤以及為何會出現這個問題
- 下:現有 Spring Cloud Sleuth 的非侵入設計帶來的性能問題,其他可能的問題點,以及如何解決
Spring Cloud Gateway 其他的可能丟失鏈路資訊的點
經過前面的分析,我們可以看出,不止這裡,還有其他地方會導致 Spring Cloud Sleuth 的鏈路追蹤資訊消失,這裡舉幾個大家常見的例子:
1.在 GatewayFilter 中指定了非同步執行某些任務,由於執行緒切換了,並且這時候可能 Span 已經結束了,所以沒有鏈路資訊,例如:
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return chain.filter(exchange).publishOn(Schedulers.parallel()).doOnSuccess(o -> {
//這裡就沒有鏈路資訊了
log.info("success");
});
}
2.將 GatewayFilter 中繼續鏈路的 chain.filter(exchange)
放到了非同步任務中執行,上面的 AdaptCachedBodyGlobalFilter 就屬於這種情況,這樣會導致之後的 GatewayFilter 都沒有鏈路資訊,例如:
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return Mono.delay(Duration.ofSeconds(1)).then(chain.filter(exchange));
}
Java 並發編程模型與 Project Reactor 編程模型的衝突思考
Java 中的很多框架,都用到了 ThreadLocal,或者通過 Thread 來標識唯一性。例如:
- 日誌框架中的 MDC,一般都是 ThreadLocal 實現。
- 所有的鎖、基於 AQS 的數據結構,都是通過 Thread 的屬性來唯一標識誰獲取到了鎖的。
- 分散式鎖等數據結構,也是通過 Thread 的屬性來唯一標識誰獲取到了鎖的,例如 Redisson 中分散式 Redis 鎖的實現。
但是放到 Project Reactor 編程模型,這就顯得格格不入了,因為 Project Reactor 非同步響應式編程就是不固定執行緒,沒法保證提交任務和回調能在同一個執行緒,所以 ThreadLocal 的語義在這裡很難成立。Project Reactor 雖然提供了對標 ThreadLocal 的 Context,但是主流框架還沒有兼容這個 Context,所以給 Spring Cloud Sleuth 粘合這些鏈路追蹤帶來了很大困難,因為 MDC 是一個 ThreadLocal 的 Map 實現,而不是基於 Context 的 Map。這就需要 Spring Cloud Sleuth 在訂閱一開始,就需要將鏈路資訊放入 MDC,同時還需要保證運行時不切換執行緒。
運行不切換執行緒,這樣其實限制了 Project Reactor 的靈活調度,是有一些性能損失的。我們其實想盡量就算加入了鏈路追蹤資訊,也不用強制運行不切換執行緒。但是 Spring Cloud Sleuth 是非侵入式設計,很難實現這一點。但是對於我們自己業務的使用,我們可以訂製一些編程規範,來保證大家寫的程式碼不丟失鏈路資訊。
改進我們的編程規範
首先,我們自定義 Mono 和 Flux 的工廠
公共 Subscriber 封裝,將 reactor Subscriber 的所有關鍵介面,都檢查當前上下文是否有鏈路資訊,即 Span,如果沒有就包裹上,如果有則直接執行即可。
public class TracedCoreSubscriber<T> implements Subscriber<T>{
private final Subscriber<T> delegate;
private final Tracer tracer;
private final CurrentTraceContext currentTraceContext;
private final Span span;
TracedCoreSubscriber(Subscriber<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
this.delegate = delegate;
this.tracer = tracer;
this.currentTraceContext = currentTraceContext;
this.span = span;
}
@Override
public void onSubscribe(Subscription s) {
executeWithinScope(() -> {
delegate.onSubscribe(s);
});
}
@Override
public void onError(Throwable t) {
executeWithinScope(() -> {
delegate.onError(t);
});
}
@Override
public void onComplete() {
executeWithinScope(() -> {
delegate.onComplete();
});
}
@Override
public void onNext(T o) {
executeWithinScope(() -> {
delegate.onNext(o);
});
}
private void executeWithinScope(Runnable runnable) {
//如果當前沒有鏈路資訊,強制包裹
if (tracer.currentSpan() == null) {
try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(this.span.context())) {
runnable.run();
}
} else {
//如果當前已有鏈路資訊,則直接執行
runnable.run();
}
}
}
之後分別定義所有 Flux 的代理 TracedFlux,和所有 Mono 的代理 TracedMono,其實就是在 subscribe 的時候,用 TracedCoreSubscriber 包裝傳入的 CoreSubscriber:
public class TracedFlux<T> extends Flux<T> {
private final Flux<T> delegate;
private final Tracer tracer;
private final CurrentTraceContext currentTraceContext;
private final Span span;
TracedFlux(Flux<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
this.delegate = delegate;
this.tracer = tracer;
this.currentTraceContext = currentTraceContext;
this.span = span;
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
}
}
public class TracedMono<T> extends Mono<T> {
private final Mono<T> delegate;
private final Tracer tracer;
private final CurrentTraceContext currentTraceContext;
private final Span span;
TracedMono(Mono<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
this.delegate = delegate;
this.tracer = tracer;
this.currentTraceContext = currentTraceContext;
this.span = span;
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
}
}
定義工廠類,使用請求 ServerWebExchange 和原始 Flux 創建 TracedFlux,以及使用請求 ServerWebExchange 和原始 Mono 創建 TracedMono,並且 Span 是通過 Attributes 獲取的,根據前文的源碼分析我們知道,這個 Attribute 是通過 TraceWebFilter 放入 Attributes 的。由於我們只在 GatewayFilter 中使用,一定在 TraceWebFilter 之後 所以這個 Attribute 一定存在。
@Component
public class TracedPublisherFactory {
protected static final String TRACE_REQUEST_ATTR = Span.class.getName();
@Autowired
private Tracer tracer;
@Autowired
private CurrentTraceContext currentTraceContext;
public <T> Flux<T> getTracedFlux(Flux<T> publisher, ServerWebExchange exchange) {
return new TracedFlux<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
}
public <T> Mono<T> getTracedMono(Mono<T> publisher, ServerWebExchange exchange) {
return new TracedMono<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
}
}
然後,我們規定:1. 所有的 GatewayFilter,需要繼承我們自定義的抽象類,這個抽象類僅僅是把 filter 的結果用 TracedPublisherFactory 的 getTracedMono 給封裝了一層 TracedMono,以 GlobalFilter 為例子:
public abstract class AbstractTracedFilter implements GlobalFilter {
@Autowired
protected TracedPublisherFactory tracedPublisherFactory;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return tracedPublisherFactory.getTracedMono(traced(exchange, chain), exchange);
}
protected abstract Mono<Void> traced(ServerWebExchange exchange, GatewayFilterChain chain);
}
2. GatewayFilter 中新生成的 Flux 或者 Mono,統一使用 TracedPublisherFactory 再封裝一層。
3. 對於 AdaptCachedBodyGlobalFilter 讀取 Request Body 導致的鏈路丟失,我向社區提了一個 Pull Request: fix #2004 Span is not terminated properly in Spring Cloud Gateway,大家可以參考。也可以在這個 Filter 之前自己將 Request Body 使用 TracedPublisherFactory 進行封裝解決。
微信搜索「我的編程喵」關注公眾號,每日一刷,輕鬆提升技術,斬獲各種offer: