A look at nacos ServiceManager's updateInstance

  • October 4, 2019
  • Notes

This article takes a look at the updateInstance method of nacos ServiceManager.

ServiceManager

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    //......

    public void updateInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }

        if (!service.allIPs().contains(instance)) {
            throw new NacosException(NacosException.INVALID_PARAM, "instance not exist: " + instance);
        }

        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {

        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

        Service service = getService(namespaceId, serviceName);

        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        consistencyService.put(key, instances);
    }

    public List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
        return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
    }

    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {

        Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

        Map<String, Instance> oldInstanceMap = new HashMap<>(16);
        List<Instance> currentIPs = service.allIPs(ephemeral);
        Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size());

        for (Instance instance : currentIPs) {
            map.put(instance.toIPAddr(), instance);
        }
        if (datum != null) {
            oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map);
        }

        // use HashMap for deep copy:
        HashMap<String, Instance> instanceMap = new HashMap<>(oldInstanceMap.size());
        instanceMap.putAll(oldInstanceMap);

        for (Instance instance : ips) {
            if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                Cluster cluster = new Cluster(instance.getClusterName(), service);
                cluster.init();
                service.getClusterMap().put(instance.getClusterName(), cluster);
                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                    instance.getClusterName(), instance.toJSON());
            }

            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                instanceMap.remove(instance.getDatumKey());
            } else {
                instanceMap.put(instance.getDatumKey(), instance);
            }

        }

        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: "
                + JSON.toJSONString(instanceMap.values()));
        }

        return new ArrayList<>(instanceMap.values());
    }

    //......
}
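  • updateInstance first looks up the service and throws a NacosException if it is not found. It then checks that the instance to update exists via service.allIPs().contains(instance); if it does not, it throws a NacosException, otherwise it calls addInstance.
  • addInstance builds the instance-list key, fetches the service, calls addIpAddresses, and finally calls consistencyService.put. addIpAddresses delegates to updateIpAddresses with the action parameter set to UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.
  • updateIpAddresses first fetches the datum from consistencyService, then gets currentIPs via service.allIPs(ephemeral), and builds oldInstanceMap from the datum when it is non-null. For each instance passed in, it removes the entry from instanceMap when the action is UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE; for any other action it puts the instance into instanceMap under getDatumKey(), so an existing entry with the same key is overwritten.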
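
The reason an update can reuse the add path is that the merge is keyed: putting an instance under an existing key replaces the old entry. The following is a minimal sketch of that merge, not the nacos implementation. The class InstanceMergeSketch, the Inst record, and its key() method (a stand-in for Instance.getDatumKey()) are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the add/remove merge in updateIpAddresses.
class InstanceMergeSketch {

    static final String ACTION_ADD = "add";
    static final String ACTION_REMOVE = "remove";

    /** Minimal instance; key() plays the role of Instance.getDatumKey() (assumption). */
    static final class Inst {
        final String ip;
        final int port;
        final double weight;

        Inst(String ip, int port, double weight) {
            this.ip = ip;
            this.port = port;
            this.weight = weight;
        }

        String key() {
            return ip + ":" + port;
        }
    }

    /** Merges the changes into a copy of the stored list and returns the new list. */
    static List<Inst> merge(List<Inst> stored, String action, Inst... changes) {
        Map<String, Inst> byKey = new LinkedHashMap<>();
        for (Inst inst : stored) {
            byKey.put(inst.key(), inst);
        }
        for (Inst change : changes) {
            if (ACTION_REMOVE.equals(action)) {
                byKey.remove(change.key());
            } else {
                // Same key: the new instance replaces the old one, which is the "update".
                byKey.put(change.key(), change);
            }
        }
        if (byKey.isEmpty() && ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException("ip list can not be empty");
        }
        return new ArrayList<>(byKey.values());
    }

    public static void main(String[] args) {
        List<Inst> stored = new ArrayList<>();
        stored.add(new Inst("10.0.0.1", 8080, 1.0));
        stored.add(new Inst("10.0.0.2", 8080, 1.0));

        List<Inst> updated = merge(stored, ACTION_ADD, new Inst("10.0.0.1", 8080, 5.0));
        // Prints: 2 instances, first weight = 5.0
        System.out.println(updated.size() + " instances, first weight = " + updated.get(0).weight);
    }
}
```

The list size stays at two because the changed instance shares its key with a stored one; only its weight differs afterwards.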

DistroConsistencyServiceImpl.put

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {

    private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);

            t.setDaemon(true);
            t.setName("com.alibaba.nacos.naming.distro.notifier");

            return t;
        }
    });

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private DataStore dataStore;

    @Autowired
    private TaskDispatcher taskDispatcher;

    @Autowired
    private DataSyncer dataSyncer;

    @Autowired
    private Serializer serializer;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private GlobalConfig globalConfig;

    private boolean initialized = false;

    public volatile Notifier notifier = new Notifier();

    private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();

    private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);

    //......

    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        taskDispatcher.addTask(key);
    }

    public void onPut(String key, Record value) {

        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            dataStore.put(key, datum);
        }

        if (!listeners.containsKey(key)) {
            return;
        }

        notifier.addTask(key, ApplyAction.CHANGE);
    }

    //......
}
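  • DistroConsistencyServiceImpl.put first calls onPut and then taskDispatcher.addTask(key). When the key is an ephemeral instance-list key, onPut creates a Datum, increments its timestamp, and stores it in dataStore. After that, only if listeners are registered for the key, it calls notifier.addTask(key, ApplyAction.CHANGE).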
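
The put flow has two independent outputs: a local notification (only when someone listens on the key) and a sync task for the other servers (always). A rough sketch of that split follows, under stated assumptions: PutFlowSketch, the "ephemeral." prefix check, and the plain lists standing in for Notifier and TaskDispatcher are all hypothetical simplifications, not nacos code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the put -> onPut -> addTask flow.
class PutFlowSketch {

    // Assumed prefix for illustration; the real check is KeyBuilder.matchEphemeralInstanceListKey(key).
    static final String EPHEMERAL_PREFIX = "ephemeral.";

    final Map<String, String> dataStore = new ConcurrentHashMap<>();
    final Set<String> keysWithListeners = ConcurrentHashMap.newKeySet();
    final List<String> notifyTasks = new ArrayList<>(); // stands in for Notifier
    final List<String> syncTasks = new ArrayList<>();   // stands in for TaskDispatcher

    void put(String key, String value) {
        onPut(key, value);
        // Every put schedules a sync to the other servers.
        syncTasks.add(key);
    }

    void onPut(String key, String value) {
        if (key.startsWith(EPHEMERAL_PREFIX)) {
            dataStore.put(key, value);
        }
        // No listener registered for this key: nothing to notify locally.
        if (!keysWithListeners.contains(key)) {
            return;
        }
        notifyTasks.add(key);
    }

    public static void main(String[] args) {
        PutFlowSketch sketch = new PutFlowSketch();
        sketch.put("ephemeral.svcA", "v1");            // stored and synced, not notified
        sketch.keysWithListeners.add("ephemeral.svcA");
        sketch.put("ephemeral.svcA", "v2");            // stored, synced, and notified
        // Prints: notify=1, sync=2
        System.out.println("notify=" + sketch.notifyTasks.size() + ", sync=" + sketch.syncTasks.size());
    }
}
```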

Notifier.addTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

    public class Notifier implements Runnable {

        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

        private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);

        public void addTask(String datumKey, ApplyAction action) {

            if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
                return;
            }
            if (action == ApplyAction.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            tasks.add(Pair.with(datumKey, action));
        }

        public int getTaskSize() {
            return tasks.size();
        }

        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");

            while (true) {
                try {

                    Pair pair = tasks.take();

                    if (pair == null) {
                        continue;
                    }

                    String datumKey = (String) pair.getValue0();
                    ApplyAction action = (ApplyAction) pair.getValue1();

                    services.remove(datumKey);

                    int count = 0;

                    if (!listeners.containsKey(datumKey)) {
                        continue;
                    }

                    for (RecordListener listener : listeners.get(datumKey)) {

                        count++;

                        try {
                            if (action == ApplyAction.CHANGE) {
                                listener.onChange(datumKey, dataStore.get(datumKey).value);
                                continue;
                            }

                            if (action == ApplyAction.DELETE) {
                                listener.onDelete(datumKey);
                                continue;
                            }
                        } catch (Throwable e) {
                            Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                        }
                    }

                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
                    }
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
    }
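  • Notifier.addTask ignores a CHANGE action whose key is already in services; otherwise it records the key of a CHANGE action in services and then adds the task to tasks. The run method loops forever: it takes a task from tasks, removes its key from services, and calls onChange or onDelete on each listener registered for that key.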
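
The services map acts as a "pending" marker so that repeated CHANGE events for the same key collapse into one queued task until the consumer picks it up. Below is a small sketch of that coalescing idea. CoalescingQueueSketch and its method names are hypothetical, and it uses putIfAbsent as an atomic check-and-mark, which differs from the containsKey followed by put in the code above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: coalesce duplicate change events per key while one is still queued.
class CoalescingQueueSketch {

    private final ConcurrentHashMap<String, Boolean> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<String> tasks = new LinkedBlockingQueue<>(1024);

    /** Queues a change for the key; returns false if one is already waiting. */
    boolean addChange(String key) {
        // putIfAbsent returns the previous value, so non-null means "already pending".
        if (pending.putIfAbsent(key, Boolean.TRUE) != null) {
            return false;
        }
        tasks.add(key);
        return true;
    }

    /** Takes the next key without blocking and clears its marker; null if the queue is empty. */
    String poll() {
        String key = tasks.poll();
        if (key != null) {
            pending.remove(key);
        }
        return key;
    }

    int size() {
        return tasks.size();
    }

    public static void main(String[] args) {
        CoalescingQueueSketch queue = new CoalescingQueueSketch();
        queue.addChange("svcA");
        queue.addChange("svcA"); // dropped: svcA is still pending
        queue.addChange("svcB");
        // Prints: queued = 2
        System.out.println("queued = " + queue.size());
    }
}
```

Once a key has been taken off the queue its marker is cleared, so the next change for that key is queued again.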

TaskDispatcher.addTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/TaskDispatcher.java

@Component
public class TaskDispatcher {

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private DataSyncer dataSyncer;

    private List<TaskScheduler> taskSchedulerList = new ArrayList<>();

    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();

    @PostConstruct
    public void init() {
        for (int i = 0; i < cpuCoreCount; i++) {
            TaskScheduler taskScheduler = new TaskScheduler(i);
            taskSchedulerList.add(taskScheduler);
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }

    public void addTask(String key) {
        taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
    }

    public class TaskScheduler implements Runnable {

        private int index;

        private int dataSize = 0;

        private long lastDispatchTime = 0L;

        private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);

        public TaskScheduler(int index) {
            this.index = index;
        }

        public void addTask(String key) {
            queue.offer(key);
        }

        public int getIndex() {
            return index;
        }

        @Override
        public void run() {

            List<String> keys = new ArrayList<>();
            while (true) {

                try {

                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);

                    if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                        Loggers.DISTRO.debug("got key: {}", key);
                    }

                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                        continue;
                    }

                    if (StringUtils.isBlank(key)) {
                        continue;
                    }

                    if (dataSize == 0) {
                        keys = new ArrayList<>();
                    }

                    keys.add(key);
                    dataSize++;

                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {

                        for (Server member : dataSyncer.getServers()) {
                            if (NetUtils.localServer().equals(member.getKey())) {
                                continue;
                            }
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getKey());

                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                            }

                            dataSyncer.submit(syncTask, 0);
                        }
                        lastDispatchTime = System.currentTimeMillis();
                        dataSize = 0;
                    }

                } catch (Exception e) {
                    Loggers.DISTRO.error("dispatch sync task failed.", e);
                }
            }
        }
    }
}
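  • TaskDispatcher.addTask picks a TaskScheduler from taskSchedulerList using UtilsAndCommons.shakeUp(key, cpuCoreCount) and calls its addTask, which offers the key to queue. The scheduler's run method keeps polling queue and accumulating keys; once dataSize reaches getBatchSyncKeyCount(), or more than getTaskDispatchPeriod() milliseconds have passed since the last dispatch, it submits one SyncTask per remote server through dataSyncer.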
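
Two ideas are combined here: keys are sharded across schedulers, and each scheduler flushes a batch on a size or time threshold. The sketch below illustrates both under stated assumptions. BatchDispatchSketch, shard, and Batcher are hypothetical names; shard uses Math.floorMod on the hash code purely as a stand-in and is not claimed to match UtilsAndCommons.shakeUp; the clock is passed in as a parameter to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of key sharding plus size-or-time batching.
class BatchDispatchSketch {

    /** Maps a key to a shard index in [0, shardCount); a stand-in for shakeUp (assumption). */
    static int shard(String key, int shardCount) {
        return Math.floorMod(key.hashCode(), shardCount);
    }

    static final class Batcher {
        private final int batchSize;
        private final long periodMillis;
        private List<String> keys = new ArrayList<>();
        private long lastDispatchTime;

        Batcher(int batchSize, long periodMillis, long now) {
            this.batchSize = batchSize;
            this.periodMillis = periodMillis;
            this.lastDispatchTime = now;
        }

        /** Adds a key; returns the batch to dispatch, or an empty list while still accumulating. */
        List<String> offer(String key, long now) {
            keys.add(key);
            boolean full = keys.size() >= batchSize;
            boolean overdue = now - lastDispatchTime > periodMillis;
            if (!full && !overdue) {
                return Collections.emptyList();
            }
            List<String> batch = keys;
            keys = new ArrayList<>(); // start a fresh batch, as the scheduler does when dataSize is 0
            lastDispatchTime = now;
            return batch;
        }
    }

    public static void main(String[] args) {
        Batcher batcher = new Batcher(3, 1000L, 0L);
        System.out.println(batcher.offer("a", 10L));   // []
        System.out.println(batcher.offer("b", 20L));   // []
        System.out.println(batcher.offer("c", 30L));   // [a, b, c]  (size threshold)
        System.out.println(batcher.offer("d", 2000L)); // [d]        (time threshold)
    }
}
```

In the original run loop lastDispatchTime starts at 0L, so the time condition already holds for the first key a scheduler receives; the sketch takes the start time as a constructor argument instead.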

SyncTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/SyncTask.java

public class SyncTask {

    private List<String> keys;

    private int retryCount;

    private long lastExecuteTime;

    private String targetServer;

    public List<String> getKeys() {
        return keys;
    }

    public void setKeys(List<String> keys) {
        this.keys = keys;
    }

    public int getRetryCount() {
        return retryCount;
    }

    public void setRetryCount(int retryCount) {
        this.retryCount = retryCount;
    }

    public long getLastExecuteTime() {
        return lastExecuteTime;
    }

    public void setLastExecuteTime(long lastExecuteTime) {
        this.lastExecuteTime = lastExecuteTime;
    }

    public String getTargetServer() {
        return targetServer;
    }

    public void setTargetServer(String targetServer) {
        this.targetServer = targetServer;
    }
}
  • SyncTask carries the keys and targetServer properties, along with retryCount and lastExecuteTime; targetServer tells DataSyncer which server the sync should be sent to.
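
Because each task names a single target, one batch of keys fans out into one task per remote server, and the local server is skipped. A minimal sketch of that fan-out follows. SyncFanOutSketch, Task, and fanOut are hypothetical names, and servers are represented as plain strings rather than nacos Server objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: one sync task per remote server for a batch of keys.
class SyncFanOutSketch {

    /** Reduced stand-in for SyncTask: only keys and targetServer. */
    static final class Task {
        final List<String> keys;
        final String targetServer;

        Task(List<String> keys, String targetServer) {
            this.keys = keys;
            this.targetServer = targetServer;
        }
    }

    static List<Task> fanOut(List<String> keys, List<String> servers, String localServer) {
        List<Task> tasks = new ArrayList<>();
        for (String server : servers) {
            if (localServer.equals(server)) {
                continue; // the local node already holds the data
            }
            tasks.add(new Task(keys, server));
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
        List<Task> tasks = fanOut(Arrays.asList("k1", "k2"), servers, "10.0.0.2:8848");
        // Prints: 2 tasks
        System.out.println(tasks.size() + " tasks");
    }
}
```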

Summary

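  • updateInstance first looks up the service and throws a NacosException if it is not found. It then checks that the instance to update exists via service.allIPs().contains(instance); if it does not, it throws a NacosException, otherwise it calls addInstance.
  • addInstance builds the instance-list key, fetches the service, calls addIpAddresses, and finally calls consistencyService.put. addIpAddresses delegates to updateIpAddresses with the action parameter set to UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.
  • updateIpAddresses first fetches the datum from consistencyService, then gets currentIPs via service.allIPs(ephemeral), and builds oldInstanceMap from the datum when it is non-null. For each instance passed in, it removes the entry from instanceMap when the action is UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE; for any other action it puts the instance into instanceMap under getDatumKey(), so an existing entry with the same key is overwritten.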

doc

  • ServiceManager