A look at nacos's DistroConsistencyServiceImpl

  • October 4, 2019
  • Notes

This article walks through nacos's DistroConsistencyServiceImpl, the consistency service used for ephemeral data in the naming module.

ConsistencyService

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ConsistencyService.java

    public interface ConsistencyService {

        /**
         * Put a data related to a key to Nacos cluster
         *
         * @param key   key of data, this key should be globally unique
         * @param value value of data
         * @throws NacosException
         * @see
         */
        void put(String key, Record value) throws NacosException;

        /**
         * Remove a data from Nacos cluster
         *
         * @param key key of data
         * @throws NacosException
         */
        void remove(String key) throws NacosException;

        /**
         * Get a data from Nacos cluster
         *
         * @param key key of data
         * @return data related to the key
         * @throws NacosException
         */
        Datum get(String key) throws NacosException;

        /**
         * Listen for changes of a data
         *
         * @param key      key of data
         * @param listener callback of data change
         * @throws NacosException
         */
        void listen(String key, RecordListener listener) throws NacosException;

        /**
         * Cancel listening of a data
         *
         * @param key      key of data
         * @param listener callback of data change
         * @throws NacosException
         */
        void unlisten(String key, RecordListener listener) throws NacosException;

        /**
         * Tell the status of this consistency service
         *
         * @return true if available
         */
        boolean isAvailable();
    }

  • ConsistencyService defines the put, remove, get, listen, unlisten, and isAvailable methods
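
To make the contract concrete, here is a minimal single-node sketch of the same six operations. It is not nacos code: ChangeListener and InMemoryConsistencyService are hypothetical names, values are plain Strings rather than Record/Datum, checked exceptions are left out, and listeners are notified synchronously, which is a simplification.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified stand-in for RecordListener (values are plain Strings here).
interface ChangeListener {
    void onChange(String key, String value);

    void onDelete(String key);
}

// Hypothetical single-node sketch of the six-method contract; not the nacos implementation.
class InMemoryConsistencyService {
    private final Map<String, String> store = new ConcurrentHashMap<>();
    private final Map<String, CopyOnWriteArrayList<ChangeListener>> listeners = new ConcurrentHashMap<>();

    // Store the value, then notify every listener registered for this key.
    void put(String key, String value) {
        store.put(key, value);
        CopyOnWriteArrayList<ChangeListener> registered = listeners.get(key);
        if (registered != null) {
            for (ChangeListener listener : registered) {
                listener.onChange(key, value);
            }
        }
    }

    // Drop the value, then tell listeners of this key that it was deleted.
    void remove(String key) {
        store.remove(key);
        CopyOnWriteArrayList<ChangeListener> registered = listeners.get(key);
        if (registered != null) {
            for (ChangeListener listener : registered) {
                listener.onDelete(key);
            }
        }
    }

    String get(String key) {
        return store.get(key);
    }

    // Register a listener once per key; a duplicate registration is ignored.
    void listen(String key, ChangeListener listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).addIfAbsent(listener);
    }

    void unlisten(String key, ChangeListener listener) {
        CopyOnWriteArrayList<ChangeListener> registered = listeners.get(key);
        if (registered != null) {
            registered.remove(listener);
        }
    }

    // A single in-memory node is always ready in this sketch.
    boolean isAvailable() {
        return true;
    }
}
```

A cluster-aware implementation such as DistroConsistencyServiceImpl keeps the same surface but adds replication to other servers and asynchronous listener notification behind it.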

EphemeralConsistencyService

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/EphemeralConsistencyService.java

    public interface EphemeralConsistencyService extends ConsistencyService {
    }

  • The EphemeralConsistencyService interface extends ConsistencyService without adding any methods of its own

DistroConsistencyServiceImpl

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

    @org.springframework.stereotype.Service("distroConsistencyService")
    public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {

        private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);

                t.setDaemon(true);
                t.setName("com.alibaba.nacos.naming.distro.notifier");

                return t;
            }
        });

        @Autowired
        private DistroMapper distroMapper;

        @Autowired
        private DataStore dataStore;

        @Autowired
        private TaskDispatcher taskDispatcher;

        @Autowired
        private DataSyncer dataSyncer;

        @Autowired
        private Serializer serializer;

        @Autowired
        private ServerListManager serverListManager;

        @Autowired
        private SwitchDomain switchDomain;

        @Autowired
        private GlobalConfig globalConfig;

        private boolean initialized = false;

        public volatile Notifier notifier = new Notifier();

        private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();

        private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);

        @PostConstruct
        public void init() {
            GlobalExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        load();
                    } catch (Exception e) {
                        Loggers.DISTRO.error("load data failed.", e);
                    }
                }
            });

            executor.submit(notifier);
        }

        public void load() throws Exception {
            if (SystemUtils.STANDALONE_MODE) {
                initialized = true;
                return;
            }
            // size = 1 means only myself in the list, we need at least one another server alive:
            while (serverListManager.getHealthyServers().size() <= 1) {
                Thread.sleep(1000L);
                Loggers.DISTRO.info("waiting server list init...");
            }

            for (Server server : serverListManager.getHealthyServers()) {
                if (NetUtils.localServer().equals(server.getKey())) {
                    continue;
                }
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("sync from " + server);
                }
                // try sync data from remote server:
                if (syncAllDataFromRemote(server)) {
                    initialized = true;
                    return;
                }
            }
        }

        //......

        public boolean syncAllDataFromRemote(Server server) {

            try {
                byte[] data = NamingProxy.getAllData(server.getKey());
                processData(data);
                return true;
            } catch (Exception e) {
                Loggers.DISTRO.error("sync full data from " + server + " failed!", e);
                return false;
            }
        }

        public void processData(byte[] data) throws Exception {
            if (data.length > 0) {
                Map<String, Datum<Instances>> datumMap =
                    serializer.deserializeMap(data, Instances.class);

                for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
                    dataStore.put(entry.getKey(), entry.getValue());

                    if (!listeners.containsKey(entry.getKey())) {
                        // pretty sure the service not exist:
                        if (switchDomain.isDefaultInstanceEphemeral()) {
                            // create empty service
                            Loggers.DISTRO.info("creating service {}", entry.getKey());
                            Service service = new Service();
                            String serviceName = KeyBuilder.getServiceName(entry.getKey());
                            String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                            service.setName(serviceName);
                            service.setNamespaceId(namespaceId);
                            service.setGroupName(Constants.DEFAULT_GROUP);
                            // now validate the service. if failed, exception will be thrown
                            service.setLastModifiedMillis(System.currentTimeMillis());
                            service.recalculateChecksum();
                            listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0)
                                .onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
                        }
                    }
                }

                for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {

                    if (!listeners.containsKey(entry.getKey())) {
                        // Should not happen:
                        Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
                        continue;
                    }

                    try {
                        for (RecordListener listener : listeners.get(entry.getKey())) {
                            listener.onChange(entry.getKey(), entry.getValue().value);
                        }
                    } catch (Exception e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
                        continue;
                    }

                    // Update data store if listener executed successfully:
                    dataStore.put(entry.getKey(), entry.getValue());
                }
            }
        }

        //......

        @Override
        public void put(String key, Record value) throws NacosException {
            onPut(key, value);
            taskDispatcher.addTask(key);
        }

        @Override
        public void remove(String key) throws NacosException {
            onRemove(key);
            listeners.remove(key);
        }

        @Override
        public Datum get(String key) throws NacosException {
            return dataStore.get(key);
        }

        //......

        @Override
        public void listen(String key, RecordListener listener) throws NacosException {
            if (!listeners.containsKey(key)) {
                listeners.put(key, new CopyOnWriteArrayList<>());
            }

            if (listeners.get(key).contains(listener)) {
                return;
            }

            listeners.get(key).add(listener);
        }

        @Override
        public void unlisten(String key, RecordListener listener) throws NacosException {
            if (!listeners.containsKey(key)) {
                return;
            }
            for (RecordListener recordListener : listeners.get(key)) {
                if (recordListener.equals(listener)) {
                    listeners.get(key).remove(listener);
                    break;
                }
            }
        }

        @Override
        public boolean isAvailable() {
            return isInitialized() || ServerStatus.UP.name().equals(switchDomain.getOverriddenServerStatus());
        }

        //......
    }

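  • DistroConsistencyServiceImpl implements the EphemeralConsistencyService interface
  • Its init method asynchronously runs load. Outside standalone mode, load waits until more than one healthy server is listed, then calls syncAllDataFromRemote on each remote server in turn until one succeeds. syncAllDataFromRemote fetches the data through NamingProxy.getAllData and hands it to processData, which mainly invokes the listener callbacks and then writes the data into dataStore. Finally, init submits the Notifier to the executor so that it runs asynchronously
  • Its put method calls onPut and then taskDispatcher.addTask(key); its remove method calls onRemove and then listeners.remove(key); get reads directly from dataStore; listen registers a RecordListener; unlisten removes a RecordListener; isAvailable returns true if the service is initialized or the overridden server status is ServerStatus.UP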
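
The peer-selection part of load can be sketched in isolation as follows. This is only an illustration under stated assumptions: StartupSync and syncFromFirstAvailable are hypothetical names, servers are plain strings, and the Predicate stands in for syncAllDataFromRemote.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of "try each remote peer until one full sync succeeds".
class StartupSync {

    // Returns the first peer whose full sync succeeded, or null if none did.
    // 'self' is this node's own address; 'syncAll' is an assumed stand-in for
    // syncAllDataFromRemote and returns true on success.
    static String syncFromFirstAvailable(String self, List<String> healthyServers, Predicate<String> syncAll) {
        for (String server : healthyServers) {
            if (self.equals(server)) {
                continue; // never sync from ourselves
            }
            if (syncAll.test(server)) {
                return server; // one successful full sync is enough
            }
        }
        return null; // every peer failed; the caller stays uninitialized
    }
}
```

As in the quoted load method, the local server is skipped and the loop stops at the first peer that delivers a full copy. In the code as quoted, if every peer fails, initialized stays false, and isAvailable then depends on the overridden server status alone.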

Notifier

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

    public class Notifier implements Runnable {

        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

        private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);

        public void addTask(String datumKey, ApplyAction action) {

            if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
                return;
            }
            if (action == ApplyAction.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            tasks.add(Pair.with(datumKey, action));
        }

        public int getTaskSize() {
            return tasks.size();
        }

        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");

            while (true) {
                try {

                    Pair pair = tasks.take();

                    if (pair == null) {
                        continue;
                    }

                    String datumKey = (String) pair.getValue0();
                    ApplyAction action = (ApplyAction) pair.getValue1();

                    services.remove(datumKey);

                    int count = 0;

                    if (!listeners.containsKey(datumKey)) {
                        continue;
                    }

                    for (RecordListener listener : listeners.get(datumKey)) {

                        count++;

                        try {
                            if (action == ApplyAction.CHANGE) {
                                listener.onChange(datumKey, dataStore.get(datumKey).value);
                                continue;
                            }

                            if (action == ApplyAction.DELETE) {
                                listener.onDelete(datumKey);
                                continue;
                            }
                        } catch (Throwable e) {
                            Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                        }
                    }

                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
                    }
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
    }

  • Notifier implements the Runnable interface. Its addTask method skips a CHANGE task when one for the same key is already pending; its run method takes tasks from the LinkedBlockingQueue in a loop and invokes each listener registered for the key, calling onChange or onDelete depending on the action
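
The de-duplication that addTask performs can be tried out with a small stand-alone sketch. The names here (CoalescingQueue, Task, poll) are hypothetical and not part of nacos. Two deliberate differences from the quoted code: the sketch uses putIfAbsent so the check and the insert happen atomically, where the original uses containsKey followed by put, and it exposes a non-blocking poll for easy experimentation, where the original run loop blocks on take.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: a task queue that coalesces pending CHANGE tasks per key.
class CoalescingQueue {
    enum Action { CHANGE, DELETE }

    static final class Task {
        final String key;
        final Action action;

        Task(String key, Action action) {
            this.key = key;
            this.action = action;
        }
    }

    // Keys that currently have a CHANGE task waiting in the queue.
    private final ConcurrentHashMap<String, Boolean> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<Task> tasks = new LinkedBlockingQueue<>(1024);

    void addTask(String key, Action action) {
        // A second CHANGE for a key that is already queued adds no information,
        // because the consumer reads the latest value when it runs.
        if (action == Action.CHANGE && pending.putIfAbsent(key, Boolean.TRUE) != null) {
            return;
        }
        tasks.add(new Task(key, action)); // throws IllegalStateException if the queue is full
    }

    // Non-blocking take: returns null when the queue is empty.
    Task poll() {
        Task task = tasks.poll();
        if (task != null) {
            pending.remove(task.key); // from now on a new CHANGE may be queued again
        }
        return task;
    }

    int size() {
        return tasks.size();
    }
}
```

Coalescing is safe in the quoted Notifier because a CHANGE task carries only the key: the listener receives whatever dataStore.get(datumKey) holds when the task is processed, so one queued task covers any number of earlier writes. DELETE tasks are never coalesced.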

Summary

  • DistroConsistencyServiceImpl implements the EphemeralConsistencyService interface
  • init asynchronously runs load, which initializes the node through syncAllDataFromRemote: the data is fetched via NamingProxy.getAllData and passed to processData, which mainly invokes the listener callbacks and then adds the data to dataStore; init then starts the Notifier asynchronously
  • put calls onPut and taskDispatcher.addTask(key); remove calls onRemove and listeners.remove(key); get reads directly from dataStore; listen adds a RecordListener and unlisten removes one; isAvailable is decided by isInitialized and the ServerStatus.UP override
