A Look at Nacos's TcpSuperSenseProcessor

  • October 6, 2019
  • Notes

This article examines Nacos's TcpSuperSenseProcessor.

TcpSuperSenseProcessor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/TcpSuperSenseProcessor.java

@Component
public class TcpSuperSenseProcessor implements HealthCheckProcessor, Runnable {

    @Autowired
    private HealthCheckCommon healthCheckCommon;

    @Autowired
    private SwitchDomain switchDomain;

    public static final int CONNECT_TIMEOUT_MS = 500;

    private Map<String, BeatKey> keyMap = new ConcurrentHashMap<>();

    private BlockingQueue<Beat> taskQueue = new LinkedBlockingQueue<Beat>();

    /**
     * this value has been carefully tuned, do not modify unless you're confident
     */
    private static final int NIO_THREAD_COUNT = Runtime.getRuntime().availableProcessors() <= 1 ?
        1 : Runtime.getRuntime().availableProcessors() / 2;

    /**
     * because some hosts doesn't support keep-alive connections, disabled temporarily
     */
    private static final long TCP_KEEP_ALIVE_MILLIS = 0;

    private static ScheduledExecutorService TCP_CHECK_EXECUTOR
        = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("nacos.naming.tcp.check.worker");
            t.setDaemon(true);
            return t;
        }
    });

    private static ScheduledExecutorService NIO_EXECUTOR
        = Executors.newScheduledThreadPool(NIO_THREAD_COUNT,
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("nacos.supersense.checker");
                return thread;
            }
        }
    );

    private Selector selector;

    public TcpSuperSenseProcessor() {
        try {
            selector = Selector.open();

            TCP_CHECK_EXECUTOR.submit(this);

        } catch (Exception e) {
            throw new IllegalStateException("Error while initializing SuperSense(TM).");
        }
    }

    @Override
    public void process(HealthCheckTask task) {
        List<Instance> ips = task.getCluster().allIPs(false);

        if (CollectionUtils.isEmpty(ips)) {
            return;
        }

        for (Instance ip : ips) {

            if (ip.isMarked()) {
                if (SRV_LOG.isDebugEnabled()) {
                    SRV_LOG.debug("tcp check, ip is marked as to skip health check, ip:" + ip.getIp());
                }
                continue;
            }

            if (!ip.markChecking()) {
                SRV_LOG.warn("tcp check started before last one finished, service: "
                    + task.getCluster().getService().getName() + ":"
                    + task.getCluster().getName() + ":"
                    + ip.getIp() + ":"
                    + ip.getPort());

                healthCheckCommon.reEvaluateCheckRT(task.getCheckRTNormalized() * 2, task, switchDomain.getTcpHealthParams());
                continue;
            }

            Beat beat = new Beat(ip, task);
            taskQueue.add(beat);
            MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
        }
    }

    private void processTask() throws Exception {
        Collection<Callable<Void>> tasks = new LinkedList<>();
        do {
            Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);
            if (beat == null) {
                return;
            }

            tasks.add(new TaskProcessor(beat));
        } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);

        for (Future<?> f : NIO_EXECUTOR.invokeAll(tasks)) {
            f.get();
        }
    }

    @Override
    public void run() {
        while (true) {
            try {
                processTask();

                int readyCount = selector.selectNow();
                if (readyCount <= 0) {
                    continue;
                }

                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();

                    NIO_EXECUTOR.execute(new PostProcessor(key));
                }
            } catch (Throwable e) {
                SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);
            }
        }
    }

    //......

    @Override
    public String getType() {
        return "TCP";
    }

}
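  • TcpSuperSenseProcessor implements the HealthCheckProcessor and Runnable interfaces.
  • Its process method iterates over the cluster's instances. Instances that are marked are skipped. Instances for which markChecking() returns false (the previous check has not finished yet) trigger healthCheckCommon.reEvaluateCheckRT and are also skipped. For every other instance a Beat is created and added to taskQueue.
  • Its constructor submits the processor itself as a Runnable to TCP_CHECK_EXECUTOR. The run method loops forever: it calls processTask, then calls selector.selectNow() and submits a PostProcessor to NIO_EXECUTOR for each selected key. processTask polls Beats from taskQueue (each poll waits up to CONNECT_TIMEOUT_MS / 2), wraps each one in a TaskProcessor, and stops collecting once the queue is empty or the batch reaches NIO_THREAD_COUNT * 64; it then runs the batch with NIO_EXECUTOR.invokeAll(tasks) and waits on every returned Future.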
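
The batching pattern in processTask can be tried out in isolation. The following is a minimal sketch, not Nacos code: the class name BatchDrainSketch, the method drainOnce, the String payload and the MAX_BATCH value are all hypothetical stand-ins (Nacos uses Beat objects and a limit of NIO_THREAD_COUNT * 64). Unlike processTask, the sketch breaks out of the loop instead of returning when a poll times out, so nothing already collected is dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class BatchDrainSketch {

    // Hypothetical batch limit; Nacos uses NIO_THREAD_COUNT * 64.
    static final int MAX_BATCH = 4;

    /**
     * Collects up to MAX_BATCH queued items, runs them on the pool and waits
     * for all of them. Returns how many items were processed in this round.
     */
    static int drainOnce(BlockingQueue<String> queue, ExecutorService pool) {
        List<Callable<String>> tasks = new ArrayList<>();
        try {
            do {
                // A bounded wait keeps an idle loop from spinning at full speed.
                String item = queue.poll(250, TimeUnit.MILLISECONDS);
                if (item == null) {
                    break;
                }
                tasks.add(() -> item.toUpperCase());
            } while (!queue.isEmpty() && tasks.size() < MAX_BATCH);

            // invokeAll blocks until every task in the batch has completed.
            for (Future<String> f : pool.invokeAll(tasks)) {
                f.get();
            }
            return tasks.size();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 6; i++) {
            queue.add("beat-" + i);
        }
        ExecutorService pool = Executors.newFixedThreadPool(2);
        System.out.println(drainOnce(queue, pool)); // first round is capped at MAX_BATCH
        System.out.println(drainOnce(queue, pool)); // second round takes the remainder
        pool.shutdown();
    }
}
```

Because invokeAll only returns after the whole batch has run, the caller's loop does not reach the selector until every TaskProcessor in the batch has registered its channel, which is presumably why the batch size is capped.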

PostProcessor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/TcpSuperSenseProcessor.java

    public class PostProcessor implements Runnable {
        SelectionKey key;

        public PostProcessor(SelectionKey key) {
            this.key = key;
        }

        @Override
        public void run() {
            Beat beat = (Beat) key.attachment();
            SocketChannel channel = (SocketChannel) key.channel();
            try {
                if (!beat.isHealthy()) {
                    //invalid beat means this server is no longer responsible for the current service
                    key.cancel();
                    key.channel().close();

                    beat.finishCheck();
                    return;
                }

                if (key.isValid() && key.isConnectable()) {
                    //connected
                    channel.finishConnect();
                    beat.finishCheck(true, false, System.currentTimeMillis() - beat.getTask().getStartTime(), "tcp:ok+");
                }

                if (key.isValid() && key.isReadable()) {
                    //disconnected
                    ByteBuffer buffer = ByteBuffer.allocate(128);
                    if (channel.read(buffer) == -1) {
                        key.cancel();
                        key.channel().close();
                    } else {
                        // not terminate request, ignore
                    }
                }
            } catch (ConnectException e) {
                // unable to connect, possibly port not opened
                beat.finishCheck(false, true, switchDomain.getTcpHealthParams().getMax(), "tcp:unable2connect:" + e.getMessage());
            } catch (Exception e) {
                beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(), "tcp:error:" + e.getMessage());

                try {
                    key.cancel();
                    key.channel().close();
                } catch (Exception ignore) {
                }
            }
        }
    }
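  • PostProcessor implements the Runnable interface. Its run method reads the Beat attached to the SelectionKey and reports the outcome through beat.finishCheck. If the beat is no longer healthy, it cancels the key and closes the channel. If the key is connectable, it calls channel.finishConnect() and reports success with the message tcp:ok+. If the key is readable and read returns -1, it cancels the key and closes the channel. A ConnectException is reported as tcp:unable2connect, and any other exception as tcp:error.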
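
The connectable branch and the ConnectException branch above follow the usual non-blocking connect sequence of java.nio. The sketch below shows that sequence on its own, under simplifying assumptions: the class NonBlockingConnectSketch and the method probe are hypothetical, the result strings are shortened versions of the Nacos messages, and a single thread blocks on select with a timeout, whereas Nacos calls selectNow in a loop and hands each key to a thread pool.

```java
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

final class NonBlockingConnectSketch {

    /** Probes host:port once; timeoutMs must be greater than 0. */
    static String probe(String host, int port, long timeoutMs) {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);

            // In non-blocking mode connect() usually returns false and the
            // connection completes later; it may return true for local peers.
            if (channel.connect(new InetSocketAddress(host, port))) {
                return "tcp:ok";
            }

            channel.register(selector, SelectionKey.OP_CONNECT);
            if (selector.select(timeoutMs) == 0) {
                return "tcp:timeout";
            }

            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isValid() && key.isConnectable()) {
                    // Throws ConnectException if the peer refused the connection.
                    channel.finishConnect();
                    return "tcp:ok";
                }
            }
            return "tcp:error";
        } catch (ConnectException e) {
            // Typically means nothing is listening on the port.
            return "tcp:unable2connect";
        } catch (Exception e) {
            return "tcp:error";
        }
    }
}
```

Calling probe("127.0.0.1", port, 500) against a port with a listener should yield tcp:ok; a port with no listener normally ends in the ConnectException branch, although the exact behaviour depends on the operating system.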

TaskProcessor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/TcpSuperSenseProcessor.java

    private class TaskProcessor implements Callable<Void> {

        private static final int MAX_WAIT_TIME_MILLISECONDS = 500;
        Beat beat;

        public TaskProcessor(Beat beat) {
            this.beat = beat;
        }

        @Override
        public Void call() {
            long waited = System.currentTimeMillis() - beat.getStartTime();
            if (waited > MAX_WAIT_TIME_MILLISECONDS) {
                Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");
            }

            SocketChannel channel = null;
            try {
                Instance instance = beat.getIp();
                Cluster cluster = beat.getTask().getCluster();

                BeatKey beatKey = keyMap.get(beat.toString());
                if (beatKey != null && beatKey.key.isValid()) {
                    if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) {
                        instance.setBeingChecked(false);
                        return null;
                    }

                    beatKey.key.cancel();
                    beatKey.key.channel().close();
                }

                channel = SocketChannel.open();
                channel.configureBlocking(false);
                // only by setting this can we make the socket close event asynchronous
                channel.socket().setSoLinger(false, -1);
                channel.socket().setReuseAddress(true);
                channel.socket().setKeepAlive(true);
                channel.socket().setTcpNoDelay(true);

                int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();
                channel.connect(new InetSocketAddress(instance.getIp(), port));

                SelectionKey key
                    = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
                key.attach(beat);
                keyMap.put(beat.toString(), new BeatKey(key));

                beat.setStartTime(System.currentTimeMillis());

                NIO_EXECUTOR.schedule(new TimeOutTask(key),
                    CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(), "tcp:error:" + e.getMessage());

                if (channel != null) {
                    try {
                        channel.close();
                    } catch (Exception ignore) {
                    }
                }
            }

            return null;
        }
    }
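  • TaskProcessor implements the Callable<Void> interface. Its call method starts the TCP check for the target instance: it cancels any existing key recorded in keyMap for the beat, opens a non-blocking SocketChannel, connects to the instance, registers the channel with the selector for OP_CONNECT | OP_READ, attaches the Beat to the key, and schedules a TimeOutTask on NIO_EXECUTOR with a delay of CONNECT_TIMEOUT_MS.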
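
One detail of call is easy to miss: because TCP_KEEP_ALIVE_MILLIS is 0, the comparison System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS never holds for a non-negative age, so an existing key is always cancelled and a fresh connection is opened. The sketch below reproduces only that bookkeeping, with hypothetical names (KeepAliveReuseSketch, Entry, checkOrReplace) and an explicit now parameter in place of the system clock; no sockets are involved.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class KeepAliveReuseSketch {

    /** Hypothetical stand-in for BeatKey: only the creation time is kept. */
    static final class Entry {
        final long birthTime;

        Entry(long birthTime) {
            this.birthTime = birthTime;
        }
    }

    private final Map<String, Entry> keyMap = new ConcurrentHashMap<>();
    private final long keepAliveMillis;

    KeepAliveReuseSketch(long keepAliveMillis) {
        this.keepAliveMillis = keepAliveMillis;
    }

    /**
     * Returns true if the recorded connection is young enough to be reused.
     * Otherwise records a new connection born at 'now' and returns false.
     */
    boolean checkOrReplace(String id, long now) {
        Entry existing = keyMap.get(id);
        if (existing != null && now - existing.birthTime < keepAliveMillis) {
            return true;
        }
        keyMap.put(id, new Entry(now));
        return false;
    }
}
```

With keepAliveMillis set to 0 every call returns false, which matches the "disabled temporarily" comment on TCP_KEEP_ALIVE_MILLIS; a positive value would let checks inside the window skip reconnecting.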

TimeOutTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/TcpSuperSenseProcessor.java

    private static class TimeOutTask implements Runnable {
        SelectionKey key;

        public TimeOutTask(SelectionKey key) {
            this.key = key;
        }

        @Override
        public void run() {
            if (key != null && key.isValid()) {
                SocketChannel channel = (SocketChannel) key.channel();
                Beat beat = (Beat) key.attachment();

                if (channel.isConnected()) {
                    return;
                }

                try {
                    channel.finishConnect();
                } catch (Exception ignore) {
                }

                try {
                    beat.finishCheck(false, false, beat.getTask().getCheckRTNormalized() * 2, "tcp:timeout");
                    key.cancel();
                    key.channel().close();
                } catch (Exception ignore) {
                }
            }
        }
    }
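  • TimeOutTask implements the Runnable interface. Its run method does nothing if the key is invalid or the channel is already connected. Otherwise it calls channel.finishConnect() (ignoring any exception), then calls beat.finishCheck with success set to false and the message tcp:timeout, cancels the key, and closes the channel.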
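
The interplay between a pending check and its delayed timeout task can be sketched without any networking. This is an illustration under stated assumptions, not the Nacos mechanism: the class TimeoutGuardSketch and the method check are hypothetical, the connect is simulated by a scheduled task, and a CompletableFuture is swapped in as the "first finisher wins" guard, where Nacos instead inspects channel.isConnected() and key.isValid().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class TimeoutGuardSketch {

    /**
     * Simulates a check that completes after workMillis and is guarded by a
     * timeout task scheduled at timeoutMillis. Returns whichever result is
     * recorded first.
     */
    static String check(long workMillis, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        try {
            CompletableFuture<String> result = new CompletableFuture<>();

            // Simulated connect completion (stands in for the selector event).
            scheduler.schedule(() -> {
                result.complete("tcp:ok");
            }, workMillis, TimeUnit.MILLISECONDS);

            // Timeout guard: complete() is a no-op if a result already exists.
            scheduler.schedule(() -> {
                result.complete("tcp:timeout");
            }, timeoutMillis, TimeUnit.MILLISECONDS);

            return result.join();
        } finally {
            // Cancels whichever task has not run yet.
            scheduler.shutdownNow();
        }
    }
}
```

For example, check(10, 500) models a connect that finishes well inside the timeout, while check(500, 20) models a target that does not answer in time.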

Summary

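  • TcpSuperSenseProcessor implements the HealthCheckProcessor and Runnable interfaces.
  • Its process method iterates over the cluster's instances: marked instances are skipped, instances whose previous check is still running (markChecking() returns false) trigger healthCheckCommon.reEvaluateCheckRT, and every other instance gets a Beat that is added to taskQueue.
  • Its constructor submits the processor itself to TCP_CHECK_EXECUTOR. The run method repeatedly calls processTask, then selects ready keys from the selector and submits a PostProcessor to NIO_EXECUTOR for each one. processTask takes Beats from taskQueue, wraps them in TaskProcessor instances, and runs each batch (at most NIO_THREAD_COUNT * 64 tasks) with NIO_EXECUTOR.invokeAll(tasks), waiting for the batch to finish.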
