聊聊skywalking的spring-cloud-gateway-plugin
- 2020 年 3 月 13 日
- 笔记
序
本文主要研究一下skywalking的spring-cloud-gateway-plugin
NettyRoutingFilterInstrumentation
skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/NettyRoutingFilterInstrumentation.java
public class NettyRoutingFilterInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { return new ConstructorInterceptPoint[0]; } @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { return new InstanceMethodsInterceptPoint[]{ new InstanceMethodsInterceptPoint() { @Override public ElementMatcher<MethodDescription> getMethodsMatcher() { return named("filter").and(takesArgumentWithType(0, "org.springframework.web.server.ServerWebExchange")); } @Override public String getMethodsInterceptor() { return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.NettyRoutingFilterInterceptor"; } @Override public boolean isOverrideArgs() { return false; } } }; } @Override public ClassMatch enhanceClass() { return byName("org.springframework.cloud.gateway.filter.NettyRoutingFilter"); } @Override protected final String[] witnessClasses() { return new String[]{"org.springframework.cloud.gateway.handler.FilteringWebHandler", "reactor.netty.http.client.HttpClientOperations"}; } }
- NettyRoutingFilterInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.NettyRoutingFilterInterceptor拦截org.springframework.cloud.gateway.filter.NettyRoutingFilter带有org.springframework.web.server.ServerWebExchange参数的filter方法
NettyRoutingFilterInterceptor
skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/NettyRoutingFilterInterceptor.java
public class NettyRoutingFilterInterceptor implements InstanceMethodsAroundInterceptor { @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable { EnhancedInstance instance = NettyRoutingFilterInterceptor.getInstance(allArguments[0]); if (instance != null) { SWTransmitter swTransmitter = (SWTransmitter) instance.getSkyWalkingDynamicField(); ContextManager.getRuntimeContext().put(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER, swTransmitter); } } @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable { if (ContextManager.getRuntimeContext().get(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER) != null) { ContextManager.getRuntimeContext().remove(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER); } return ret; } @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) { } public static EnhancedInstance getInstance(Object o) { EnhancedInstance instance = null; if (o instanceof ServerWebExchangeDecorator) { instance = getEnhancedInstance((ServerWebExchangeDecorator) o); } else if (o instanceof DefaultServerWebExchange) { instance = (EnhancedInstance) o; } return instance; } private static EnhancedInstance getEnhancedInstance(ServerWebExchangeDecorator serverWebExchangeDecorator) { Object o = serverWebExchangeDecorator.getDelegate(); if (o instanceof ServerWebExchangeDecorator) { return getEnhancedInstance((ServerWebExchangeDecorator) o); } else if (o instanceof DefaultServerWebExchange) { return (EnhancedInstance) o; } else if (o == null) { throw new NullPointerException("The expected class DefaultServerWebExchange is null"); } else { throw new RuntimeException("Unknown parameter types:" + o.getClass()); } } }
- NettyRoutingFilterInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法获取swTransmitter,然后以名为Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER的key放入到ContextManager.getRuntimeContext();其afterMethod方法执行ContextManager.getRuntimeContext().remove(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER)
HttpClientOperationsInstrumentation
skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/HttpClientOperationsInstrumentation.java
public class HttpClientOperationsInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { return new ConstructorInterceptPoint[0]; } @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { return new InstanceMethodsInterceptPoint[]{ new InstanceMethodsInterceptPoint() { @Override public ElementMatcher<MethodDescription> getMethodsMatcher() { return named("headers").and(takesArgumentWithType(0, "io.netty.handler.codec.http.HttpHeaders")); } @Override public String getMethodsInterceptor() { return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsHeadersInterceptor"; } @Override public boolean isOverrideArgs() { return false; } },new InstanceMethodsInterceptPoint() { @Override public ElementMatcher<MethodDescription> getMethodsMatcher() { return named("send").and(takesArgumentWithType(0, "org.reactivestreams.Publisher")); } @Override public String getMethodsInterceptor() { return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsSendInterceptor"; } @Override public boolean isOverrideArgs() { return false; } }, new InstanceMethodsInterceptPoint() { @Override public ElementMatcher<MethodDescription> getMethodsMatcher() { return named("status"); } @Override public String getMethodsInterceptor() { return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsStatusInterceptor"; } @Override public boolean isOverrideArgs() { return false; } }, }; } @Override public ClassMatch enhanceClass() { return byName("reactor.netty.http.client.HttpClientOperations"); } }
- HttpClientOperationsInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine
- 它使用org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsHeadersInterceptor拦截reactor.netty.http.client.HttpClientOperations的带有io.netty.handler.codec.http.HttpHeaders参数的headers方法
- 它还使用org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsSendInterceptor拦截reactor.netty.http.client.HttpClientOperations的带有org.reactivestreams.Publisher参数的send方法
- 它还使用org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsStatusInterceptor拦截reactor.netty.http.client.HttpClientOperations的status方法
HttpClientOperationsHeadersInterceptor
skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientOperationsHeadersInterceptor.java
public class HttpClientOperationsHeadersInterceptor implements InstanceMethodsAroundInterceptor { private static final ILog logger = LogManager.getLogger(HttpClientOperationsHeadersInterceptor.class); @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable { } @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable { Object transmitter = ((EnhancedInstance) allArguments[0]).getSkyWalkingDynamicField(); if (transmitter != null) { objInst.setSkyWalkingDynamicField(transmitter); ((EnhancedInstance) allArguments[0]).setSkyWalkingDynamicField(null); } return ret; } @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) { } }
- HttpClientOperationsHeadersInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法执行objInst.setSkyWalkingDynamicField(transmitter)
HttpClientOperationsSendInterceptor
skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientOperationsSendInterceptor.java
public class HttpClientOperationsSendInterceptor implements InstanceMethodsAroundInterceptor { @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable { SWTransmitter transmitter = (SWTransmitter) objInst.getSkyWalkingDynamicField(); if (transmitter != null) { HttpClientRequest request = (HttpClientRequest) objInst; HttpHeaders header = request.requestHeaders(); ChannelOperations channelOpt = (ChannelOperations) objInst; InetSocketAddress remote = (InetSocketAddress) (channelOpt.channel().remoteAddress()); String peer = remote.getHostName() + ":" + remote.getPort(); AbstractSpan span = ContextManager.createExitSpan(transmitter.getOperationName(), peer); ContextManager.continued(transmitter.getSnapshot()); ContextCarrier contextCarrier = new ContextCarrier(); ContextManager.inject(contextCarrier); span.setComponent(ComponentsDefine.SPRING_CLOUD_GATEWAY); Tags.URL.set(span, peer + request.uri()); Tags.HTTP.METHOD.set(span, request.method().name()); SpanLayer.asHttp(span); CarrierItem next = contextCarrier.items(); while (next.hasNext()) { next = next.next(); header.set(next.getHeadKey(), next.getHeadValue()); } transmitter.setSpanGateway(span.prepareForAsync()); ContextManager.stopSpan(span); } } @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable { return ret; } @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) { ContextManager.activeSpan().errorOccurred().log(t); } }
- HttpClientOperationsSendInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法获取request header、uri、method等信息设置到span中,最后执行ContextManager.stopSpan(span);其handleMethodException方法执行ContextManager.activeSpan().errorOccurred().log(t)
HttpClientOperationsStatusInterceptor
skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientOperationsStatusInterceptor.java
public class HttpClientOperationsStatusInterceptor implements InstanceMethodsAroundInterceptor { @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable { } @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable { SWTransmitter transmitter = (SWTransmitter) objInst.getSkyWalkingDynamicField(); if (transmitter != null) { HttpResponseStatus response = (HttpResponseStatus) ret; if (response.code() >= 400) { Tags.STATUS_CODE.set(transmitter.getSpanGateway().errorOccurred(), String.valueOf(response.code())); } transmitter.getSpanGateway().asyncFinish(); objInst.setSkyWalkingDynamicField(null); } return ret; } @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) { ContextManager.activeSpan().errorOccurred().log(t); } }
- HttpClientOperationsStatusInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法获取transmitter,在response.code()大于等于400时设置statusCode的tag,然后执行transmitter.getSpanGateway().asyncFinish();其handleMethodException执行ContextManager.activeSpan().errorOccurred().log(t)
FilteringWebHandlerInstrumentation
skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/FilteringWebHandlerInstrumentation.java
public class FilteringWebHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { return new ConstructorInterceptPoint[0]; } @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { return new InstanceMethodsInterceptPoint[]{ new InstanceMethodsInterceptPoint() { @Override public ElementMatcher<MethodDescription> getMethodsMatcher() { return named("handle"); } @Override public String getMethodsInterceptor() { return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.FilteringWebHandlerInterceptor"; } @Override public boolean isOverrideArgs() { return false; } } }; } @Override public ClassMatch enhanceClass() { return byName("org.springframework.cloud.gateway.handler.FilteringWebHandler"); } }
- FilteringWebHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.FilteringWebHandlerInterceptor拦截org.springframework.cloud.gateway.handler.FilteringWebHandler的handle方法
FilteringWebHandlerInterceptor
skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/FilteringWebHandlerInterceptor.java
public class FilteringWebHandlerInterceptor implements InstanceMethodsAroundInterceptor { private static final String SPRING_CLOUD_GATEWAY_ROUTE_PREFIX = "GATEWAY/"; @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable { EnhancedInstance instance = NettyRoutingFilterInterceptor.getInstance(allArguments[0]); if (instance == null) { return; } ContextSnapshot contextSnapshot = (ContextSnapshot) instance.getSkyWalkingDynamicField(); if (contextSnapshot == null) { return; } ServerWebExchange exchange = (ServerWebExchange) allArguments[0]; String operationName = SPRING_CLOUD_GATEWAY_ROUTE_PREFIX; Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR); operationName = operationName + route.getId(); SWTransmitter transmitter = new SWTransmitter(contextSnapshot, operationName); instance.setSkyWalkingDynamicField(transmitter); } @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable { EnhancedInstance instance = NettyRoutingFilterInterceptor.getInstance(allArguments[0]); if (instance == null) { return ret; } SWTransmitter swTransmitter = (SWTransmitter) instance.getSkyWalkingDynamicField(); if (swTransmitter == null) { return ret; } Mono<Void> mono = (Mono) ret; return mono.doFinally(d -> { ServerWebExchange exchange = (ServerWebExchange) allArguments[0]; HttpStatus statusCode = exchange.getResponse().getStatusCode(); if (statusCode == HttpStatus.TOO_MANY_REQUESTS) { AbstractSpan localSpan = ContextManager.createLocalSpan(swTransmitter.getOperationName()); Tags.STATUS_CODE.set(localSpan,statusCode.toString()); SpanLayer.asHttp(localSpan); localSpan.setComponent(ComponentsDefine.SPRING_CLOUD_GATEWAY); ContextManager.continued(swTransmitter.getSnapshot()); ContextManager.stopSpan(localSpan); } }); } @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) { } }
- FilteringWebHandlerInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法从exchange中获取route信息最后形成operationName创建SWTransmitter并执行instance.setSkyWalkingDynamicField(transmitter);其afterMethod方法获取SWTransmitter,然后注册mono的doFinally回调,在里头获取statusCode更细span,然后执行ContextManager.continued(swTransmitter.getSnapshot())及ContextManager.stopSpan(localSpan)
DefaultHttpHeadersInstrumentation
skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/DefaultHttpHeadersInstrumentation.java
public class DefaultHttpHeadersInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { return new ConstructorInterceptPoint[] { new ConstructorInterceptPoint() { @Override public ElementMatcher<MethodDescription> getConstructorMatcher() { return takesArgumentWithType(0, "io.netty.handler.codec.DefaultHeaders"); } @Override public String getConstructorInterceptor() { return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.DefaultHttpHeadersInterceptor"; } } }; } @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { return new InstanceMethodsInterceptPoint[0]; } @Override public ClassMatch enhanceClass() { return byName("io.netty.handler.codec.http.DefaultHttpHeaders"); } }
- DefaultHttpHeadersInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.DefaultHttpHeadersInterceptor拦截io.netty.handler.codec.http.DefaultHttpHeaders的第一个参数为io.netty.handler.codec.DefaultHeaders的方法
DefaultHttpHeadersInterceptor
skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/DefaultHttpHeadersInterceptor.java
public class DefaultHttpHeadersInterceptor implements InstanceConstructorInterceptor { @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { Object transmitter = ContextManager.getRuntimeContext().get(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER); if (transmitter != null) { objInst.setSkyWalkingDynamicField(transmitter); ContextManager.getRuntimeContext().remove(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER); } } }
- DefaultHttpHeadersInterceptor实现了InstanceConstructorInterceptor接口,其onConstruct方法主要是将ContextManager.getRuntimeContext()中的transmitter设置到objInst中,然后从ContextManager.getRuntimeContext()移除该transmitter
小结
skywalking的spring-cloud-gateway-plugin主要有四个instrument,分别是NettyRoutingFilterInstrumentation、HttpClientOperationsInstrumentation、FilteringWebHandlerInstrumentation、DefaultHttpHeadersInstrumentation
doc
- NettyRoutingFilterInstrumentation
- HttpClientOperationsInstrumentation
- FilteringWebHandlerInstrumentation
- DefaultHttpHeadersInstrumentation