聊聊nacos Service的processClientBeat

  • 2019 年 10 月 4 日
  • 筆記

本文主要研究一下nacos Service的processClientBeat

Service.processClientBeat

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java

public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {  ​      private static final String SERVICE_NAME_SYNTAX = "[0-9a-zA-Z@\.:_-]+";  ​      @JSONField(serialize = false)      private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);  ​      private String token;      private List<String> owners = new ArrayList<>();      private Boolean resetWeight = false;      private Boolean enabled = true;      private Selector selector = new NoneSelector();      private String namespaceId;  ​      /**       * IP will be deleted if it has not send beat for some time, default timeout is 30 seconds.       */      private long ipDeleteTimeout = 30 * 1000;  ​      private volatile long lastModifiedMillis = 0L;  ​      private volatile String checksum;  ​      /**       * TODO set customized push expire time:       */      private long pushCacheMillis = 0L;  ​      private Map<String, Cluster> clusterMap = new HashMap<>();  ​      //......  ​      public void processClientBeat(final RsInfo rsInfo) {          ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();          clientBeatProcessor.setService(this);          clientBeatProcessor.setRsInfo(rsInfo);          HealthCheckReactor.scheduleNow(clientBeatProcessor);      }  ​      //......  }
  • Service的processClientBeat方法会创建ClientBeatProcessor,并使用HealthCheckReactor进行调度

ClientBeatProcessor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatProcessor.java

public class ClientBeatProcessor implements Runnable {      public static final long CLIENT_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);      private RsInfo rsInfo;      private Service service;  ​      @JSONField(serialize = false)      public PushService getPushService() {          return SpringContext.getAppContext().getBean(PushService.class);      }  ​      public RsInfo getRsInfo() {          return rsInfo;      }  ​      public void setRsInfo(RsInfo rsInfo) {          this.rsInfo = rsInfo;      }  ​      public Service getService() {          return service;      }  ​      public void setService(Service service) {          this.service = service;      }  ​      @Override      public void run() {          Service service = this.service;          if (Loggers.EVT_LOG.isDebugEnabled()) {              Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());          }  ​          String ip = rsInfo.getIp();          String clusterName = rsInfo.getCluster();          int port = rsInfo.getPort();          Cluster cluster = service.getClusterMap().get(clusterName);          List<Instance> instances = cluster.allIPs(true);  ​          for (Instance instance : instances) {              if (instance.getIp().equals(ip) && instance.getPort() == port) {                  if (Loggers.EVT_LOG.isDebugEnabled()) {                      Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());                  }                  instance.setLastBeat(System.currentTimeMillis());                  if (!instance.isMarked()) {                      if (!instance.isHealthy()) {                          instance.setHealthy(true);                          Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",                              cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);                          getPushService().serviceChanged(service);                      }                  }              }          }      }  }
  • ClientBeatProcessor实现了Runnable方法,它会遍历instances更新指定ip及port的instance的lastBeat时间;同时对于非marked且healthy为false的instance更新其healthy为true并通过getPushService().serviceChanged发布变更事件

HealthCheckReactor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckReactor.java

public class HealthCheckReactor {  ​      private static final ScheduledExecutorService EXECUTOR;  ​      private static Map<String, ScheduledFuture> futureMap = new ConcurrentHashMap<>();  ​      static {  ​          int processorCount = Runtime.getRuntime().availableProcessors();          EXECUTOR                  = Executors                  .newScheduledThreadPool(processorCount <= 1 ? 1 : processorCount / 2, new ThreadFactory() {                      @Override                      public Thread newThread(Runnable r) {                          Thread thread = new Thread(r);                          thread.setDaemon(true);                          thread.setName("com.alibaba.nacos.naming.health");                          return thread;                      }                  });      }  ​      public static ScheduledFuture<?> scheduleCheck(HealthCheckTask task) {          task.setStartTime(System.currentTimeMillis());          return EXECUTOR.schedule(task, task.getCheckRTNormalized(), TimeUnit.MILLISECONDS);      }  ​      public static void scheduleCheck(ClientBeatCheckTask task) {          futureMap.putIfAbsent(task.taskKey(), EXECUTOR.scheduleWithFixedDelay(task, 5000, 5000, TimeUnit.MILLISECONDS));      }  ​      public static void cancelCheck(ClientBeatCheckTask task) {          ScheduledFuture scheduledFuture = futureMap.get(task.taskKey());          if (scheduledFuture == null) {              return;          }          try {              scheduledFuture.cancel(true);          } catch (Exception e) {              Loggers.EVT_LOG.error("[CANCEL-CHECK] cancel failed!", e);          }      }  ​  ​      public static ScheduledFuture<?> scheduleNow(Runnable task) {          return EXECUTOR.schedule(task, 0, TimeUnit.MILLISECONDS);      }  }
  • HealthCheckReactor在static代码块创建了EXECUTOR,它提供了HealthCheckTask、ClientBeatCheckTask的schedule方法以及ClientBeatCheckTask的cancel方法,并提供了Runnable的scheduleNow方法

小结

  • Service的processClientBeat方法会创建ClientBeatProcessor,并使用HealthCheckReactor进行调度
  • ClientBeatProcessor实现了Runnable方法,它会遍历instances更新指定ip及port的instance的lastBeat时间;同时对于非marked且healthy为false的instance更新其healthy为true并通过getPushService().serviceChanged发布变更事件
  • HealthCheckReactor在static代码块创建了EXECUTOR,它提供了HealthCheckTask、ClientBeatCheckTask的schedule方法以及ClientBeatCheckTask的cancel方法,并提供了Runnable的scheduleNow方法

doc