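A Look at SkyWalking's spring-cloud-gateway-plugin

  • March 13, 2020
  • Notes

This post walks through the source of SkyWalking's spring-cloud-gateway-plugin, specifically the gateway-2.1.x-plugin shipped with SkyWalking 6.6.0.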

NettyRoutingFilterInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/NettyRoutingFilterInstrumentation.java

    public class NettyRoutingFilterInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

        @Override
        public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
            return new ConstructorInterceptPoint[0];
        }

        @Override
        public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
            return new InstanceMethodsInterceptPoint[]{
                new InstanceMethodsInterceptPoint() {
                    @Override
                    public ElementMatcher<MethodDescription> getMethodsMatcher() {
                        return named("filter").and(takesArgumentWithType(0, "org.springframework.web.server.ServerWebExchange"));
                    }

                    @Override
                    public String getMethodsInterceptor() {
                        return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.NettyRoutingFilterInterceptor";
                    }

                    @Override
                    public boolean isOverrideArgs() {
                        return false;
                    }
                }
            };
        }

        @Override
        public ClassMatch enhanceClass() {
            return byName("org.springframework.cloud.gateway.filter.NettyRoutingFilter");
        }

        @Override
        protected final String[] witnessClasses() {
            return new String[]{"org.springframework.cloud.gateway.handler.FilteringWebHandler", "reactor.netty.http.client.HttpClientOperations"};
        }
    }

  • NettyRoutingFilterInstrumentation extends ClassInstanceMethodsEnhancePluginDefine. It uses org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.NettyRoutingFilterInterceptor to intercept the filter method of org.springframework.cloud.gateway.filter.NettyRoutingFilter whose first parameter is an org.springframework.web.server.ServerWebExchange. Its witnessClasses method names FilteringWebHandler and HttpClientOperations, so the enhancement applies only when both classes are on the classpath.
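
To make the matcher expression named("filter").and(takesArgumentWithType(0, ...)) more concrete, here is a minimal sketch of the same idea using plain reflection and java.util.function.Predicate. It is not the SkyWalking or Byte Buddy implementation; MethodMatchSketch, ToyFilter and countMatches are hypothetical names invented for illustration.

```java
import java.lang.reflect.Method;
import java.util.function.Predicate;

// Hypothetical stand-in for the agent's matcher DSL; not SkyWalking or Byte Buddy code.
final class MethodMatchSketch {

    // Matches methods by simple name, in the spirit of named("filter").
    static Predicate<Method> named(String name) {
        return m -> m.getName().equals(name);
    }

    // Matches methods whose parameter at `index` has the given fully qualified type name.
    static Predicate<Method> takesArgumentWithType(int index, String typeName) {
        return m -> m.getParameterCount() > index
                && m.getParameterTypes()[index].getName().equals(typeName);
    }

    // Toy target class with two overloads of filter (assumption: stands in for NettyRoutingFilter).
    static class ToyFilter {
        public void filter(String exchange) { }
        public void filter(Integer other) { }
    }

    // Counts the declared methods of `type` accepted by `matcher`.
    static long countMatches(Class<?> type, Predicate<Method> matcher) {
        long count = 0;
        for (Method m : type.getDeclaredMethods()) {
            if (matcher.test(m)) {
                count++;
            }
        }
        return count;
    }
}
```

Combining the two predicates with and(...) narrows the match from every filter overload to the one whose first parameter has the expected type, which is why the plugin definition adds the argument check instead of matching on the name alone.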

NettyRoutingFilterInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/NettyRoutingFilterInterceptor.java

    public class NettyRoutingFilterInterceptor implements InstanceMethodsAroundInterceptor {

        @Override
        public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                                 MethodInterceptResult result) throws Throwable {
            EnhancedInstance instance = NettyRoutingFilterInterceptor.getInstance(allArguments[0]);
            if (instance != null) {
                SWTransmitter swTransmitter = (SWTransmitter) instance.getSkyWalkingDynamicField();
                ContextManager.getRuntimeContext().put(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER, swTransmitter);
            }
        }

        @Override
        public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                                  Class<?>[] argumentsTypes, Object ret) throws Throwable {
            if (ContextManager.getRuntimeContext().get(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER) != null) {
                ContextManager.getRuntimeContext().remove(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER);
            }
            return ret;
        }

        @Override
        public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                          Class<?>[] argumentsTypes, Throwable t) {
        }

        public static EnhancedInstance getInstance(Object o) {
            EnhancedInstance instance = null;
            if (o instanceof ServerWebExchangeDecorator) {
                instance = getEnhancedInstance((ServerWebExchangeDecorator) o);
            } else if (o instanceof DefaultServerWebExchange) {
                instance = (EnhancedInstance) o;
            }
            return instance;
        }

        private static EnhancedInstance getEnhancedInstance(ServerWebExchangeDecorator serverWebExchangeDecorator) {
            Object o = serverWebExchangeDecorator.getDelegate();
            if (o instanceof ServerWebExchangeDecorator) {
                return getEnhancedInstance((ServerWebExchangeDecorator) o);
            } else if (o instanceof DefaultServerWebExchange) {
                return (EnhancedInstance) o;
            } else if (o == null) {
                throw new NullPointerException("The expected class DefaultServerWebExchange is null");
            } else {
                throw new RuntimeException("Unknown parameter types:" + o.getClass());
            }
        }
    }

  • NettyRoutingFilterInterceptor implements the InstanceMethodsAroundInterceptor interface. Its beforeMethod resolves the enhanced exchange through getInstance (unwrapping any ServerWebExchangeDecorator layers down to the DefaultServerWebExchange), reads the SWTransmitter from its dynamic field, and puts it into ContextManager.getRuntimeContext() under the key Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER. Its afterMethod calls ContextManager.getRuntimeContext().remove(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER) if the entry is still present.
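
The decorator unwrapping in getInstance and getEnhancedInstance can be sketched in isolation. The example below is a simplified illustration, not the plugin's code: Exchange, CoreExchange and Decorator are hypothetical stand-ins for ServerWebExchange, DefaultServerWebExchange and ServerWebExchangeDecorator, the walk is iterative rather than recursive, and unknown types yield null instead of an exception.

```java
// Hypothetical stand-ins for the Spring types; names are invented for illustration.
final class UnwrapSketch {

    interface Exchange { }

    // Plays the role of DefaultServerWebExchange, the object that carries the dynamic field.
    static final class CoreExchange implements Exchange {
        Object dynamicField;
    }

    // Plays the role of ServerWebExchangeDecorator: wraps another exchange.
    static final class Decorator implements Exchange {
        private final Exchange delegate;

        Decorator(Exchange delegate) {
            this.delegate = delegate;
        }

        Exchange getDelegate() {
            return delegate;
        }
    }

    // Walks the decorator chain until the core exchange is found.
    // Simplification: returns null for anything that is not a CoreExchange at the end of the chain.
    static CoreExchange unwrap(Object o) {
        Object current = o;
        while (current instanceof Decorator) {
            current = ((Decorator) current).getDelegate();
        }
        return current instanceof CoreExchange ? (CoreExchange) current : null;
    }
}
```

The point of the unwrapping is that the dynamic field lives on the innermost exchange object, so every interceptor that receives a possibly decorated exchange has to reach the same underlying instance to see the same transmitter.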

HttpClientOperationsInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/HttpClientOperationsInstrumentation.java

    public class HttpClientOperationsInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

        @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
            return new ConstructorInterceptPoint[0];
        }

        @Override
        public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
            return new InstanceMethodsInterceptPoint[]{
                new InstanceMethodsInterceptPoint() {
                    @Override
                    public ElementMatcher<MethodDescription> getMethodsMatcher() {
                        return named("headers").and(takesArgumentWithType(0, "io.netty.handler.codec.http.HttpHeaders"));
                    }

                    @Override
                    public String getMethodsInterceptor() {
                        return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsHeadersInterceptor";
                    }

                    @Override
                    public boolean isOverrideArgs() {
                        return false;
                    }
                },
                new InstanceMethodsInterceptPoint() {
                    @Override
                    public ElementMatcher<MethodDescription> getMethodsMatcher() {
                        return named("send").and(takesArgumentWithType(0, "org.reactivestreams.Publisher"));
                    }

                    @Override
                    public String getMethodsInterceptor() {
                        return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsSendInterceptor";
                    }

                    @Override
                    public boolean isOverrideArgs() {
                        return false;
                    }
                },
                new InstanceMethodsInterceptPoint() {
                    @Override
                    public ElementMatcher<MethodDescription> getMethodsMatcher() {
                        return named("status");
                    }

                    @Override
                    public String getMethodsInterceptor() {
                        return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsStatusInterceptor";
                    }

                    @Override
                    public boolean isOverrideArgs() {
                        return false;
                    }
                },
            };
        }

        @Override
        public ClassMatch enhanceClass() {
            return byName("reactor.netty.http.client.HttpClientOperations");
        }
    }

  • HttpClientOperationsInstrumentation extends ClassInstanceMethodsEnhancePluginDefine and enhances reactor.netty.http.client.HttpClientOperations.
  • It uses org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsHeadersInterceptor to intercept the headers method whose first parameter is an io.netty.handler.codec.http.HttpHeaders.
  • It uses org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsSendInterceptor to intercept the send method whose first parameter is an org.reactivestreams.Publisher.
  • It uses org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsStatusInterceptor to intercept the status method.

HttpClientOperationsHeadersInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientOperationsHeadersInterceptor.java

    public class HttpClientOperationsHeadersInterceptor implements InstanceMethodsAroundInterceptor {

        private static final ILog logger = LogManager.getLogger(HttpClientOperationsHeadersInterceptor.class);

        @Override
        public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                                 MethodInterceptResult result) throws Throwable {
        }

        @Override
        public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                                  Class<?>[] argumentsTypes, Object ret) throws Throwable {
            Object transmitter = ((EnhancedInstance) allArguments[0]).getSkyWalkingDynamicField();
            if (transmitter != null) {
                objInst.setSkyWalkingDynamicField(transmitter);
                ((EnhancedInstance) allArguments[0]).setSkyWalkingDynamicField(null);
            }
            return ret;
        }

        @Override
        public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                          Class<?>[] argumentsTypes, Throwable t) {
        }
    }

  • HttpClientOperationsHeadersInterceptor implements the InstanceMethodsAroundInterceptor interface. Its afterMethod reads the transmitter from the dynamic field of the HttpHeaders argument; if one is present, it moves it onto the HttpClientOperations instance with objInst.setSkyWalkingDynamicField(transmitter) and clears the field on the headers object.
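
The hand-off performed in afterMethod is a move, not a copy: the value ends up on exactly one object. A minimal sketch of that pattern follows; Enhanced is a hypothetical stand-in for EnhancedInstance, and handOff is an invented helper name.

```java
// Minimal sketch of moving a per-object "dynamic field" from one object to another.
final class HandOffSketch {

    // Hypothetical stand-in for EnhancedInstance: one extra slot per object.
    static class Enhanced {
        private Object dynamicField;

        Object getDynamicField() {
            return dynamicField;
        }

        void setDynamicField(Object value) {
            dynamicField = value;
        }
    }

    // Moves the slot value from source to target and clears the source,
    // mirroring what the interceptor above does for headers -> operations.
    static void handOff(Enhanced source, Enhanced target) {
        Object value = source.getDynamicField();
        if (value != null) {
            target.setDynamicField(value);
            source.setDynamicField(null);
        }
    }
}
```

Clearing the source after the move keeps a stale transmitter from being picked up again if the same headers object were passed to another call.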

HttpClientOperationsSendInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientOperationsSendInterceptor.java

    public class HttpClientOperationsSendInterceptor implements InstanceMethodsAroundInterceptor {

        @Override
        public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                                 MethodInterceptResult result) throws Throwable {
            SWTransmitter transmitter = (SWTransmitter) objInst.getSkyWalkingDynamicField();
            if (transmitter != null) {
                HttpClientRequest request = (HttpClientRequest) objInst;

                HttpHeaders header = request.requestHeaders();
                ChannelOperations channelOpt = (ChannelOperations) objInst;
                InetSocketAddress remote = (InetSocketAddress) (channelOpt.channel().remoteAddress());
                String peer = remote.getHostName() + ":" + remote.getPort();

                AbstractSpan span = ContextManager.createExitSpan(transmitter.getOperationName(), peer);
                ContextManager.continued(transmitter.getSnapshot());
                ContextCarrier contextCarrier = new ContextCarrier();
                ContextManager.inject(contextCarrier);

                span.setComponent(ComponentsDefine.SPRING_CLOUD_GATEWAY);
                Tags.URL.set(span, peer + request.uri());
                Tags.HTTP.METHOD.set(span, request.method().name());
                SpanLayer.asHttp(span);

                CarrierItem next = contextCarrier.items();
                while (next.hasNext()) {
                    next = next.next();
                    header.set(next.getHeadKey(), next.getHeadValue());
                }
                transmitter.setSpanGateway(span.prepareForAsync());
                ContextManager.stopSpan(span);
            }
        }

        @Override
        public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                                  Class<?>[] argumentsTypes, Object ret) throws Throwable {
            return ret;
        }

        @Override
        public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                          Class<?>[] argumentsTypes, Throwable t) {
            ContextManager.activeSpan().errorOccurred().log(t);
        }
    }

  • HttpClientOperationsSendInterceptor implements the InstanceMethodsAroundInterceptor interface. When a transmitter is present, its beforeMethod builds the peer from the channel's remote address, creates an exit span, continues the trace from the transmitter's snapshot, and injects a ContextCarrier. It then sets the component, URL and HTTP method on the span, writes the carrier items into the outgoing request headers, stores the span (after prepareForAsync) in the transmitter, and finally calls ContextManager.stopSpan(span). Its handleMethodException calls ContextManager.activeSpan().errorOccurred().log(t).
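
The while loop over CarrierItem uses a head-sentinel style of iteration: the first item is only a starting point, and each step advances before reading. The sketch below reproduces that loop shape with a hypothetical Item class and a plain Map standing in for the HTTP headers; the header key used in the usage note is invented, not the real propagation header.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of copying linked carrier items into request headers; names are hypothetical.
final class CarrierSketch {

    // Hypothetical linked item, modeled on the hasNext()/next() usage in the interceptor.
    static final class Item {
        final String key;
        final String value;
        final Item next;

        Item(String key, String value, Item next) {
            this.key = key;
            this.value = value;
            this.next = next;
        }

        boolean hasNext() {
            return next != null;
        }

        Item next() {
            return next;
        }
    }

    // Copies every item after the head sentinel into the headers map and returns the map.
    static Map<String, String> copyInto(Item head, Map<String, String> headers) {
        Item cursor = head;
        while (cursor.hasNext()) {
            cursor = cursor.next();           // advance first: the head itself carries no data
            headers.put(cursor.key, cursor.value);
        }
        return headers;
    }

    public static void main(String[] args) {
        Item head = new Item(null, null, new Item("x-trace-context", "abc", null));
        System.out.println(copyInto(head, new LinkedHashMap<>()));
    }
}
```

Because the cursor advances before each read, the sentinel's own key and value are never written to the headers, which matches the order of next = next.next() followed by header.set(...) in the interceptor.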

HttpClientOperationsStatusInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientOperationsStatusInterceptor.java

    public class HttpClientOperationsStatusInterceptor implements InstanceMethodsAroundInterceptor {

        @Override
        public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                                 MethodInterceptResult result) throws Throwable {
        }

        @Override
        public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                                  Object ret) throws Throwable {

            SWTransmitter transmitter = (SWTransmitter) objInst.getSkyWalkingDynamicField();
            if (transmitter != null) {
                HttpResponseStatus response = (HttpResponseStatus) ret;
                if (response.code() >= 400) {
                    Tags.STATUS_CODE.set(transmitter.getSpanGateway().errorOccurred(), String.valueOf(response.code()));
                }
                transmitter.getSpanGateway().asyncFinish();
                objInst.setSkyWalkingDynamicField(null);
            }
            return ret;
        }

        @Override
        public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                          Class<?>[] argumentsTypes, Throwable t) {
            ContextManager.activeSpan().errorOccurred().log(t);
        }
    }

  • HttpClientOperationsStatusInterceptor implements the InstanceMethodsAroundInterceptor interface. Its afterMethod fetches the transmitter; when response.code() is 400 or greater it marks the gateway span as an error and sets the status-code tag. It then calls transmitter.getSpanGateway().asyncFinish() and clears the dynamic field. Its handleMethodException calls ContextManager.activeSpan().errorOccurred().log(t).
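
The status handling reduces to a small rule: tag and flag the span only for codes of 400 and above, but finish it in every case. A minimal sketch under that reading follows; AsyncSpan and onStatus are hypothetical names, and the span here is a plain data holder rather than a SkyWalking span.

```java
// Sketch of "flag errors for >= 400, always finish"; not the SkyWalking span API.
final class StatusSketch {

    // Hypothetical span that records what was done to it.
    static final class AsyncSpan {
        boolean error;
        String statusTag;
        boolean finished;

        AsyncSpan errorOccurred() {
            error = true;
            return this;
        }

        void asyncFinish() {
            finished = true;
        }
    }

    // Applies the rule from the interceptor: error flag and status tag only for codes >= 400.
    static void onStatus(AsyncSpan span, int code) {
        if (code >= 400) {
            span.errorOccurred().statusTag = String.valueOf(code);
        }
        span.asyncFinish();
    }
}
```

Successful responses therefore carry no status tag at all in this sketch, just as the interceptor above only calls Tags.STATUS_CODE.set inside the code >= 400 branch.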

FilteringWebHandlerInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/FilteringWebHandlerInstrumentation.java

    public class FilteringWebHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

        @Override
        public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
            return new ConstructorInterceptPoint[0];
        }

        @Override
        public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
            return new InstanceMethodsInterceptPoint[]{
                new InstanceMethodsInterceptPoint() {
                    @Override
                    public ElementMatcher<MethodDescription> getMethodsMatcher() {
                        return named("handle");
                    }

                    @Override
                    public String getMethodsInterceptor() {
                        return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.FilteringWebHandlerInterceptor";
                    }

                    @Override
                    public boolean isOverrideArgs() {
                        return false;
                    }
                }
            };
        }

        @Override
        public ClassMatch enhanceClass() {
            return byName("org.springframework.cloud.gateway.handler.FilteringWebHandler");
        }

    }

  • FilteringWebHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine. It uses org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.FilteringWebHandlerInterceptor to intercept the handle method of org.springframework.cloud.gateway.handler.FilteringWebHandler.

FilteringWebHandlerInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/FilteringWebHandlerInterceptor.java

    public class FilteringWebHandlerInterceptor implements InstanceMethodsAroundInterceptor {

        private static final String SPRING_CLOUD_GATEWAY_ROUTE_PREFIX = "GATEWAY/";

        @Override
        public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                                 MethodInterceptResult result) throws Throwable {
            EnhancedInstance instance = NettyRoutingFilterInterceptor.getInstance(allArguments[0]);
            if (instance == null) {
                return;
            }
            ContextSnapshot contextSnapshot = (ContextSnapshot) instance.getSkyWalkingDynamicField();
            if (contextSnapshot == null) {
                return;
            }

            ServerWebExchange exchange = (ServerWebExchange) allArguments[0];
            String operationName = SPRING_CLOUD_GATEWAY_ROUTE_PREFIX;
            Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
            operationName = operationName + route.getId();
            SWTransmitter transmitter = new SWTransmitter(contextSnapshot, operationName);
            instance.setSkyWalkingDynamicField(transmitter);
        }

        @Override
        public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                                  Class<?>[] argumentsTypes, Object ret) throws Throwable {
            EnhancedInstance instance = NettyRoutingFilterInterceptor.getInstance(allArguments[0]);
            if (instance == null) {
                return ret;
            }
            SWTransmitter swTransmitter = (SWTransmitter) instance.getSkyWalkingDynamicField();
            if (swTransmitter == null) {
                return ret;
            }
            Mono<Void> mono = (Mono) ret;
            return mono.doFinally(d -> {
                ServerWebExchange exchange = (ServerWebExchange) allArguments[0];
                HttpStatus statusCode = exchange.getResponse().getStatusCode();
                if (statusCode == HttpStatus.TOO_MANY_REQUESTS) {
                    AbstractSpan localSpan = ContextManager.createLocalSpan(swTransmitter.getOperationName());
                    Tags.STATUS_CODE.set(localSpan,statusCode.toString());
                    SpanLayer.asHttp(localSpan);
                    localSpan.setComponent(ComponentsDefine.SPRING_CLOUD_GATEWAY);
                    ContextManager.continued(swTransmitter.getSnapshot());
                    ContextManager.stopSpan(localSpan);
                }
            });
        }

        @Override
        public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                          Class<?>[] argumentsTypes, Throwable t) {
        }

    }

  • FilteringWebHandlerInterceptor implements the InstanceMethodsAroundInterceptor interface. Its beforeMethod reads the ContextSnapshot stored on the exchange, takes the route from the exchange's GATEWAY_ROUTE_ATTR attribute, builds the operation name as "GATEWAY/" plus the route id, wraps both in a new SWTransmitter, and stores it with instance.setSkyWalkingDynamicField(transmitter). Its afterMethod fetches the SWTransmitter and registers a doFinally callback on the returned Mono. The callback does work only when the response status is TOO_MANY_REQUESTS: it creates a local span, tags it with the status code, then calls ContextManager.continued(swTransmitter.getSnapshot()) and ContextManager.stopSpan(localSpan).
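
The doFinally callback can be approximated without Reactor. The sketch below swaps in java.util.concurrent.CompletableFuture.whenComplete as a stand-in for Mono.doFinally, so it illustrates the control flow only. FinallySketch, instrument, and the sink list are hypothetical, and "recording a span" is reduced to appending a string.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntSupplier;

// Sketch of "attach a completion callback that records a span only for HTTP 429".
// CompletableFuture.whenComplete is used here in place of Reactor's Mono.doFinally.
final class FinallySketch {

    static final int TOO_MANY_REQUESTS = 429;

    // Returns a future with the callback attached; `status` is read when the result completes,
    // and `sink` collects the recorded span descriptions (both are illustrative parameters).
    static CompletableFuture<Void> instrument(CompletableFuture<Void> result, String routeId,
                                              IntSupplier status, List<String> sink) {
        String operationName = "GATEWAY/" + routeId;   // same naming scheme as the interceptor
        return result.whenComplete((ignored, error) -> {
            int code = status.getAsInt();
            if (code == TOO_MANY_REQUESTS) {
                sink.add(operationName + " status=" + code);
            }
        });
    }
}
```

As in the interceptor, the status is read lazily inside the callback rather than when the callback is registered, because the response status is only known once the handler chain has finished.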

DefaultHttpHeadersInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/DefaultHttpHeadersInstrumentation.java

    public class DefaultHttpHeadersInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

        @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
            return new ConstructorInterceptPoint[] {
                new ConstructorInterceptPoint() {
                    @Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
                        return takesArgumentWithType(0, "io.netty.handler.codec.DefaultHeaders");
                    }

                    @Override public String getConstructorInterceptor() {
                        return "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.DefaultHttpHeadersInterceptor";
                    }
                }
            };
        }

        @Override
        public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
            return new InstanceMethodsInterceptPoint[0];
        }

        @Override
        public ClassMatch enhanceClass() {
            return byName("io.netty.handler.codec.http.DefaultHttpHeaders");
        }
    }

  • DefaultHttpHeadersInstrumentation extends ClassInstanceMethodsEnhancePluginDefine. Unlike the other definitions it declares no method intercept points; it uses org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.DefaultHttpHeadersInterceptor to intercept the constructor of io.netty.handler.codec.http.DefaultHttpHeaders whose first parameter is an io.netty.handler.codec.DefaultHeaders.

DefaultHttpHeadersInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/DefaultHttpHeadersInterceptor.java

    public class DefaultHttpHeadersInterceptor implements InstanceConstructorInterceptor {
        @Override
        public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
            Object transmitter = ContextManager.getRuntimeContext().get(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER);
            if (transmitter != null) {
                objInst.setSkyWalkingDynamicField(transmitter);
                ContextManager.getRuntimeContext().remove(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER);
            }
        }
    }

  • DefaultHttpHeadersInterceptor implements the InstanceConstructorInterceptor interface. Its onConstruct method takes the transmitter from ContextManager.getRuntimeContext(), stores it on the newly constructed headers object, and then removes it from the runtime context.
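
Taken together with NettyRoutingFilterInterceptor, this is a put-then-take hand-off through a shared context: one interceptor deposits the transmitter before the filter runs, and the first headers object constructed afterwards claims it. A minimal sketch follows, assuming the runtime context behaves like a per-thread map; RuntimeContextSketch, KEY and Headers are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a per-thread hand-off slot; assumes (not confirmed here) thread-local semantics.
final class RuntimeContextSketch {

    private static final ThreadLocal<Map<String, Object>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    static final String KEY = "TRANSMITTER";   // hypothetical key name

    // Deposits a value, as the filter interceptor does before the filter method runs.
    static void put(Object value) {
        CONTEXT.get().put(KEY, value);
    }

    // Returns the deposited value and removes it, so it can be claimed only once.
    static Object take() {
        return CONTEXT.get().remove(KEY);
    }

    // Hypothetical headers object that claims the value in its constructor.
    static final class Headers {
        final Object dynamicField;

        Headers() {
            this.dynamicField = take();
        }
    }
}
```

Removing the entry on first use means only the first headers object created during the filter call receives the transmitter; later ones see an empty slot.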

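Summary

SkyWalking's spring-cloud-gateway-plugin consists of four instrumentation classes: NettyRoutingFilterInstrumentation, HttpClientOperationsInstrumentation, FilteringWebHandlerInstrumentation, and DefaultHttpHeadersInstrumentation. Their interceptors pass an SWTransmitter along the request path. FilteringWebHandlerInterceptor creates it on the exchange, NettyRoutingFilterInterceptor puts it into the runtime context, DefaultHttpHeadersInterceptor moves it onto the new headers object, and HttpClientOperationsHeadersInterceptor moves it onto HttpClientOperations. HttpClientOperationsSendInterceptor then creates the exit span, and HttpClientOperationsStatusInterceptor finishes it.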
