Spring Cloud Gateway 沒有鏈路資訊,我 TM 人傻了(中)

本系列是 我TM人傻了 系列第五期[捂臉],往期精彩回顧:

本篇文章涉及底層設計以及原理,以及問題定位和可能的問題點,非常深入,篇幅較長,所以拆分成上中下三篇:

  • :問題簡單描述以及 Spring Cloud Gateway 基本結構和流程以及底層原理
  • :Spring Cloud Sleuth 如何在 Spring Cloud Gateway 加入的鏈路追蹤以及為何會出現這個問題
  • :現有 Spring Cloud Sleuth 的非侵入設計帶來的性能問題,其他可能的問題點,以及如何解決

Spring Cloud Sleuth 是如何增加鏈路資訊

通過之前的源碼分析,我們知道,在最開始的 TraceWebFilter,我們將 Mono 封裝成了一個 MonoWebFilterTrace,它的核心源碼是:

@Override
public void subscribe(CoreSubscriber<? super Void> subscriber) {
	Context context = contextWithoutInitialSpan(subscriber.currentContext());
	Span span = findOrCreateSpan(context);
	//將 Span 放入執行上下文中,對於日誌其實就是將鏈路資訊放入 org.slf4j.MDC
	//日誌的 MDC 一般都是 ThreadLocal 的 Map,對於 Log4j2 的實現類就是 org.apache.logging.log4j.ThreadContext,其核心 contextMap 就是一個基於 ThreadLocal 實現的 Map
	//簡單理解就是將鏈路資訊放入一個 ThreadLocal 的 Map 中,每個執行緒訪問自己的 Map 獲取鏈路資訊
	try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) {
		//將實際的 subscribe 用 Span 所在的 Context 包裹住,結束時關閉 Span
		this.source.subscribe(new WebFilterTraceSubscriber(subscriber, context, span, this));
	}
	//在 scope.close() 之後,會將鏈路資訊從  ThreadLocal 的 Map 中剔除
}

@Override
public Object scanUnsafe(Attr key) {
	if (key == Attr.RUN_STYLE) {
		//執行的方式必須是不能切換執行緒,也就是同步的
		//因為,日誌的鏈路資訊是放在 ThreadLocal 對象中,切換執行緒,鏈路資訊就沒了
		return Attr.RunStyle.SYNC; 
	}
	return super.scanUnsafe(key);
}

WebFilterTraceSubscriber 幹了些什麼呢?出現異常,以及 http 請求結束的時候,我們可能想將響應資訊,異常資訊記錄進入 Span 中,就是通過這個類封裝實現的。

經過 MonoWebFilterTrace 的封裝,由於 Spring-WebFlux 處理請求,其實就是封裝成我們上面得出的 Mono 之後進行 subscribe 處理的請求,所以這樣,整個內部 Mono 的 publish 鏈路以及 subscribe 鏈路,就被 WebFilterTraceSubscriber 中的 scope 包裹起來了。只要我們自己不在 GatewayFilter 中轉換成某些強制非同步的 Mono 或者 Flux 導致切換執行緒,鏈路資訊是不會丟失的。

我們應用中丟失鏈路資訊的地方

通過查看日誌我們發現,啟用 RequestBody 快取的地方,都有鏈路缺失。這個 RequestBody 快取我們使用的是 Spring Cloud Gateway 中的 AdaptCachedBodyGlobalFilter,其核心源碼是:

private static <T> Mono<T> cacheRequestBody(ServerWebExchange exchange, boolean cacheDecoratedRequest,
		Function<ServerHttpRequest, Mono<T>> function) {
	ServerHttpResponse response = exchange.getResponse();
	NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
	return 
		//讀取 Body,由於 TCP 拆包,所以需要他們拼接到一起
		DataBufferUtils.join(exchange.getRequest().getBody())
			//如果沒有 Body,則直接返回空 DataBuffer
			.defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator())))
			//decorate方法中將 dataBuffer 放入 exchange 的 Attributes 列表,只是為了防止重複進入這個 `AdaptCachedBodyGlobalFilter` 的情況導致重複快取請求 Body
			//之後,使用新的 body 以及原始請求封裝成新的請求,繼續 GatewayFilters 鏈路
			.map(dataBuffer -> decorate(exchange, dataBuffer, cacheDecoratedRequest))
			.switchIfEmpty(Mono.just(exchange.getRequest())).flatMap(function);
}

為何會使用這個 AdaptCachedBodyGlobalFilter 呢?獲取請求 Body 是通過 exchange.getRequest().getBody() 獲取的,其結果是一個 Flux<DataBuffer>.請求的 Body 是一次性的,如果你需要請求重試的話,在第一次調用失敗的之後,第二次重試的時候,Body 就讀取不到了,因為 Flux 已經結束。所以,對於需要重複調用,例如重試,一對多路由轉發的情況,需要將請求 Body 快取起來,就是經過這個 GatewayFilter。但是經過這個 GatewayFilter 之後,鏈路資訊就沒了,可以通過以下這個簡單項目進行復現(項目地址):

引入依賴:

<parent>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-parent</artifactId>
	<version>2.4.6</version>
</parent>

<dependencies>
	<dependency>
		<groupId>org.springframework.cloud</groupId>
		<artifactId>spring-cloud-starter-gateway</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.cloud</groupId>
		<artifactId>spring-cloud-starter-sleuth</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-log4j2</artifactId>
	</dependency>
	<!--log4j2非同步日誌需要的依賴,所有項目都必須用log4j2和非同步日誌配置-->
	<dependency>
		<groupId>com.lmax</groupId>
		<artifactId>disruptor</artifactId>
		<version>${disruptor.version}</version>
	</dependency>
</dependencies>

<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-dependencies</artifactId>
			<version>2020.0.3</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
	</dependencies>
</dependencyManagement>

對所有路徑開啟 AdaptCachedBodyGlobalFilter:

@Configuration(proxyBeanMethods = false)
public class ApiGatewayConfiguration {
    @Autowired
    private AdaptCachedBodyGlobalFilter adaptCachedBodyGlobalFilter;
    @Autowired
    private GatewayProperties gatewayProperties;

    @PostConstruct
    public void init() {
        gatewayProperties.getRoutes().forEach(routeDefinition -> {
			//對 spring cloud gateway 路由配置中的每個路由都啟用 AdaptCachedBodyGlobalFilter 
            EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(new Object(), routeDefinition.getId());
            adaptCachedBodyGlobalFilter.onApplicationEvent(enableBodyCachingEvent);
        });
    }
}

配置(我們只有一個路由,將請求轉發到 httpbin.org 這個 http 請求測試網站):

server:
  port: 8181
spring:
  application:
    name: apiGateway
  cloud:
    gateway:
      httpclient:
        connect-timeout: 500
        response-timeout: 60000
      routes:
        - id: first_route
          uri: //httpbin.org
          predicates:
              - Path=/httpbin/**
          filters:
              - StripPrefix=1

添加兩個全局 Filter,一個在 AdaptCachedBodyGlobalFilter 之前,一個在 AdaptCachedBodyGlobalFilter 之後。這兩個 Filter 非常簡單,只是打一行日誌。

@Log4j2
@Component
public class PreLogFilter implements GlobalFilter, Ordered {

    public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() - 1;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info("before AdaptCachedBodyGlobalFilter");
        return chain.filter(exchange);
    }

    @Override
    public int getOrder() {
        return ORDER;
    }
}

@Log4j2
@Component
public class PostLogFilter implements GlobalFilter, Ordered {

    public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() + 1;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info("after AdaptCachedBodyGlobalFilter");
        return chain.filter(exchange);
    }

    @Override
    public int getOrder() {
        return ORDER;
    }
}

最後指定 Log4j2 的輸出格式中包含鏈路資訊,就像系列文章開頭中指定的那樣。

啟動這個應用,之後訪問 //127.0.0.1:8181/httpbin/anything,查看日誌,發現 PostLogFilter 中的日誌,沒有鏈路資訊了:

2021-09-08 06:32:35.457  INFO [service-apiGateway,51063d6f1fe264d0,51063d6f1fe264d0] [30600] [reactor-http-nio-2][?:]: before AdaptCachedBodyGlobalFilter
2021-09-08 06:32:35.474  INFO [service-apiGateway,,] [30600] [reactor-http-nio-2][?:]: after AdaptCachedBodyGlobalFilter

為何鏈路資訊會丟失

我們來看經過 AdaptCachedBodyGlobalFilter 之後,我們前面拼的 Mono 鏈路會變成什麼樣:

return Mono.defer(() ->
	new MonoWebFilterTrace(source, 
		RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根據請求尋找路由
				.flatMap((Function<Route, Mono<?>>) r -> {
					exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //將路由放入 Attributes 中,後面我們還會用到
					return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
				}).switchIfEmpty( //如果為 Mono.empty(),也就是沒找到路由
					Mono.empty() 
					.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之後,記錄日誌
						if (logger.isTraceEnabled()) {
							logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
					}
				})))
			.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果沒有返回不為 Mono.empty() 的 handlerMapping,則直接返回 404
			.then(
				Mono.defer(() -> {
					//省略在 AdaptCachedBodyGlobalFilter 前面的鏈路嵌套
					//讀取 Body,由於 TCP 拆包,所以需要他們拼接到一起
					DataBufferUtils.join(exchange.getRequest().getBody())
						//如果沒有 Body,則直接返回空 DataBuffer
						.defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator())))
						//decorate方法中將 dataBuffer 放入 exchange 的 Attributes 列表,只是為了防止重複進入這個 `AdaptCachedBodyGlobalFilter` 的情況導致重複快取請求 Body
						//之後,使用新的 body 以及原始請求封裝成新的請求,繼續 GatewayFilters 鏈路
						.map(dataBuffer -> decorate(exchange, dataBuffer, cacheDecoratedRequest))
						.switchIfEmpty(Mono.just(exchange.getRequest())).flatMap(function);
				})
				.then(Mono.empty()))
			), //調用對應的 Handler
	TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
		//MetricsWebFilter 相關的處理,在前面的程式碼中給出了,這裡省略
	});
);

其中 DataBufferUtils.join(exchange.getRequest().getBody()) 其實是一個 FluxReceive,這裡我們可以理解為:提交一個嘗試讀取請求 Body 的任務,將之後的 GatewayFilter 的鏈路處理加到在讀取完 Body 之後的回調當中,提交這個任務後,立刻返回。這麼看可能比較複雜,我們用一個類似的例子類比下:

//首先我們創建一個新的 Span
Span span = tracer.newTrace();
//聲明一個類似於 TraceWebFilter 中封裝的 MonoWebFilterTrace 的 MonoOperator
class MonoWebFilterTrace<T> extends MonoOperator<T, T> {
	protected MonoWebFilterTrace(Mono<? extends T> source) {
		super(source);
	}

	@Override
	public void subscribe(CoreSubscriber<? super T> actual) {
		//將 subscribe 用 span 包裹
		try (Tracer.SpanInScope spanInScope = tracer.withSpanInScope(span)) {
			source.subscribe(actual);
			//在將要關閉 spanInScope 的時候(即從 ThreadLocal 的 Map 中移除鏈路資訊),列印日誌
			log.info("stopped");
		}
	}
}

Mono.defer(() -> new MonoWebFilterTrace(
		Mono.fromRunnable(() -> {
			log.info("first");
		})
		//模擬 FluxReceive
		.then(Mono.delay(Duration.ofSeconds(1))
		.doOnSuccess(longSignal -> log.info(longSignal))))
).subscribe(aLong -> log.info(aLong));

Mono.delay 和 FluxReceive 表現類似,都是非同步切換執行緒池執行。執行上面的程式碼,我們可以從日誌上面就能看出來:

2021-09-08 07:12:45.236  INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: first
2021-09-08 07:12:45.240  INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: stopped
2021-09-08 07:12:46.241  INFO [service-apiGateway,,] [31868] [parallel-1][?:]: doOnEach_onNext(0)
2021-09-08 07:12:46.242  INFO [service-apiGateway,,] [31868] [parallel-1][?:]: onComplete()
2021-09-08 07:12:46.242  INFO [service-apiGateway,,] [31868] [parallel-1][?:]: 0

在 Spring Cloud Gateway 中,Request Body 的 FluxReceive 使用的執行緒池和調用 GatewayFilter 的是同一個執行緒池,所以可能執行緒還是同一個,但是由於 Span 已經結束,從 ThreadLocal 的 Map 中已經移除了鏈路資訊,所以日誌中還是沒有鏈路資訊。

微信搜索「我的編程喵」關注公眾號,每日一刷,輕鬆提升技術,斬獲各種offer

Exit mobile version