A look at nacos's ServiceReporter

  • October 4, 2019
  • Notes

This article takes a look at nacos's ServiceReporter.

ServiceManager.init

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    @PostConstruct
    public void init() {

        UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);

        UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.submit(new UpdatedServiceProcessor());

        try {
            Loggers.SRV_LOG.info("listen for service meta change");
            consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
        } catch (NacosException e) {
            Loggers.SRV_LOG.error("listen for service meta change failed!");
        }
    }

    //......
}
  • ServiceManager's init method schedules a ServiceReporter on UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR as a one-shot task with an initial delay of 60000 ms
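
The one-shot schedule call in init only works as a periodic job because the task re-submits itself when it finishes. A minimal sketch of that pattern follows, under stated assumptions: SelfReschedulingSketch, Reporter, periodMillis, and runTimes are hypothetical names for illustration and not part of nacos, and the run limit exists only so the demo terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SelfReschedulingSketch {

    /** Hypothetical stand-in for switchDomain.getServiceStatusSynchronizationPeriodMillis(). */
    static volatile long periodMillis = 20;

    static class Reporter implements Runnable {
        private final ScheduledExecutorService executor;
        private final CountDownLatch remainingRuns;
        final AtomicInteger runs = new AtomicInteger();

        Reporter(ScheduledExecutorService executor, CountDownLatch remainingRuns) {
            this.executor = executor;
            this.remainingRuns = remainingRuns;
        }

        @Override
        public void run() {
            try {
                // The real task would build and send checksums here.
                runs.incrementAndGet();
            } finally {
                remainingRuns.countDown();
                // The period is re-read on every run, so a changed value applies to the next delay.
                // The count check is demo-only; the task in the article reschedules unconditionally.
                if (remainingRuns.getCount() > 0) {
                    executor.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

    /** Runs the reporter the given number of times and returns how many runs were observed. */
    static int runTimes(int times) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remainingRuns = new CountDownLatch(times);
        Reporter reporter = new Reporter(executor, remainingRuns);
        // One-shot schedule with an initial delay, mirroring the call in init().
        executor.schedule(reporter, 10, TimeUnit.MILLISECONDS);
        try {
            remainingRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return reporter.runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs = " + runTimes(3));
    }
}
```

Compared with scheduleAtFixedRate or scheduleWithFixedDelay, rescheduling from the finally block lets the delay change between runs and keeps the job alive even when a run throws.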

ServiceReporter

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

    private class ServiceReporter implements Runnable {

        @Override
        public void run() {
            try {

                Map<String, Set<String>> allServiceNames = getAllServiceNames();

                if (allServiceNames.size() <= 0) {
                    //ignore
                    return;
                }

                for (String namespaceId : allServiceNames.keySet()) {

                    ServiceChecksum checksum = new ServiceChecksum(namespaceId);

                    for (String serviceName : allServiceNames.get(namespaceId)) {
                        if (!distroMapper.responsible(serviceName)) {
                            continue;
                        }

                        Service service = getService(namespaceId, serviceName);

                        if (service == null) {
                            continue;
                        }

                        service.recalculateChecksum();

                        checksum.addItem(serviceName, service.getChecksum());
                    }

                    Message msg = new Message();

                    msg.setData(JSON.toJSONString(checksum));

                    List<Server> sameSiteServers = serverListManager.getServers();

                    if (sameSiteServers == null || sameSiteServers.size() <= 0) {
                        return;
                    }

                    for (Server server : sameSiteServers) {
                        if (server.getKey().equals(NetUtils.localServer())) {
                            continue;
                        }
                        synchronizer.send(server.getKey(), msg);
                    }
                }
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
            } finally {
                UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(), TimeUnit.MILLISECONDS);
            }
        }
    }
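  • ServiceReporter implements Runnable. For each namespace in allServiceNames, its run method keeps only the service names for which distroMapper.responsible returns true, calls recalculateChecksum on each of those services, and adds the resulting checksum to a ServiceChecksum. It then wraps the ServiceChecksum as JSON in a Message and calls synchronizer.send for every server in sameSiteServers except the local one. Finally, in the finally block, it schedules itself again on UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR, using switchDomain.getServiceStatusSynchronizationPeriodMillis() as the delay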
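
The filtering and aggregation step of run can be sketched in isolation. This is an illustration under assumptions, not the nacos implementation: ChecksumReportSketch, buildReports, and demo are hypothetical names, the responsible predicate stands in for distroMapper.responsible, and the checksum function is a placeholder for whatever Service.recalculateChecksum actually computes.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.function.Predicate;

class ChecksumReportSketch {

    /**
     * Builds one report per namespace: serviceName -> checksum,
     * restricted to the services this node is responsible for.
     */
    static Map<String, Map<String, String>> buildReports(
            Map<String, Set<String>> allServiceNames,
            Predicate<String> responsible,
            Function<String, String> checksumOf) {
        Map<String, Map<String, String>> reports = new LinkedHashMap<>();
        for (Map.Entry<String, Set<String>> entry : allServiceNames.entrySet()) {
            Map<String, String> serviceName2Checksum = new TreeMap<>();
            for (String serviceName : entry.getValue()) {
                if (!responsible.test(serviceName)) {
                    continue; // another node owns this service and reports it
                }
                serviceName2Checksum.put(serviceName, checksumOf.apply(serviceName));
            }
            reports.put(entry.getKey(), serviceName2Checksum);
        }
        return reports;
    }

    /** Sample data: three services in one namespace, two of them owned by this node. */
    static Map<String, Map<String, String>> demo() {
        Map<String, Set<String>> allServiceNames = new TreeMap<>();
        allServiceNames.put("public", new TreeSet<>(Arrays.asList(
                "DEFAULT_GROUP@@order", "DEFAULT_GROUP@@pay", "DEFAULT_GROUP@@user")));
        // Hypothetical ownership rule: a fixed set instead of a hash over the server list.
        Set<String> owned = new HashSet<>(Arrays.asList(
                "DEFAULT_GROUP@@order", "DEFAULT_GROUP@@user"));
        // Placeholder checksum; the real value is derived from the service's state.
        return buildReports(allServiceNames, owned::contains, name -> "checksum-of-" + name);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because each node reports only the services it is responsible for, a peer that receives the report can compare those checksums with its own copies and detect services whose state has drifted.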

ServiceStatusSynchronizer

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/misc/ServiceStatusSynchronizer.java

public class ServiceStatusSynchronizer implements Synchronizer {
    @Override
    public void send(final String serverIP, Message msg) {
        if(serverIP == null) {
            return;
        }

        Map<String,String> params = new HashMap<String, String>(10);

        params.put("statuses", msg.getData());
        params.put("clientIP", NetUtils.localServer());


        String url = "http://" + serverIP + ":" + RunningConfig.getServerPort() + RunningConfig.getContextPath() +
                UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";

        if (serverIP.contains(UtilsAndCommons.IP_PORT_SPLITER)) {
            url = "http://" + serverIP + RunningConfig.getContextPath() +
                    UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";
        }

        try {
            HttpClient.asyncHttpPostLarge(url, null, JSON.toJSONString(params), new AsyncCompletionHandler() {
                @Override
                public Integer onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: {}", serverIP);

                        return 1;
                    }
                    return 0;
                }
            });
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, e);
        }

    }

    @Override
    public Message get(String serverIP, String key) {
        if(serverIP == null) {
            return null;
        }

        Map<String,String> params = new HashMap<>(10);

        params.put("key", key);

        String result;
        try {
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("[STATUS-SYNCHRONIZE] sync service status from: {}, service: {}", serverIP, key);
            }
            result = NamingProxy.reqAPI(RunningConfig.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance/" + "statuses", params, serverIP);
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] Failed to get service status from " + serverIP, e);
            return null;
        }

        if(result == null || result.equals(StringUtils.EMPTY)) {
            return null;
        }

        Message msg = new Message();
        msg.setData(result);

        return msg;
    }
}
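  • ServiceStatusSynchronizer implements the Synchronizer interface. Its send method issues an asynchronous POST request to the target server's /service/status endpoint, carrying the statuses payload and the sender's address as clientIP; a non-200 response or an exception is only logged as a warning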
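
The URL handling in send reduces to one rule: when the server key already contains a port, it is used as-is, otherwise the local server port is appended. A small sketch of that rule follows. StatusUrlSketch and statusUrl are hypothetical names, and the concrete values used here (":" as the splitter, "/nacos" as the context path, "/v1/ns" as the naming context, 8848 as the port) are assumptions for illustration rather than values shown in the listing above.

```java
class StatusUrlSketch {

    /** Assumed value of UtilsAndCommons.IP_PORT_SPLITER. */
    static final String IP_PORT_SPLITER = ":";

    /**
     * Builds the status endpoint URL for a peer.
     * serverKey is either "ip" or "ip:port"; defaultPort is used only in the first case.
     */
    static String statusUrl(String serverKey, int defaultPort, String contextPath, String namingContext) {
        String hostAndPort = serverKey.contains(IP_PORT_SPLITER)
                ? serverKey
                : serverKey + IP_PORT_SPLITER + defaultPort;
        return "http://" + hostAndPort + contextPath + namingContext + "/service/status";
    }

    public static void main(String[] args) {
        // Key without a port: the default port is appended.
        System.out.println(statusUrl("10.0.0.2", 8848, "/nacos", "/v1/ns"));
        // Key with a port: the key is used unchanged.
        System.out.println(statusUrl("10.0.0.2:8849", 8848, "/nacos", "/v1/ns"));
    }
}
```

Note that appending the local port to a bare IP assumes every peer listens on the same port as the sender, which is why keys in ip:port form are the safer choice in mixed-port clusters.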

Summary

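ServiceReporter implements Runnable. For each namespace in allServiceNames, its run method keeps only the service names for which distroMapper.responsible returns true, calls recalculateChecksum on each of those services, and adds the resulting checksum to a ServiceChecksum. It then builds a Message from the ServiceChecksum and uses synchronizer.send to deliver it to every server in sameSiteServers except the local one. Finally, it schedules itself again on UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.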

doc