­

聊聊nacos的HealthCheckCommon

  • 2019 年 10 月 6 日
  • 筆記

本文主要研究一下nacos的HealthCheckCommon

HealthCheckCommon

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckCommon.java

@Component  public class HealthCheckCommon {  ​      @Autowired      private DistroMapper distroMapper;  ​      @Autowired      private SwitchDomain switchDomain;  ​      @Autowired      private ServerListManager serverListManager;  ​      @Autowired      private PushService pushService;  ​      private static LinkedBlockingDeque<HealthCheckResult> healthCheckResults = new LinkedBlockingDeque<>(1024 * 128);  ​      private static ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {          @Override          public Thread newThread(Runnable r) {              Thread thread = new Thread(r);              thread.setDaemon(true);              thread.setName("com.taobao.health-check.notifier");              return thread;          }      });  ​  ​      public void init() {          executorService.schedule(new Runnable() {              @Override              public void run() {                  List list = Arrays.asList(healthCheckResults.toArray());                  healthCheckResults.clear();  ​                  List<Server> sameSiteServers = serverListManager.getServers();  ​                  if (sameSiteServers == null || sameSiteServers.size() <= 0) {                      return;                  }  ​                  for (Server server : sameSiteServers) {                      if (server.getKey().equals(NetUtils.localServer())) {                          continue;                      }                      Map<String, String> params = new HashMap<>(10);                      params.put("result", JSON.toJSONString(list));                      if (Loggers.SRV_LOG.isDebugEnabled()) {                          Loggers.SRV_LOG.debug("[HEALTH-SYNC] server: {}, healthCheckResults: {}",                              server, JSON.toJSONString(list));                      }  ​                      HttpClient.HttpResult httpResult = HttpClient.httpPost("http://" + server.getKey()                          + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT                          + "/api/healthCheckResult", null, params);  ​                      if (httpResult.code != HttpURLConnection.HTTP_OK) {                          Loggers.EVT_LOG.warn("[HEALTH-CHECK-SYNC] failed to send result to {}, result: {}",                              server, JSON.toJSONString(list));                      }  ​                  }  ​              }          }, 500, TimeUnit.MILLISECONDS);      }  ​      //......  ​      public void reEvaluateCheckRT(long checkRT, HealthCheckTask task, SwitchDomain.HealthParams params) {          //......      }  ​      public void checkOK(Instance ip, HealthCheckTask task, String msg) {          //......      }  ​      public void checkFail(Instance ip, HealthCheckTask task, String msg) {          //......      }  ​      public void checkFailNow(Instance ip, HealthCheckTask task, String msg) {          //......      }  ​      //......  }
  • HealthCheckCommon的init方法注册了一个延时任务,往其他server同步healthCheckResults;它主要提供了reEvaluateCheckRT、checkOK、checkFail、checkFailNow方法

reEvaluateCheckRT

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckCommon.java

    public void reEvaluateCheckRT(long checkRT, HealthCheckTask task, SwitchDomain.HealthParams params) {          task.setCheckRTLast(checkRT);  ​          if (checkRT > task.getCheckRTWorst()) {              task.setCheckRTWorst(checkRT);          }  ​          if (checkRT < task.getCheckRTBest()) {              task.setCheckRTBest(checkRT);          }  ​          checkRT = (long) ((params.getFactor() * task.getCheckRTNormalized()) + (1 - params.getFactor()) * checkRT);  ​          if (checkRT > params.getMax()) {              checkRT = params.getMax();          }  ​          if (checkRT < params.getMin()) {              checkRT = params.getMin();          }  ​          task.setCheckRTNormalized(checkRT);      }
  • reEvaluateCheckRT方法首先更新checkRTLast,然后判断是否更新checkRTWorst、checkRTBest,之后根据factor及checkRTNormalized参数重置checkRT,最后更新checkRTNormalized

checkOK

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckCommon.java

    public void checkOK(Instance ip, HealthCheckTask task, String msg) {          Cluster cluster = task.getCluster();  ​          try {              if (!ip.isHealthy() || !ip.isMockValid()) {                  if (ip.getOKCount().incrementAndGet() >= switchDomain.getCheckTimes()) {                      if (distroMapper.responsible(cluster, ip)) {                          ip.setHealthy(true);                          ip.setMockValid(true);  ​                          Service service = cluster.getService();                          service.setLastModifiedMillis(System.currentTimeMillis());                          pushService.serviceChanged(service);                          addResult(new HealthCheckResult(service.getName(), ip));  ​                          Loggers.EVT_LOG.info("serviceName: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: {}",                              cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);                      } else {                          if (!ip.isMockValid()) {                              ip.setMockValid(true);                              Loggers.EVT_LOG.info("serviceName: {} {PROBE} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: {}",                                  cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);                          }                      }                  } else {                      Loggers.EVT_LOG.info("serviceName: {} {OTHER} {IP-ENABLED} pre-valid: {}:{}@{} in {}, msg: {}",                          cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(), ip.getOKCount(), msg);                  }              }          } catch (Throwable t) {              Loggers.SRV_LOG.error("[CHECK-OK] error when close check task.", t);          }  ​          ip.getFailCount().set(0);          ip.setBeingChecked(false);      }
  • checkOK对于非healthy或者mockValid的instance会设置其为healthy及mockValid,然后通过pushService.serviceChanged发布变更事件,并添加HealthCheckResult到healthCheckResults中

checkFail

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckCommon.java

    public void checkFail(Instance ip, HealthCheckTask task, String msg) {          Cluster cluster = task.getCluster();  ​          try {              if (ip.isHealthy() || ip.isMockValid()) {                  if (ip.getFailCount().incrementAndGet() >= switchDomain.getCheckTimes()) {                      if (distroMapper.responsible(cluster, ip)) {                          ip.setHealthy(false);                          ip.setMockValid(false);  ​                          Service service = cluster.getService();                          service.setLastModifiedMillis(System.currentTimeMillis());                          addResult(new HealthCheckResult(service.getName(), ip));  ​                          pushService.serviceChanged(service);  ​                          Loggers.EVT_LOG.info("serviceName: {} {POS} {IP-DISABLED} invalid: {}:{}@{}, region: {}, msg: {}",                              cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);                      } else {                          Loggers.EVT_LOG.info("serviceName: {} {PROBE} {IP-DISABLED} invalid: {}:{}@{}, region: {}, msg: {}",                              cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);                      }  ​                  } else {                      Loggers.EVT_LOG.info("serviceName: {} {OTHER} {IP-DISABLED} pre-invalid: {}:{}@{} in {}, msg: {}",                          cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(), ip.getFailCount(), msg);                  }              }          } catch (Throwable t) {              Loggers.SRV_LOG.error("[CHECK-FAIL] error when close check task.", t);          }  ​          ip.getOKCount().set(0);  ​          ip.setBeingChecked(false);      }
  • checkFail对于healthy或者mockValid的instance会设置其healthy及mockValid为false,然后通过pushService.serviceChanged发布变更事件,并添加HealthCheckResult到healthCheckResults中

checkFailNow

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckCommon.java

    public void checkFailNow(Instance ip, HealthCheckTask task, String msg) {          Cluster cluster = task.getCluster();          try {              if (ip.isHealthy() || ip.isMockValid()) {                  if (distroMapper.responsible(cluster, ip)) {                      ip.setHealthy(false);                      ip.setMockValid(false);  ​                      Service service = cluster.getService();                      service.setLastModifiedMillis(System.currentTimeMillis());  ​                      pushService.serviceChanged(service);                      addResult(new HealthCheckResult(service.getName(), ip));  ​                      Loggers.EVT_LOG.info("serviceName: {} {POS} {IP-DISABLED} invalid-now: {}:{}@{}, region: {}, msg: {}",                          cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);                  } else {                      if (ip.isMockValid()) {                          ip.setMockValid(false);                          Loggers.EVT_LOG.info("serviceName: {} {PROBE} {IP-DISABLED} invalid-now: {}:{}@{}, region: {}, msg: {}",                              cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(), UtilsAndCommons.LOCALHOST_SITE, msg);                      }  ​                  }              }          } catch (Throwable t) {              Loggers.SRV_LOG.error("[CHECK-FAIL-NOW] error when close check task.", t);          }  ​          ip.getOKCount().set(0);          ip.setBeingChecked(false);      }
  • checkFailNow对于healthy或者mockValid的instance会设置其healthy及mockValid为false,然后通过pushService.serviceChanged发布变更事件,并添加HealthCheckResult到healthCheckResults中;与checkFail不同的是它对于非自己负责的instance会立马标记mockVlid为false

小结

HealthCheckCommon的init方法注册了一个延时任务,往其他server同步healthCheckResults;它主要提供了reEvaluateCheckRT、checkOK、checkFail、checkFailNow方法

doc