Spring Cloud Gateway 沒有鏈路資訊,我 TM 人傻了(下)

本系列是 我TM人傻了 系列第五期[捂臉],往期精彩回顧:

image

本篇文章涉及底層設計以及原理,以及問題定位和可能的問題點,非常深入,篇幅較長,所以拆分成上中下三篇:

  • :問題簡單描述以及 Spring Cloud Gateway 基本結構和流程以及底層原理
  • :Spring Cloud Sleuth 如何在 Spring Cloud Gateway 加入的鏈路追蹤以及為何會出現這個問題
  • :現有 Spring Cloud Sleuth 的非侵入設計帶來的性能問題,其他可能的問題點,以及如何解決

Spring Cloud Gateway 其他的可能丟失鏈路資訊的點

經過前面的分析,我們可以看出,不止這裡,還有其他地方會導致 Spring Cloud Sleuth 的鏈路追蹤資訊消失,這裡舉幾個大家常見的例子:

1.在 GatewayFilter 中指定了非同步執行某些任務,由於執行緒切換了,並且這時候可能 Span 已經結束了,所以沒有鏈路資訊,例如

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
	return chain.filter(exchange).publishOn(Schedulers.parallel()).doOnSuccess(o -> {
			//這裡就沒有鏈路資訊了
            log.info("success");
	});
}

2.將 GatewayFilter 中繼續鏈路的 chain.filter(exchange) 放到了非同步任務中執行,上面的 AdaptCachedBodyGlobalFilter 就屬於這種情況,這樣會導致之後的 GatewayFilter 都沒有鏈路資訊,例如:

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
	return Mono.delay(Duration.ofSeconds(1)).then(chain.filter(exchange));
}

Java 並發編程模型與 Project Reactor 編程模型的衝突思考

Java 中的很多框架,都用到了 ThreadLocal,或者通過 Thread 來標識唯一性。例如:

  • 日誌框架中的 MDC,一般都是 ThreadLocal 實現。
  • 所有的鎖、基於 AQS 的數據結構,都是通過 Thread 的屬性來唯一標識誰獲取到了鎖的。
  • 分散式鎖等數據結構,也是通過 Thread 的屬性來唯一標識誰獲取到了鎖的,例如 Redisson 中分散式 Redis 鎖的實現。

但是放到 Project Reactor 編程模型,這就顯得格格不入了,因為 Project Reactor 非同步響應式編程就是不固定執行緒,沒法保證提交任務和回調能在同一個執行緒,所以 ThreadLocal 的語義在這裡很難成立。Project Reactor 雖然提供了對標 ThreadLocal 的 Context,但是主流框架還沒有兼容這個 Context,所以給 Spring Cloud Sleuth 粘合這些鏈路追蹤帶來了很大困難,因為 MDC 是一個 ThreadLocal 的 Map 實現,而不是基於 Context 的 Map。這就需要 Spring Cloud Sleuth 在訂閱一開始,就需要將鏈路資訊放入 MDC,同時還需要保證運行時不切換執行緒。

運行不切換執行緒,這樣其實限制了 Project Reactor 的靈活調度,是有一些性能損失的。我們其實想盡量就算加入了鏈路追蹤資訊,也不用強制運行不切換執行緒。但是 Spring Cloud Sleuth 是非侵入式設計,很難實現這一點。但是對於我們自己業務的使用,我們可以訂製一些編程規範,來保證大家寫的程式碼不丟失鏈路資訊

改進我們的編程規範

首先,我們自定義 Mono 和 Flux 的工廠

公共 Subscriber 封裝,將 reactor Subscriber 的所有關鍵介面,都檢查當前上下文是否有鏈路資訊,即 Span,如果沒有就包裹上,如果有則直接執行即可。

public class TracedCoreSubscriber<T> implements Subscriber<T>{
    private final Subscriber<T> delegate;
    private final Tracer tracer;
    private final CurrentTraceContext currentTraceContext;
    private final Span span;

    TracedCoreSubscriber(Subscriber<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
        this.delegate = delegate;
        this.tracer = tracer;
        this.currentTraceContext = currentTraceContext;
        this.span = span;
    }

    @Override
    public void onSubscribe(Subscription s) {
        executeWithinScope(() -> {
            delegate.onSubscribe(s);
        });
    }

    @Override
    public void onError(Throwable t) {
        executeWithinScope(() -> {
            delegate.onError(t);
        });
    }

    @Override
    public void onComplete() {
        executeWithinScope(() -> {
            delegate.onComplete();
        });
    }

    @Override
    public void onNext(T o) {
        executeWithinScope(() -> {
            delegate.onNext(o);
        });
    }

    private void executeWithinScope(Runnable runnable) {
        //如果當前沒有鏈路資訊,強制包裹
        if (tracer.currentSpan() == null) {
            try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(this.span.context())) {
                runnable.run();
            }
        } else {
            //如果當前已有鏈路資訊,則直接執行
            runnable.run();
        }
    }
}

之後分別定義所有 Flux 的代理 TracedFlux,和所有 Mono 的代理 TracedMono,其實就是在 subscribe 的時候,用 TracedCoreSubscriber 包裝傳入的 CoreSubscriber:

public class TracedFlux<T> extends Flux<T> {
    private final Flux<T> delegate;
    private final Tracer tracer;
    private final CurrentTraceContext currentTraceContext;
    private final Span span;

    TracedFlux(Flux<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
        this.delegate = delegate;
        this.tracer = tracer;
        this.currentTraceContext = currentTraceContext;
        this.span = span;
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
    }
}

public class TracedMono<T> extends Mono<T> {
    private final Mono<T> delegate;
    private final Tracer tracer;
    private final CurrentTraceContext currentTraceContext;
    private final Span span;

    TracedMono(Mono<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
        this.delegate = delegate;
        this.tracer = tracer;
        this.currentTraceContext = currentTraceContext;
        this.span = span;
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
    }
}

定義工廠類,使用請求 ServerWebExchange 和原始 Flux 創建 TracedFlux,以及使用請求 ServerWebExchange 和原始 Mono 創建 TracedMono,並且 Span 是通過 Attributes 獲取的,根據前文的源碼分析我們知道,這個 Attribute 是通過 TraceWebFilter 放入 Attributes 的。由於我們只在 GatewayFilter 中使用,一定在 TraceWebFilter 之後 所以這個 Attribute 一定存在。

@Component
public class TracedPublisherFactory {
    protected static final String TRACE_REQUEST_ATTR = Span.class.getName();

    @Autowired
    private Tracer tracer;
    @Autowired
    private CurrentTraceContext currentTraceContext;

    public <T> Flux<T> getTracedFlux(Flux<T> publisher, ServerWebExchange exchange) {
        return new TracedFlux<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
    }

    public <T> Mono<T> getTracedMono(Mono<T> publisher, ServerWebExchange exchange) {
        return new TracedMono<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
    }
}

然後,我們規定:1. 所有的 GatewayFilter,需要繼承我們自定義的抽象類,這個抽象類僅僅是把 filter 的結果用 TracedPublisherFactory 的 getTracedMono 給封裝了一層 TracedMono,以 GlobalFilter 為例子:

public abstract class AbstractTracedFilter implements GlobalFilter {
    @Autowired
    protected TracedPublisherFactory tracedPublisherFactory;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        return tracedPublisherFactory.getTracedMono(traced(exchange, chain), exchange);
    }

    protected abstract Mono<Void> traced(ServerWebExchange exchange, GatewayFilterChain chain);
}

2. GatewayFilter 中新生成的 Flux 或者 Mono,統一使用 TracedPublisherFactory 再封裝一層

3. 對於 AdaptCachedBodyGlobalFilter 讀取 Request Body 導致的鏈路丟失,我向社區提了一個 Pull Request: fix #2004 Span is not terminated properly in Spring Cloud Gateway,大家可以參考。也可以在這個 Filter 之前自己將 Request Body 使用 TracedPublisherFactory 進行封裝解決。

微信搜索「我的編程喵」關注公眾號,每日一刷,輕鬆提升技術,斬獲各種offer