聊聊skywalking的spring-webflux-plugin

  • 2020 年 3 月 13 日
  • 筆記

本文主要研究一下skywalking的spring-webflux-plugin

DispatcherHandlerInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/DispatcherHandlerInstrumentation.java

public class DispatcherHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {      @Override      public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {          return new ConstructorInterceptPoint[0];      }        @Override      public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {          return new InstanceMethodsInterceptPoint[]{              new InstanceMethodsInterceptPoint() {                  @Override                  public ElementMatcher<MethodDescription> getMethodsMatcher() {                      return named("handle");                  }                    @Override                  public String getMethodsInterceptor() {                      return "org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor";                  }                    @Override                  public boolean isOverrideArgs() {                      return false;                  }              }          };      }        @Override      protected ClassMatch enhanceClass() {          return byName("org.springframework.web.reactive.DispatcherHandler");      }  }
  • DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法

DispatcherHandlerHandleMethodInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/DispatcherHandlerHandleMethodInterceptor.java

public class DispatcherHandlerHandleMethodInterceptor implements InstanceMethodsAroundInterceptor {      private static final String DEFAULT_OPERATION_NAME = "WEBFLUX.handle";        @Override      public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,                               MethodInterceptResult result) throws Throwable {        }        @Override      public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,                                Object ret) throws Throwable {          EnhancedInstance instance = getInstance(allArguments[0]);            ServerWebExchange exchange = (ServerWebExchange) allArguments[0];            ContextCarrier carrier = new ContextCarrier();          CarrierItem next = carrier.items();          HttpHeaders headers = exchange.getRequest().getHeaders();          while (next.hasNext()) {              next = next.next();              List<String> header = headers.get(next.getHeadKey());              if (header != null && header.size() > 0) {                  next.setHeadValue(header.get(0));              }          }            AbstractSpan span = ContextManager.createEntrySpan(DEFAULT_OPERATION_NAME, carrier);          span.setComponent(ComponentsDefine.SPRING_WEBFLUX);          SpanLayer.asHttp(span);          Tags.URL.set(span, exchange.getRequest().getURI().toString());          HTTP.METHOD.set(span, exchange.getRequest().getMethodValue());          instance.setSkyWalkingDynamicField(ContextManager.capture());          span.prepareForAsync();          ContextManager.stopSpan(span);            return ((Mono) ret).doFinally(s -> {              try {                  Object pathPattern = exchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);                  if (pathPattern != null) {                      span.setOperationName(((PathPattern) pathPattern).getPatternString());                  }                  HttpStatus httpStatus = exchange.getResponse().getStatusCode();                  // fix webflux-2.0.0-2.1.0 version have bug. httpStatus is null. not support                  if (httpStatus != null) {                      Tags.STATUS_CODE.set(span, Integer.toString(httpStatus.value()));                      if (httpStatus.isError()) {                          span.errorOccurred();                      }                  }              } finally {                  span.asyncFinish();              }          });        }        @Override      public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,                                        Class<?>[] argumentsTypes, Throwable t) {      }        public static EnhancedInstance getInstance(Object o) {          EnhancedInstance instance = null;          if (o instanceof DefaultServerWebExchange) {              instance = (EnhancedInstance) o;          } else if (o instanceof ServerWebExchangeDecorator) {              ServerWebExchange delegate = ((ServerWebExchangeDecorator) o).getDelegate();              return getInstance(delegate);          }          return instance;      }    }
  • DispatcherHandlerHandleMethodInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法创建AbstractSpan,设置URL及METHOD的tag,执行span.prepareForAsync(),然后注册Mono的doFinally的Consumer,在里头设置span的operationName,statusCode,以及是否有异常,最后执行span.asyncFinish()

ServerWebExchangeInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/ServerWebExchangeInstrumentation.java

public class ServerWebExchangeInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {      @Override      public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {          return new ConstructorInterceptPoint[]{              new ConstructorInterceptPoint() {                  @Override                  public ElementMatcher<MethodDescription> getConstructorMatcher() {                      return any();                  }                    @Override                  public String getConstructorInterceptor() {                      return "org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor";                  }              }          };      }        @Override      public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {          return new InstanceMethodsInterceptPoint[0];      }        @Override      protected ClassMatch enhanceClass() {          return byName("org.springframework.web.server.adapter.DefaultServerWebExchange");      }  }
  • ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

ServerWebExchangeConstructorInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/ServerWebExchangeConstructorInterceptor.java

public class ServerWebExchangeConstructorInterceptor implements InstanceConstructorInterceptor {      @Override      public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {      }  }
  • ServerWebExchangeConstructorInterceptor目前还是空实现

小结

DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法;ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法

doc

  • DispatcherHandlerInstrumentation
  • ServerWebExchangeInstrumentation