A Look at nacos ServiceManager's UpdatedServiceProcessor

  • October 4, 2019
  • Notes

This article examines the UpdatedServiceProcessor used by nacos's ServiceManager.

ServiceManager.init

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    @PostConstruct
    public void init() {

        UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);

        UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.submit(new UpdatedServiceProcessor());

        try {
            Loggers.SRV_LOG.info("listen for service meta change");
            consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
        } catch (NacosException e) {
            Loggers.SRV_LOG.error("listen for service meta change failed!");
        }
    }

    //......
}

  • ServiceManager's init method submits an UpdatedServiceProcessor task to UtilsAndCommons.SERVICE_UPDATE_EXECUTOR. The same method also schedules a ServiceReporter to run after 60000 ms and registers the ServiceManager as a listener for service meta changes.

UpdatedServiceProcessor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

    private class UpdatedServiceProcessor implements Runnable {
        //get changed service from other server asynchronously
        @Override
        public void run() {
            ServiceKey serviceKey = null;

            try {
                while (true) {
                    try {
                        serviceKey = toBeUpdatedServicesQueue.take();
                    } catch (Exception e) {
                        Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");
                    }

                    if (serviceKey == null) {
                        continue;
                    }
                    GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));
                }
            } catch (Exception e) {
                Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);
            }
        }
    }

  • UpdatedServiceProcessor implements the Runnable interface. Its run method loops indefinitely, blocking on toBeUpdatedServicesQueue.take() until a ServiceKey is available, and then submits a ServiceUpdater for that key through GlobalExecutor.submitServiceUpdate.
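The take-then-dispatch loop can be tried out in isolation. The following is a minimal sketch using only java.util.concurrent, not nacos code: the class QueueDispatchSketch, its Dispatcher, the String keys, and the thread-pool sizes are all hypothetical stand-ins for ServiceKey, UpdatedServiceProcessor, and the nacos executors. Unlike the loop quoted above, this sketch exits when its thread is interrupted.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

public class QueueDispatchSketch {

    /** Hypothetical dispatcher: takes keys from a queue and hands each one to a worker pool. */
    static class Dispatcher implements Runnable {
        private final LinkedBlockingDeque<String> queue;
        private final ExecutorService workers;
        private final List<String> processed;
        private final CountDownLatch done;

        Dispatcher(LinkedBlockingDeque<String> queue, ExecutorService workers,
                   List<String> processed, CountDownLatch done) {
            this.queue = queue;
            this.workers = workers;
            this.processed = processed;
            this.done = done;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                final String key;
                try {
                    key = queue.take(); // blocks until an element is available
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and stop
                    return;
                }
                // The actual work runs on the worker pool, so the dispatcher
                // thread goes straight back to waiting on the queue.
                workers.submit(() -> {
                    processed.add(key);
                    done.countDown();
                });
            }
        }
    }

    /** Feeds the keys through the dispatcher and returns the keys the workers handled. */
    static List<String> runDemo(List<String> keys) {
        LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>(1024);
        ExecutorService dispatcherThread = Executors.newSingleThreadExecutor();
        ExecutorService workers = Executors.newFixedThreadPool(2);
        List<String> processed = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(keys.size());

        dispatcherThread.submit(new Dispatcher(queue, workers, processed, done));
        for (String key : keys) {
            queue.offer(key); // non-blocking; returns false if the queue is full
        }

        try {
            done.await(5, TimeUnit.SECONDS); // wait for the workers to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            dispatcherThread.shutdownNow(); // interrupts the blocked take()
            workers.shutdown();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(Arrays.asList("service-a", "service-b")));
    }
}
```

Splitting the work this way keeps the single consumer thread cheap: it only moves keys from the queue to the pool, while slow per-key work happens on the worker threads.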

ServiceUpdater

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

    private class ServiceUpdater implements Runnable {

        String namespaceId;
        String serviceName;
        String serverIP;

        public ServiceUpdater(ServiceKey serviceKey) {
            this.namespaceId = serviceKey.getNamespaceId();
            this.serviceName = serviceKey.getServiceName();
            this.serverIP = serviceKey.getServerIP();
        }

        @Override
        public void run() {
            try {
                updatedHealthStatus(namespaceId, serviceName, serverIP);
            } catch (Exception e) {
                Loggers.SRV_LOG.warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}",
                    serviceName, serverIP, e);
            }
        }
    }

  • ServiceUpdater implements the Runnable interface. Its run method calls updatedHealthStatus with the namespaceId, serviceName, and serverIP copied from the ServiceKey, and logs a warning if that call throws.

ServiceManager.updatedHealthStatus

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    //......

    public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {
        Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        JSONObject serviceJson = JSON.parseObject(msg.getData());

        JSONArray ipList = serviceJson.getJSONArray("ips");
        Map<String, String> ipsMap = new HashMap<>(ipList.size());
        for (int i = 0; i < ipList.size(); i++) {

            String ip = ipList.getString(i);
            String[] strings = ip.split("_");
            ipsMap.put(strings[0], strings[1]);
        }

        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            return;
        }

        boolean changed = false;

        List<Instance> instances = service.allIPs();
        for (Instance instance : instances) {

            boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIPAddr()));
            if (valid != instance.isHealthy()) {
                changed = true;
                instance.setHealthy(valid);
                Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}@{}{}",
                    serviceName, (instance.isHealthy() ? "ENABLED" : "DISABLED"),
                    instance.getIp(), instance.getPort(), instance.getClusterName());
            }
        }

        if (changed) {
            pushService.serviceChanged(service);
        }

        StringBuilder stringBuilder = new StringBuilder();
        List<Instance> allIps = service.allIPs();
        for (Instance instance : allIps) {
            stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isHealthy()).append(",");
        }

        if (changed && Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}",
                service.getNamespaceId(), service.getName(), stringBuilder.toString());
        }

    }

    //......
}

  • The updatedHealthStatus method fetches a msg from the synchronizer and builds ipsMap by splitting each entry of the "ips" array on "_". It then obtains the instances through service.allIPs() and, for each instance, looks up its valid status in ipsMap by instance.toIPAddr(). If valid differs from instance.isHealthy(), the method sets changed to true and updates the instance's healthy flag. When anything changed, it publishes the change through pushService.serviceChanged(service), and finally it writes a debug log if debug logging is enabled.
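The parse-and-compare step can be sketched without any nacos types. This is an illustrative reduction, not the nacos implementation: HealthSyncSketch, parse, and apply are hypothetical names, a plain Map<String, Boolean> stands in for the list of Instance objects, and the addresses are made up. It also makes one consequence of Boolean.parseBoolean visible: a null lookup parses as false, so an address missing from the remote data is treated as unhealthy.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HealthSyncSketch {

    /** Parses entries of the form "address_flag" into an address -> flag-text map. */
    static Map<String, String> parse(List<String> entries) {
        Map<String, String> result = new HashMap<>(entries.size());
        for (String entry : entries) {
            String[] parts = entry.split("_");
            result.put(parts[0], parts[1]);
        }
        return result;
    }

    /**
     * Overwrites each local health flag with the remote one and reports
     * whether any flag actually changed.
     */
    static boolean apply(Map<String, Boolean> local, Map<String, String> remote) {
        boolean changed = false;
        for (Map.Entry<String, Boolean> entry : local.entrySet()) {
            // parseBoolean(null) is false: addresses absent from the remote
            // data end up marked unhealthy.
            boolean valid = Boolean.parseBoolean(remote.get(entry.getKey()));
            if (valid != entry.getValue()) {
                entry.setValue(valid);
                changed = true;
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, Boolean> local = new LinkedHashMap<>();
        local.put("10.0.0.1:8080", true);
        local.put("10.0.0.2:8080", true);

        Map<String, String> remote = parse(Arrays.asList("10.0.0.1:8080_false"));
        boolean changed = apply(local, remote);

        System.out.println("changed=" + changed + ", local=" + local);
    }
}
```

Returning a single changed flag mirrors the quoted method, where one notification is sent per service rather than one per instance.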

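Summary

  • ServiceManager's init method submits an UpdatedServiceProcessor task to UtilsAndCommons.SERVICE_UPDATE_EXECUTOR
  • UpdatedServiceProcessor implements the Runnable interface; its run method loops indefinitely, taking elements from toBeUpdatedServicesQueue and submitting a ServiceUpdater for each one through GlobalExecutor.submitServiceUpdate
  • ServiceUpdater implements the Runnable interface; its run method calls updatedHealthStatus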

doc

  • ServiceManager