聊聊skywalking的cluster-nacos-plugin

  • 2020 年 3 月 31 日
  • 筆記

本文主要研究一下skywalking的cluster-nacos-plugin

ClusterModuleNacosConfig

skywalking-6.6.0/oap-server/server-cluster-plugin/cluster-nacos-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/nacos/ClusterModuleNacosConfig.java

public class ClusterModuleNacosConfig extends ModuleConfig {      @Setter @Getter private String serviceName;      @Setter @Getter private String hostPort;      @Setter @Getter private String namespace = "public";  }
  • ClusterModuleNacosConfig定义了serviceName、hostPort、namespace属性

ClusterModuleNacosProvider

skywalking-6.6.0/oap-server/server-cluster-plugin/cluster-nacos-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/nacos/ClusterModuleNacosProvider.java

public class ClusterModuleNacosProvider extends ModuleProvider {        private final ClusterModuleNacosConfig config;      private NamingService namingService;        public ClusterModuleNacosProvider() {          super();          this.config = new ClusterModuleNacosConfig();      }        @Override      public String name() {          return "nacos";      }        @Override      public Class<? extends ModuleDefine> module() {          return ClusterModule.class;      }        @Override      public ModuleConfig createConfigBeanIfAbsent() {          return config;      }        @Override      public void prepare() throws ServiceNotProvidedException, ModuleStartException {          try {              Properties properties = new Properties();              properties.put(PropertyKeyConst.SERVER_ADDR, config.getHostPort());              properties.put(PropertyKeyConst.NAMESPACE, config.getNamespace());              namingService = NamingFactory.createNamingService(properties);          } catch (Exception e) {              throw new ModuleStartException(e.getMessage(), e);          }          NacosCoordinator coordinator = new NacosCoordinator(namingService, config);          this.registerServiceImplementation(ClusterRegister.class, coordinator);          this.registerServiceImplementation(ClusterNodesQuery.class, coordinator);      }        @Override      public void start() throws ServiceNotProvidedException {        }        @Override      public void notifyAfterCompleted() throws ServiceNotProvidedException {        }        @Override      public String[] requiredModules() {          return new String[]{CoreModule.NAME};      }  }
  • ClusterModuleNacosProvider继承了ModuleProvider,其prepare方法创建NamingService及NacosCoordinator,然后将NacosCoordinator注册为ClusterRegister及ClusterNodesQuery的实现

NacosCoordinator

skywalking-6.6.0/oap-server/server-cluster-plugin/cluster-nacos-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/nacos/NacosCoordinator.java

public class NacosCoordinator implements ClusterRegister, ClusterNodesQuery {        private final NamingService namingService;      private final ClusterModuleNacosConfig config;      private volatile Address selfAddress;        public NacosCoordinator(NamingService namingService, ClusterModuleNacosConfig config) {          this.namingService = namingService;          this.config = config;      }        @Override      public List<RemoteInstance> queryRemoteNodes() {          List<RemoteInstance> result = new ArrayList<>();          try {              List<Instance> instances = namingService.selectInstances(config.getServiceName(), true);              if (CollectionUtils.isNotEmpty(instances)) {                  instances.forEach(instance -> {                      Address address = new Address(instance.getIp(), instance.getPort(), false);                      if (address.equals(selfAddress)) {                          address.setSelf(true);                      }                      result.add(new RemoteInstance(address));                  });              }          } catch (NacosException e) {              throw new ServiceQueryException(e.getErrMsg());          }          return result;      }        @Override      public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {          String host = remoteInstance.getAddress().getHost();          int port = remoteInstance.getAddress().getPort();          try {              namingService.registerInstance(config.getServiceName(), host, port);          } catch (Exception e) {              throw new ServiceRegisterException(e.getMessage());          }          this.selfAddress = remoteInstance.getAddress();          TelemetryRelatedContext.INSTANCE.setId(selfAddress.toString());      }  }
  • NacosCoordinator实现了ClusterRegister、ClusterNodesQuery接口,其queryRemoteNodes方法通过namingService.selectInstances(config.getServiceName(), true)获取instances;其registerRemote方法则通过namingService.registerInstance(config.getServiceName(), host, port)进行注册

小结

ClusterModuleNacosProvider继承了ModuleProvider,其prepare方法创建NamingService及NacosCoordinator,然后将NacosCoordinator注册为ClusterRegister及ClusterNodesQuery的实现

doc

  • ClusterModuleNacosProvider