A Look at removeInstance in nacos ServiceManager

  • October 4, 2019
  • Notes

This post walks through the removeInstance method of nacos ServiceManager.

ServiceManager

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    //......

    public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
        Service service = getService(namespaceId, serviceName);
        removeInstance(namespaceId, serviceName, ephemeral, service, ips);
    }

    public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service, Instance... ips) throws NacosException {

        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

        List<Instance> instanceList = substractIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        consistencyService.put(key, instances);
    }

    public List<Instance> substractIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
        return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE, ephemeral, ips);
    }

    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {

        Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

        Map<String, Instance> oldInstanceMap = new HashMap<>(16);
        List<Instance> currentIPs = service.allIPs(ephemeral);
        Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size());

        for (Instance instance : currentIPs) {
            map.put(instance.toIPAddr(), instance);
        }
        if (datum != null) {
            oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map);
        }

        // use HashMap for deep copy:
        HashMap<String, Instance> instanceMap = new HashMap<>(oldInstanceMap.size());
        instanceMap.putAll(oldInstanceMap);

        for (Instance instance : ips) {
            if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                Cluster cluster = new Cluster(instance.getClusterName(), service);
                cluster.init();
                service.getClusterMap().put(instance.getClusterName(), cluster);
                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                    instance.getClusterName(), instance.toJSON());
            }

            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                instanceMap.remove(instance.getDatumKey());
            } else {
                instanceMap.put(instance.getDatumKey(), instance);
            }

        }

        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: "
                + JSON.toJSONString(instanceMap.values()));
        }

        return new ArrayList<>(instanceMap.values());
    }

    //......
}
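  • removeInstance calls substractIpAddresses to obtain the instanceList that remains once the given instances have been removed, then writes that list back through consistencyService.put. substractIpAddresses delegates to updateIpAddresses with the action argument set to UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE. updateIpAddresses first reads the current instance list with service.allIPs(ephemeral) and, if consistencyService already holds a datum for the key, builds oldInstanceMap from it via setValid. For the remove action it then removes each given instance from instanceMap by instance.getDatumKey().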
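The remove branch can be sketched in isolation as follows. This is a minimal illustration rather than the nacos implementation: SimpleInstance, its datumKey() format, and subtract are hypothetical stand-ins introduced here, and the real getDatumKey() may build its key differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RemoveInstanceSketch {

    /** Hypothetical stand-in for the nacos Instance class; only the fields this sketch needs. */
    static final class SimpleInstance {
        final String ip;
        final int port;
        final String clusterName;

        SimpleInstance(String ip, int port, String clusterName) {
            this.ip = ip;
            this.port = port;
            this.clusterName = clusterName;
        }

        /** Assumed key format, for illustration only. */
        String datumKey() {
            return ip + ":" + port + ":" + clusterName;
        }
    }

    /** Returns the instances that remain after removing toRemove from current. */
    static List<SimpleInstance> subtract(List<SimpleInstance> current, SimpleInstance... toRemove) {
        // Index a copy by datum key so the caller's list is left untouched.
        Map<String, SimpleInstance> instanceMap = new LinkedHashMap<>();
        for (SimpleInstance instance : current) {
            instanceMap.put(instance.datumKey(), instance);
        }
        // Removal is by key, so an equal-keyed instance matches even if it is a different object.
        for (SimpleInstance instance : toRemove) {
            instanceMap.remove(instance.datumKey());
        }
        return new ArrayList<>(instanceMap.values());
    }
}
```

Because removal is keyed, removing an instance that is not in the map is a no-op rather than an error, which matches the fact that the source only throws for an empty result on the add action.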

Service.allIPs

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java

public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {

    private static final String SERVICE_NAME_SYNTAX = "[0-9a-zA-Z@\\.:_-]+";

    @JSONField(serialize = false)
    private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);

    private String token;
    private List<String> owners = new ArrayList<>();
    private Boolean resetWeight = false;
    private Boolean enabled = true;
    private Selector selector = new NoneSelector();
    private String namespaceId;

    /**
     * IP will be deleted if it has not send beat for some time, default timeout is 30 seconds.
     */
    private long ipDeleteTimeout = 30 * 1000;

    private volatile long lastModifiedMillis = 0L;

    private volatile String checksum;

    /**
     * TODO set customized push expire time:
     */
    private long pushCacheMillis = 0L;

    private Map<String, Cluster> clusterMap = new HashMap<>();

    //......

    public List<Instance> allIPs(boolean ephemeral) {
        List<Instance> allIPs = new ArrayList<>();
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            allIPs.addAll(entry.getValue().allIPs(ephemeral));
        }

        return allIPs;
    }

    //......
}
  • Service.allIPs iterates over clusterMap and collects the Instance objects returned by each Cluster.allIPs(ephemeral) into a single list.
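The aggregation step can be tried out with a small self-contained sketch. SimpleCluster and the string addresses below are hypothetical simplifications introduced here, not nacos types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class ServiceAllIpsSketch {

    /** Hypothetical stand-in for Cluster; addresses are plain strings here. */
    static final class SimpleCluster {
        final List<String> ephemeral = new ArrayList<>();
        final List<String> persistent = new ArrayList<>();

        List<String> allIPs(boolean isEphemeral) {
            return new ArrayList<>(isEphemeral ? ephemeral : persistent);
        }
    }

    /** Collects the instances of the requested kind from every cluster into one list. */
    static List<String> allIPs(Map<String, SimpleCluster> clusterMap, boolean isEphemeral) {
        List<String> all = new ArrayList<>();
        for (SimpleCluster cluster : clusterMap.values()) {
            all.addAll(cluster.allIPs(isEphemeral));
        }
        return all;
    }
}
```

Note that the ephemeral flag is applied per cluster, so a single call never mixes ephemeral and persistent instances.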

Cluster.allIPs

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/Cluster.java

public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {

    private static final String CLUSTER_NAME_SYNTAX = "[0-9a-zA-Z-]+";
    /**
     * a addition for same site routing, can group multiple sites into a region, like Hangzhou, Shanghai, etc.
     */
    private String sitegroup = StringUtils.EMPTY;

    private int defCkport = 80;

    private int defIPPort = -1;

    @JSONField(serialize = false)
    private HealthCheckTask checkTask;

    @JSONField(serialize = false)
    private Set<Instance> persistentInstances = new HashSet<>();

    @JSONField(serialize = false)
    private Set<Instance> ephemeralInstances = new HashSet<>();

    @JSONField(serialize = false)
    private Service service;

    @JSONField(serialize = false)
    private volatile boolean inited = false;

    private Map<String, String> metadata = new ConcurrentHashMap<>();

    //......

    public List<Instance> allIPs(boolean ephemeral) {
        return ephemeral ? new ArrayList<>(ephemeralInstances) : new ArrayList<>(persistentInstances);
    }

    //......
}
  • Cluster.allIPs returns a new ArrayList copied from either ephemeralInstances or persistentInstances, depending on the ephemeral flag.
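One consequence of returning a new ArrayList is that callers get a snapshot they can modify without touching the cluster's own set. The sketch below demonstrates this with a hypothetical ClusterCopySketch class that stores plain strings instead of Instance objects.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ClusterCopySketch {
    private final Set<String> ephemeralInstances = new HashSet<>();
    private final Set<String> persistentInstances = new HashSet<>();

    /** Hypothetical helper for populating the sketch; not a nacos method. */
    void add(String address, boolean ephemeral) {
        (ephemeral ? ephemeralInstances : persistentInstances).add(address);
    }

    /** Same shape as the method above: copy the selected set into a fresh list. */
    List<String> allIPs(boolean ephemeral) {
        return ephemeral ? new ArrayList<>(ephemeralInstances) : new ArrayList<>(persistentInstances);
    }
}
```

For example, calling clear() on the list returned by allIPs(true) leaves the next allIPs(true) call unaffected. The copy is shallow, though: with real Instance objects the list would be new, but the elements would still be the same objects held by the cluster.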

Summary

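ServiceManager.removeInstance calls substractIpAddresses to obtain the instanceList that remains once the given instances have been removed, then writes it back through consistencyService.put. substractIpAddresses delegates to updateIpAddresses with the action argument set to UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE; that method first reads the current instance list with service.allIPs(ephemeral) and, for the remove action, removes each given instance from instanceMap by instance.getDatumKey().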

doc

  • ServiceManager