A look at Nacos's MysqlHealthCheckProcessor

  • October 6, 2019
  • Notes

This article examines Nacos's MysqlHealthCheckProcessor.

MysqlHealthCheckProcessor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/MysqlHealthCheckProcessor.java

    @Component
    public class MysqlHealthCheckProcessor implements HealthCheckProcessor {

        @Autowired
        private HealthCheckCommon healthCheckCommon;

        @Autowired
        private SwitchDomain switchDomain;

        public static final int CONNECT_TIMEOUT_MS = 500;

        private static final String CHECK_MYSQL_MASTER_SQL = "show global variables where variable_name='read_only'";
        private static final String MYSQL_SLAVE_READONLY = "ON";

        private static ConcurrentMap<String, Connection> CONNECTION_POOL
            = new ConcurrentHashMap<String, Connection>();

        private static ExecutorService EXECUTOR;

        static {

            int processorCount = Runtime.getRuntime().availableProcessors();
            EXECUTOR
                = Executors.newFixedThreadPool(processorCount <= 1 ? 1 : processorCount / 2,
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setDaemon(true);
                        thread.setName("com.nacos.mysql.checker");
                        return thread;
                    }
                }
            );
        }

        public MysqlHealthCheckProcessor() {
        }

        @Override
        public String getType() {
            return "MYSQL";
        }

        @Override
        public void process(HealthCheckTask task) {
            List<Instance> ips = task.getCluster().allIPs(false);

            SRV_LOG.debug("mysql check, ips:" + ips);
            if (CollectionUtils.isEmpty(ips)) {
                return;
            }

            for (Instance ip : ips) {
                try {

                    if (ip.isMarked()) {
                        if (SRV_LOG.isDebugEnabled()) {
                            SRV_LOG.debug("mysql check, ip is marked as to skip health check, ip: {}", ip.getIp());
                        }
                        continue;
                    }

                    if (!ip.markChecking()) {
                        SRV_LOG.warn("mysql check started before last one finished, service: {}:{}:{}",
                            task.getCluster().getService().getName(), task.getCluster().getName(), ip.getIp());

                        healthCheckCommon.reEvaluateCheckRT(task.getCheckRTNormalized() * 2, task, switchDomain.getMysqlHealthParams());
                        continue;
                    }

                    EXECUTOR.execute(new MysqlCheckTask(ip, task));
                    MetricsMonitor.getMysqlHealthCheckMonitor().incrementAndGet();
                } catch (Exception e) {
                    ip.setCheckRT(switchDomain.getMysqlHealthParams().getMax());
                    healthCheckCommon.checkFail(ip, task, "mysql:error:" + e.getMessage());
                    healthCheckCommon.reEvaluateCheckRT(switchDomain.getMysqlHealthParams().getMax(), task, switchDomain.getMysqlHealthParams());
                }
            }
        }

        //......
    }
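  • MysqlHealthCheckProcessor implements the HealthCheckProcessor interface, and its getType method returns MYSQL. Its process method iterates over the cluster's instances: marked instances are skipped; for an instance whose markChecking() returns false (the previous check has not finished yet), it logs a warning and calls healthCheckCommon.reEvaluateCheckRT with twice the normalized check RT; every other instance gets a new MysqlCheckTask, which is submitted to EXECUTOR, and the MySQL health-check metric is incremented. EXECUTOR is a fixed pool of daemon threads sized at half the available processors (at least one).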
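The dispatch loop in process can be sketched in plain Java without any Nacos classes. This is a minimal, hypothetical model, not Nacos code: CheckedInstance, DispatchSketch and finishChecking are names invented here, an AtomicBoolean stands in for whatever flag backs Instance.markChecking(), and the submitted Runnable is only a placeholder for MysqlCheckTask. The pool follows the same sizing rule and daemon-thread setup as EXECUTOR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Nacos's Instance: only the flags the loop reads.
class CheckedInstance {
    final String address;
    final boolean marked;                                   // "skip health check" flag
    final AtomicBoolean checking = new AtomicBoolean(false);

    CheckedInstance(String address, boolean marked) {
        this.address = address;
        this.marked = marked;
    }

    // Plays the role of markChecking(): succeeds only if no check is in flight.
    boolean markChecking() {
        return checking.compareAndSet(false, true);
    }

    void finishChecking() {
        checking.set(false);
    }
}

class DispatchSketch {
    // Same sizing rule as EXECUTOR: half the cores, at least one daemon thread.
    static ExecutorService newCheckerPool() {
        int cpus = Runtime.getRuntime().availableProcessors();
        return Executors.newFixedThreadPool(cpus <= 1 ? 1 : cpus / 2, r -> {
            Thread t = new Thread(r, "mysql-checker-sketch");
            t.setDaemon(true);
            return t;
        });
    }

    // Returns the addresses that were submitted; skipped and busy instances are counted.
    static List<String> dispatch(List<CheckedInstance> ips, ExecutorService pool,
                                 AtomicInteger skipped, AtomicInteger busy) {
        List<String> submitted = new ArrayList<>();
        for (CheckedInstance ip : ips) {
            if (ip.marked) {                  // marked: skip entirely
                skipped.incrementAndGet();
                continue;
            }
            if (!ip.markChecking()) {         // previous check still running
                busy.incrementAndGet();       // Nacos re-evaluates the check RT here
                continue;
            }
            submitted.add(ip.address);
            pool.execute(ip::finishChecking); // placeholder for the real MysqlCheckTask
        }
        return submitted;
    }
}
```

Because the flag is claimed with compareAndSet, two overlapping rounds cannot both start a check for the same instance; the round that loses only adjusts the check RT, matching the warning branch in process.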

MysqlCheckTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/MysqlHealthCheckProcessor.java

    private class MysqlCheckTask implements Runnable {
        private Instance ip;
        private HealthCheckTask task;
        private long startTime = System.currentTimeMillis();

        public MysqlCheckTask(Instance ip, HealthCheckTask task) {
            this.ip = ip;
            this.task = task;
        }

        @Override
        public void run() {

            Statement statement = null;
            ResultSet resultSet = null;

            try {

                Cluster cluster = task.getCluster();
                String key = cluster.getService().getName() + ":" + cluster.getName() + ":" + ip.getIp() + ":" + ip.getPort();
                Connection connection = CONNECTION_POOL.get(key);
                AbstractHealthChecker.Mysql config = (AbstractHealthChecker.Mysql) cluster.getHealthChecker();

                if (connection == null || connection.isClosed()) {
                    MysqlDataSource dataSource = new MysqlDataSource();
                    dataSource.setConnectTimeout(CONNECT_TIMEOUT_MS);
                    dataSource.setSocketTimeout(CONNECT_TIMEOUT_MS);
                    dataSource.setUser(config.getUser());
                    dataSource.setPassword(config.getPwd());
                    dataSource.setLoginTimeout(1);

                    dataSource.setServerName(ip.getIp());
                    dataSource.setPort(ip.getPort());

                    connection = dataSource.getConnection();
                    CONNECTION_POOL.put(key, connection);
                }

                statement = connection.createStatement();
                statement.setQueryTimeout(1);

                resultSet = statement.executeQuery(config.getCmd());
                int resultColumnIndex = 2;

                if (CHECK_MYSQL_MASTER_SQL.equals(config.getCmd())) {
                    resultSet.next();
                    if (MYSQL_SLAVE_READONLY.equals(resultSet.getString(resultColumnIndex))) {
                        throw new IllegalStateException("current node is slave!");
                    }
                }

                healthCheckCommon.checkOK(ip, task, "mysql:+ok");
                healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task, switchDomain.getMysqlHealthParams());
            } catch (SQLException e) {
                // fail immediately
                healthCheckCommon.checkFailNow(ip, task, "mysql:" + e.getMessage());
                // note: this branch takes the max RT from the HTTP health params, unlike the other branches
                healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task, switchDomain.getMysqlHealthParams());
            } catch (Throwable t) {
                Throwable cause = t;
                int maxStackDepth = 50;
                for (int deepth = 0; deepth < maxStackDepth && cause != null; deepth++) {
                    if (cause instanceof SocketTimeoutException
                            || cause instanceof ConnectTimeoutException
                            || cause instanceof TimeoutException
                            || cause.getCause() instanceof TimeoutException) {

                        healthCheckCommon.checkFail(ip, task, "mysql:timeout:" + cause.getMessage());
                        healthCheckCommon.reEvaluateCheckRT(task.getCheckRTNormalized() * 2, task, switchDomain.getMysqlHealthParams());
                        return;
                    }

                    cause = cause.getCause();
                }

                // connection error, probably not reachable
                healthCheckCommon.checkFail(ip, task, "mysql:error:" + t.getMessage());
                healthCheckCommon.reEvaluateCheckRT(switchDomain.getMysqlHealthParams().getMax(), task, switchDomain.getMysqlHealthParams());
            } finally {
                ip.setCheckRT(System.currentTimeMillis() - startTime);
                if (statement != null) {
                    try {
                        statement.close();
                    } catch (SQLException e) {
                        Loggers.SRV_LOG.error("[MYSQL-CHECK] failed to close statement:" + statement, e);
                    }
                }
                if (resultSet != null) {
                    try {
                        resultSet.close();
                    } catch (SQLException e) {
                        Loggers.SRV_LOG.error("[MYSQL-CHECK] failed to close resultSet:" + resultSet, e);
                    }
                }
            }
        }
    }
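  • MysqlCheckTask implements the Runnable interface. Its run method looks up the instance's connection in CONNECTION_POOL, keyed by service:cluster:ip:port; if the connection is null or closed, it creates a new MysqlDataSource (500 ms connect and socket timeouts), calls getConnection() and puts the result into CONNECTION_POOL. With the connection in hand, it executes the query given by config.getCmd() with a 1-second query timeout; if that command is the read_only check and the value is ON, the node is a slave and an IllegalStateException is thrown. The outcome then decides the call: success leads to healthCheckCommon.checkOK; an SQLException leads to healthCheckCommon.checkFailNow; any other Throwable leads to healthCheckCommon.checkFail, with the RT re-evaluated at twice the normalized value if a timeout appears in its cause chain, or at the maximum otherwise. Finally, it records the check RT and closes the statement and result set.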
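The decision logic in run can also be pulled apart from JDBC and tested on its own. The sketch below is hypothetical: CheckOutcome, MysqlCheckClassifier and its methods are invented names, the connection type is left generic so no MySQL driver is needed, and only JDK exception types are matched (the original also checks ConnectTimeoutException, which is not part of the JDK and is omitted here). It keeps the original's catch order, so an SQLException is classified before the timeout walk runs.

```java
import java.net.SocketTimeoutException;
import java.sql.SQLException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical labels for the branches of MysqlCheckTask.run().
enum CheckOutcome { OK, FAIL_NOW, TIMEOUT, ERROR }

class MysqlCheckClassifier {
    static final String CHECK_MYSQL_MASTER_SQL =
        "show global variables where variable_name='read_only'";

    // Same key format as the original: service:cluster:ip:port.
    static String poolKey(String service, String cluster, String ip, int port) {
        return service + ":" + cluster + ":" + ip + ":" + port;
    }

    // Reuse a cached connection, or open and cache a new one if missing or closed.
    // C is generic so the sketch runs without a MySQL driver.
    static <C> C getOrOpen(ConcurrentMap<String, C> pool, String key,
                           Predicate<C> isClosed, Supplier<C> opener) {
        C conn = pool.get(key);
        if (conn == null || isClosed.test(conn)) {
            conn = opener.get();
            pool.put(key, conn);
        }
        return conn;
    }

    // Master check: the read_only value (column 2) is "ON" on a read-only slave.
    static boolean isReadOnlySlave(String cmd, String readOnlyValue) {
        return CHECK_MYSQL_MASTER_SQL.equals(cmd) && "ON".equals(readOnlyValue);
    }

    // Maps what run() caught (null = no exception) to the branch it would take.
    static CheckOutcome classify(Throwable t) {
        if (t == null) {
            return CheckOutcome.OK;          // checkOK
        }
        if (t instanceof SQLException) {
            return CheckOutcome.FAIL_NOW;    // checkFailNow, caught before the timeout walk
        }
        Throwable cause = t;
        for (int depth = 0; depth < 50 && cause != null; depth++) {
            if (cause instanceof SocketTimeoutException
                    || cause instanceof TimeoutException
                    || cause.getCause() instanceof TimeoutException) {
                return CheckOutcome.TIMEOUT; // checkFail, RT = 2 x normalized RT
            }
            cause = cause.getCause();
        }
        return CheckOutcome.ERROR;           // checkFail, RT = max
    }
}
```

Since SQLException is handled first, a timeout that surfaces wrapped inside an SQLException goes through checkFailNow rather than the timeout branch. The IllegalStateException raised for a read-only slave is neither, so it ends up as an ordinary error with the maximum RT.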

Summary

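MysqlHealthCheckProcessor implements the HealthCheckProcessor interface, and its getType method returns MYSQL. Its process method skips marked instances, re-evaluates the check RT via healthCheckCommon.reEvaluateCheckRT for instances whose previous check is still running (markChecking() returns false), and submits a MysqlCheckTask to EXECUTOR for the rest. MysqlCheckTask reuses a cached connection per instance, runs the configured command (optionally verifying that the node is not a read-only slave), and reports the result through healthCheckCommon.checkOK, checkFailNow or checkFail.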
