聊聊skywalking的metric-exporter

  • 2020 年 3 月 30 日
  • 筆記

本文主要研究一下skywalking的metric-exporter

metric-exporter.proto

skywalking-6.6.0/oap-server/exporter/src/main/proto/metric-exporter.proto

syntax = "proto3";    option java_multiple_files = true;  option java_package = "org.apache.skywalking.oap.server.exporter.grpc";      service MetricExportService {      rpc export (stream ExportMetricValue) returns (ExportResponse) {      }        rpc subscription (SubscriptionReq) returns (SubscriptionsResp) {      }  }    message ExportMetricValue {      string metricName = 1;      string entityName = 2;      string entityId = 3;      ValueType type = 4;      int64 timeBucket = 5;      int64 longValue = 6;      double doubleValue = 7;  }    message SubscriptionsResp {      repeated string metricNames = 1;  }    enum ValueType {      LONG = 0;      DOUBLE = 1;  }    message SubscriptionReq {    }    message ExportResponse {  }
  • metric-exporter.proto定义了MetricExportService服务,它有export、subscription两个rpc方法

GRPCExporterSetting

skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java

@Setter  @Getter  public class GRPCExporterSetting extends ModuleConfig {      private String targetHost;      private int targetPort;      private int bufferChannelSize = 20000;      private int bufferChannelNum = 2;  }
  • GRPCExporterSetting定义了targetHost、targetPort、bufferChannelSize、bufferChannelNum属性

GRPCExporterProvider

skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java

public class GRPCExporterProvider extends ModuleProvider {      private GRPCExporterSetting setting;      private GRPCExporter exporter;        @Override public String name() {          return "grpc";      }        @Override public Class<? extends ModuleDefine> module() {          return ExporterModule.class;      }        @Override public ModuleConfig createConfigBeanIfAbsent() {          setting = new GRPCExporterSetting();          return setting;      }        @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {          exporter = new GRPCExporter(setting);          this.registerServiceImplementation(MetricValuesExportService.class, exporter);      }        @Override public void start() throws ServiceNotProvidedException, ModuleStartException {        }        @Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {          ModuleServiceHolder serviceHolder = getManager().find(CoreModule.NAME).provider();          exporter.setServiceInventoryCache(serviceHolder.getService(ServiceInventoryCache.class));          exporter.setServiceInstanceInventoryCache(serviceHolder.getService(ServiceInstanceInventoryCache.class));          exporter.setEndpointInventoryCache(serviceHolder.getService(EndpointInventoryCache.class));            exporter.initSubscriptionList();      }        @Override public String[] requiredModules() {          return new String[] {CoreModule.NAME};      }  }
  • GRPCExporterProvider继承了ModuleProvider,其prepare方法创建GRPCExporter,然后执行registerServiceImplementation;其notifyAfterCompleted方法主要是给exporter设置serviceInventoryCache、serviceInstanceInventoryCache、endpointInventoryCache,然后执行exporter.initSubscriptionList()

MetricFormatter

skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/MetricFormatter.java

@Setter  public class MetricFormatter {      private ServiceInventoryCache serviceInventoryCache;      private ServiceInstanceInventoryCache serviceInstanceInventoryCache;      private EndpointInventoryCache endpointInventoryCache;        protected String getEntityName(MetricsMetaInfo meta) {          int scope = meta.getScope();          if (DefaultScopeDefine.inServiceCatalog(scope)) {              int entityId = Integer.valueOf(meta.getId());              return serviceInventoryCache.get(entityId).getName();          } else if (DefaultScopeDefine.inServiceInstanceCatalog(scope)) {              int entityId = Integer.valueOf(meta.getId());              return serviceInstanceInventoryCache.get(entityId).getName();          } else if (DefaultScopeDefine.inEndpointCatalog(scope)) {              int entityId = Integer.valueOf(meta.getId());              return endpointInventoryCache.get(entityId).getName();          } else if (scope == DefaultScopeDefine.ALL) {              return "";          } else {              return null;          }      }  }
  • MetricFormatter提供了getEntityName方法,用于从MetricsMetaInfo提取entityName

MetricValuesExportService

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java

public interface MetricValuesExportService extends Service {      /**       * This method is sync-mode export, the performance effects the persistence result. Queue mode is highly recommended.       *       * @param event value is only accurate when the method invokes. Don't cache it.       */      void export(ExportEvent event);  }
  • MetricValuesExportService继承了Service,它定义了export方法

GRPCExporter

skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java

public class GRPCExporter extends MetricFormatter implements MetricValuesExportService, IConsumer<GRPCExporter.ExportData> {      private static final Logger logger = LoggerFactory.getLogger(GRPCExporter.class);        private GRPCExporterSetting setting;      private final MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub;      private final MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub;      private final DataCarrier exportBuffer;      private final Set<String> subscriptionSet;        public GRPCExporter(GRPCExporterSetting setting) {          this.setting = setting;          GRPCClient client = new GRPCClient(setting.getTargetHost(), setting.getTargetPort());          client.connect();          ManagedChannel channel = client.getChannel();          exportServiceFutureStub = MetricExportServiceGrpc.newStub(channel);          blockingStub = MetricExportServiceGrpc.newBlockingStub(channel);          exportBuffer = new DataCarrier<ExportData>(setting.getBufferChannelNum(), setting.getBufferChannelSize());          exportBuffer.consume(this, 1, 200);          subscriptionSet = new HashSet<>();      }        @Override public void export(ExportEvent event) {          if (ExportEvent.EventType.TOTAL == event.getType()) {              Metrics metrics = event.getMetrics();              if (metrics instanceof WithMetadata) {                  MetricsMetaInfo meta = ((WithMetadata)metrics).getMeta();                  if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) {                      exportBuffer.produce(new ExportData(meta, metrics));                  }              }          }      }        public void initSubscriptionList() {          SubscriptionsResp subscription = blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).subscription(SubscriptionReq.newBuilder().build());          subscription.getMetricNamesList().forEach(subscriptionSet::add);          logger.debug("Get exporter subscription list, {}", subscriptionSet);      }        @Override public void init() {        }        @Override public void consume(List<ExportData> data) {          if (data.size() == 0) {              return;          }            ExportStatus status = new ExportStatus();          StreamObserver<ExportMetricValue> streamObserver = exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS).export(              new StreamObserver<ExportResponse>() {                  @Override public void onNext(ExportResponse response) {                    }                    @Override public void onError(Throwable throwable) {                      status.done();                  }                    @Override public void onCompleted() {                      status.done();                  }              }          );          AtomicInteger exportNum = new AtomicInteger();          data.forEach(row -> {              ExportMetricValue.Builder builder = ExportMetricValue.newBuilder();                Metrics metrics = row.getMetrics();              if (metrics instanceof LongValueHolder) {                  long value = ((LongValueHolder)metrics).getValue();                  builder.setLongValue(value);                  builder.setType(ValueType.LONG);              } else if (metrics instanceof IntValueHolder) {                  long value = ((IntValueHolder)metrics).getValue();                  builder.setLongValue(value);                  builder.setType(ValueType.LONG);              } else if (metrics instanceof DoubleValueHolder) {                  double value = ((DoubleValueHolder)metrics).getValue();                  builder.setDoubleValue(value);                  builder.setType(ValueType.DOUBLE);              } else {                  return;              }                MetricsMetaInfo meta = row.getMeta();              builder.setMetricName(meta.getMetricsName());              String entityName = getEntityName(meta);              if (entityName == null) {                  return;              }              builder.setEntityName(entityName);              builder.setEntityId(meta.getId());                builder.setTimeBucket(metrics.getTimeBucket());                streamObserver.onNext(builder.build());              exportNum.getAndIncrement();          });            streamObserver.onCompleted();            long sleepTime = 0;          long cycle = 100L;          /**           * For memory safe of oap, we must wait for the peer confirmation.           */          while (!status.isDone()) {              try {                  sleepTime += cycle;                  Thread.sleep(cycle);              } catch (InterruptedException e) {              }                if (sleepTime > 2000L) {                  logger.warn("Export {} metrics to {}:{}, wait {} milliseconds.",                      exportNum.get(), setting.getTargetHost(), setting.getTargetPort(), sleepTime);                  cycle = 2000L;              }          }            logger.debug("Exported {} metrics to {}:{} in {} milliseconds.",              exportNum.get(), setting.getTargetHost(), setting.getTargetPort(), sleepTime);      }        @Override public void onError(List<ExportData> data, Throwable t) {          logger.error(t.getMessage(), t);      }        @Override public void onExit() {        }        @Getter(AccessLevel.PRIVATE)      public class ExportData {          private MetricsMetaInfo meta;          private Metrics metrics;            public ExportData(MetricsMetaInfo meta, Metrics metrics) {              this.meta = meta;              this.metrics = metrics;          }      }        private class ExportStatus {          private boolean done = false;            private void done() {              done = true;          }            public boolean isDone() {              return done;          }      }  }
  • GRPCExporter继承了MetricFormatter,实现了MetricValuesExportService、IConsumer接口;其构造器根据GRPCExporterSetting实例化MetricExportServiceGrpc.MetricExportServiceStub以及MetricExportServiceGrpc.MetricExportServiceBlockingStub,并创建DataCarrier,然后注册自身的IConsumer到exportBuffer;其export方法主要是执行exportBuffer.produce(new ExportData(meta, metrics));其consume方法主要是构造ExportMetricValue,然后执行streamObserver.onNext

小结

metric-exporter.proto定义了MetricExportService服务,它有export、subscription两个rpc方法;GRPCExporterProvider继承了ModuleProvider,其prepare方法创建GRPCExporter,然后执行registerServiceImplementation;其notifyAfterCompleted方法主要是给exporter设置serviceInventoryCache、serviceInstanceInventoryCache、endpointInventoryCache,然后执行exporter.initSubscriptionList()

doc

  • metric-exporter