Nacos配置服務原理

  • 2019 年 10 月 3 日
  • 筆記

Nacos Client配置機制

spring boot載入遠程配置

在了解NACOS客戶端配置之前,我們先看看spring boot怎麼樣載入遠程配置的。spring boot提供了載入遠程配置的擴展介面 PropertySourceLocator。下面看個簡單的例子:

實現PropertySourceLocator

public class GreizPropertySourceLocator implements PropertySourceLocator {      @Override      public PropertySource<?> locate(Environment environment) {          // 自定義配置,來源可以從任何地方          Map<String, Object> source = new HashMap<>();          source.put("userName", "Greiz");          source.put("userAge", 18);          return new MapPropertySource(GreizPropertySource.PROPERTY_NAME, source);      }  }

PropertySourceLocator 只有一個介面,我們可以在該介面實現自定義配置的載入,比如從資料庫中獲取配置,或者文件中獲取配置等。

springboot啟動配置類

@Configuration  public class GreizConfigBootstrapConfiguration {      @Bean      public GreizPropertySourceLocator greizPropertySourceLocator() {          return new GreizPropertySourceLocator();      }  }

在META-INF/spring.factories添加啟動指定載入類

org.springframework.cloud.bootstrap.BootstrapConfiguration=  com.greiz.demo.config.GreizConfigBootstrapConfiguration

使用

@Component  public class Greiz {      @Value("${userName}")      private String name;      @Value("${userAge}")      private Integer age;        // 省getter/setter  }

跟本地配置一樣使用。

spring啟動載入遠程配置流程

spring啟動載入流程

在spring啟動prepareContext階段會執行PropertySourceLocator所有實現類載入自定義的配置,最終添加到Environment中管理。

nacos-client

拉取遠程配置

nacos客戶端啟動時載入遠程配置就是用了上面的方式。下面我們根據源碼看一下具體過程。NacosPropertySourceLocator 實現了 PropertySourceLocator,所以spring啟動時會調用locate方法。

public PropertySource<?> locate(Environment env) {     // 1. 創建一個跟遠程打交道的對象NacosConfigService     ConfigService configService = nacosConfigProperties.configServiceInstance();     ... 省略程式碼     // 2. 操作NacosPropertySource對象,下面三個方法最終都會調用該對象build     nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, timeout);     // 3.     String name = nacosConfigProperties.getName();     String dataIdPrefix = nacosConfigProperties.getPrefix();     if (StringUtils.isEmpty(dataIdPrefix)) {        dataIdPrefix = name;     }     if (StringUtils.isEmpty(dataIdPrefix)) {        dataIdPrefix = env.getProperty("spring.application.name");     }     // 從遠程獲取的properties會存放到該類,最終放到Environment中     CompositePropertySource composite = new CompositePropertySource(NACOS_PROPERTY_SOURCE_NAME);     // 載入公共模組配置     loadSharedConfiguration(composite);     // 載入擴展配置     loadExtConfiguration(composite);     // 載入獨有配置     loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);     return composite;  }

1處 – 創建 ConfigService 對象,是通過反射創建出 NacosConfigService 實例。該類是Nacos Client 跟 Nacos Server 重要的對接者。後面會圍繞該類細講。

2處 – 創建 NacosPropertySourceBuilder 實例,用於構建和快取 NacosPropertySource,刷新時會用到此處快取。

3處 – 載入配置的順序,公共配置 -> 擴展配置 -> 私有配置,如果有相同key的後面的覆蓋前面的。默認的 Data ID 生成規則 ${spring.application.name}.properties。

載入三種配置最終都會調用 NacosPropertySourceBuilder.build() 方法。

NacosPropertySource build(String dataId, String group, String fileExtension, boolean isRefreshable) {     // 載入配置     Properties p = loadNacosData(dataId, group, fileExtension);     NacosPropertySource nacosPropertySource = new NacosPropertySource(group, dataId, propertiesToMap(p), new Date(), isRefreshable);     // 快取nacosPropertySource     NacosPropertySourceRepository.collectNacosPropertySources(nacosPropertySource);     return nacosPropertySource;  }

載入配置後封裝nacosPropertySource,並快取。

主要邏輯在 NacosPropertySourceBuilder.loadNacosData() 中。

private Properties loadNacosData(String dataId, String group, String fileExtension) {      // 獲取配置      String data = configService.getConfig(dataId, group, timeout);      ... 省略程式碼      // .properties擴展名      if (fileExtension.equalsIgnoreCase("properties")) {          Properties properties = new Properties();          properties.load(new StringReader(data));          return properties;      } else if (fileExtension.equalsIgnoreCase("yaml") || fileExtension.equalsIgnoreCase("yml"))         {// .yaml或者.yml擴展名        YamlPropertiesFactoryBean yamlFactory = new YamlPropertiesFactoryBean();        yamlFactory.setResources(new ByteArrayResource(data.getBytes()));        return yamlFactory.getObject();       }     return EMPTY_PROPERTIES;  }

把遠程獲取到的數據根據擴展名解析成統一的properties。nacos控制台配置支援properties和yaml兩個擴展名。

真正獲取遠程配置的是 NacosConfigService.getConfig(), 調用getConfigInner()。

private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {      group = null2defaultGroup(group);      ParamUtils.checkKeyParam(dataId, group);      ConfigResponse cr = new ConfigResponse();      cr.setDataId(dataId);      cr.setTenant(tenant);      cr.setGroup(group);        // 1. 優先使用failvoer配置      String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);      if (content != null) {          cr.setContent(content);          configFilterChainManager.doFilter(null, cr);          content = cr.getContent();          return content;      }        try {          // 2. 伺服器獲取配置          content = worker.getServerConfig(dataId, group, tenant, timeoutMs);          cr.setContent(content);          configFilterChainManager.doFilter(null, cr);          content = cr.getContent();          return content;      } catch (NacosException ioe) {          if (NacosException.NO_RIGHT == ioe.getErrCode()) {              throw ioe;          }      }        // 3. 當伺服器掛了就拿本地快照      content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);      cr.setContent(content);      configFilterChainManager.doFilter(null, cr);      content = cr.getContent();      return content;  }

1處 – 優先從failvoer獲取配置,該文件是怎麼樣產生的,我暫時還不是很清楚,後面搞懂補充。

2處 – 從nacos服務中獲取配置。

3處 – 如果2失敗了就從本地快照文件獲取。該文件由首次讀取遠程配置文件生成,並且之後輪詢配置更新時如果有更新也會對應更新該文件。

訪問服務介面的臟活當然需要一個客戶端工作者ClientWorker,下面是 NacosConfigService.getConfig() 中調用 ClientWorker.getServerConfig()。

public String getServerConfig(String dataId, String group, String tenant, long readTimeout)      throws NacosException {      // 就是這麼簡單http請求獲取的配置      HttpResult result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);    ... 省略程式碼      // 寫本地文件快照      LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);    ...省略程式碼          return result.content;  }

看了上面獲取遠程配置的程式碼是不是想喊出f**k,怎麼這麼簡單!!!是的,用http請求 http://ip:port/v1/cs/configs 介面,跟nacos控制台頁面訪問是一樣的。

到此Nacos Client啟動讀取遠程配置並封裝到Environment結束了。

長輪詢獲取更新

前一小節是對項目啟動時Nacos Client載入遠程配置過程分析,本節將對項目運行中配置改變了Nacos Client是怎麼樣悉知的分析。

前面提到 NacosConfigService 是 Nacos Client 對接 Nacos Server 的橋樑,下面看一下該類在配置更新過程怎麼樣運作的。先看一下 NacosConfigService 的構造方法。

public NacosConfigService(Properties properties) throws NacosException {      ... 省略程式碼      // 初始化 namespace      initNamespace(properties);      // 查詢服務列表變化情況      agent = new MetricsHttpAgent(new ServerHttpAgent(properties));      agent.start();      // 配置更新解決方案在這裡面      worker = new ClientWorker(agent, configFilterChainManager, properties);  }

在構造函數中初始化 encode、namespace、HttpAgent 和 ClientWorker。

HttpAgent 是通過http獲取服務地址列表代理類,維護這服務地址列表和客戶端本地一致。

ClientWorker 是維護服務端配置和客戶端配置一致的工作者。前面初始化獲取遠程配置時也是該對象。

ClientWorker 內部是怎麼樣維護客戶端屬性更新呢?看一下 ClientWorker 構造函數幹了啥。

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {          ...省略程式碼      executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {          ...省略程式碼      });        executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {          ...省略程式碼      });        // 每10毫秒檢查一遍配置      executor.scheduleWithFixedDelay(new Runnable() {          @Override          public void run() {              try {                  checkConfigInfo();              } catch (Throwable e) {                  LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);              }          }      }, 1L, 10L, TimeUnit.MILLISECONDS);  }

ClientWorker 構造函數創建了兩個執行緒池。executor 創建了一個定時任務,每10毫秒執行一次 checkConfigInfo(); executorService 作用是什麼我們接著往下看。

public void checkConfigInfo() {      // 分任務 向上取整為批數      int listenerSize = cacheMap.get().size();      int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());      if (longingTaskCount > currentLongingTaskCount) {          for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {              executorService.execute(new LongPollingRunnable(i));          }          currentLongingTaskCount = longingTaskCount;      }  }

以分段方式把任務拆分交給 executorService 執行,默認3000個配置在一個任務中。executor 和 executorService 是不是很像 Netty 中的 boos 和 worker? Reactor 模式,分工明確。

LongPollingRunnable 是 ClientWorker 一個成員類,實現 Runnable 介面。看一下 run() 方法。

public void run() {      List<CacheData> cacheDatas = new ArrayList<CacheData>();      List<String> inInitializingCacheList = new ArrayList<String>();      try {          // 1. 只處理該任務中的配置並且檢查failover配置          for (CacheData cacheData : cacheMap.get().values()) {              if (cacheData.getTaskId() == taskId) {                  cacheDatas.add(cacheData);                  try {                      checkLocalConfig(cacheData);                      if (cacheData.isUseLocalConfigInfo()) {                          cacheData.checkListenerMd5();                      }                  } catch (Exception e) {                      LOGGER.error("get local config info error", e);                  }              }          }                  // 2. 把客戶端的MD5值跟服務端的MD5比較,把不一樣的配置以 "example.properties+DEFAULT_GROUP"方式返回          List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);         // 3. 把有更新的配置重新從服務端拉取配置內容          for (String groupKey : changedGroupKeys) {              String[] key = GroupKey.parseKey(groupKey);              String dataId = key[0];              String group = key[1];              String tenant = null;              if (key.length == 3) {                  tenant = key[2];              }              try {                  String content = getServerConfig(dataId, group, tenant, 3000L);                  CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));                  // 修改客戶端本地值並且重新計算該對象的md5值                  cache.setContent(content);              } catch (NacosException ioe) {                  ...省略程式碼              }          }          for (CacheData cacheData : cacheDatas) {              if (!cacheData.isInitializing() || inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {                  // 4. 根據md5值檢查是否更新,如果更新通知listener                  cacheData.checkListenerMd5();                  cacheData.setInitializing(false);              }          }          inInitializingCacheList.clear();          // 5. 又把this放進執行緒池中,形成一個長輪詢檢查客戶端和服務端配置一致性          executorService.execute(this);      } catch (Throwable e) {          executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);      }  }

1處 – 篩選屬於該任務的配置,並檢查 failover 配置。

2處 – 把配置以"dataId group MD5 tenantrn"拼接後當做參數請求伺服器 http://ip:port/v1/cs/configs/listener 介面。伺服器返回有更新的配置,以 "example.properties+DEFAULT_GROUP"方式返回

3處 – 根據2處返回的列表遍歷請求伺服器 http://ip:port/v1/cs/configs 介面,獲取最新配置。然後更新CacheData content值並更新md5值。

4處 – 把 CacheData 新的md5值跟之前的做比較,如果不一樣就通知監聽者更新值。下一節會跟進去詳解。

5處 – 把該 Runnable 對象重新放入執行緒池,形成一個長輪詢。

本節分析了 Nacos Client 配置是怎麼樣保持跟伺服器接近實時同步的。通過長輪詢+http短連接方式。

刷新值

在開始本節之前,我們先看一下上面多次出現的一個類 CacheData 結構。

public class CacheData {      private final String name;      private final ConfigFilterChainManager configFilterChainManager;      public final String dataId;      public final String group;      public final String tenant;      // 監聽列表      private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;      // 內容md5值      private volatile String md5;      // 是否使用本地配置      private volatile boolean isUseLocalConfig = false;      // 本地版本號      private volatile long localConfigLastModified;      private volatile String content;      // 長輪詢中分段任務ID      private int taskId;      private volatile boolean isInitializing = true;            ...省略程式碼  }

根據名字可以得知, CacheData 是配置數據快取中的對象。listeners 屬性比較有意思,在 BO 中擁有一個監聽列表,當該對象md5改變時會通過遍歷 listeners 通知監聽者們。

前一節從服務端獲取到有更新的配置之後會檢查md5,調用 CacheData.checkListenerMd5()方法:

void checkListenerMd5() {     for (ManagerListenerWrap wrap : listeners) {          if (!md5.equals(wrap.lastCallMd5)) {              safeNotifyListener(dataId, group, content, md5, wrap);          }      }  }
class ManagerListenerWrap {      final Listener listener;      String lastCallMd5 = CacheData.getMd5String(null);          ... 省略程式碼  }

ManagerListenerWrap 的 lastCallMd5 是舊配置的md5值,如果 CacheData 的md5和 ManagerListenerWrap 的lastCallMd5 值不一樣,說明配置有更新。需要通知未更新的監聽者。

private void safeNotifyListener(final String dataId, final String group, final String content, final String md5, final ManagerListenerWrap listenerWrap) {      final Listener listener = listenerWrap.listener;      Runnable job = new Runnable() {          @Override          public void run() {              ... 省略程式碼                  // 調用監聽者的方法                  listener.receiveConfigInfo(contentTmp);                  listenerWrap.lastCallMd5 = md5;              ... 省略程式碼          }      };      try {          if (null != listener.getExecutor()) {              listener.getExecutor().execute(job);          } else {              job.run();          }      } catch (Throwable t) {      }  }

調用了監聽者的 receiveConfigInfo() 方法,然後修改 ManagerListenerWrap 的lastCallMd5 值。

本節到這裡分析了從服務端獲取更新配置後通知配置監聽者。但是監聽者是什麼時候註冊的呢?接下來我們繼續分析監聽者註冊到 CacheData 過程。

NacosContextRefresher 實現了ApplicationListener 。在容器準備後會調用 onApplicationEvent() 方法,最終調用 registerNacosListener() 方法。

private void registerNacosListener(final String group, final String dataId) {     Listener listener = listenerMap.computeIfAbsent(dataId, i -> new Listener() {       // 通知監聽者調用的就是這個方法啦       @Override        public void receiveConfigInfo(String configInfo) {           refreshCountIncrement();           String md5 = "";           if (!StringUtils.isEmpty(configInfo)) {              try {                 MessageDigest md = MessageDigest.getInstance("MD5");                 md5 = new BigInteger(1, md.digest(configInfo.getBytes("UTF-8"))).toString(16);              }              catch (NoSuchAlgorithmException | UnsupportedEncodingException e) {                 log.warn("[Nacos] unable to get md5 for dataId: " + dataId, e);              }           }           refreshHistory.add(dataId, md5);           // spring的刷新事件通知,刷新監聽者會被執行           applicationContext.publishEvent(new RefreshEvent(this, null, "Refresh Nacos config"));        }        @Override        public Executor getExecutor() {           return null;        }     });    // 註冊本監聽者    configService.addListener(dataId, group, listener);    ...省略程式碼  }

通過 NacosConfigService.addListener()註冊監聽者。

NacosConfigService.addListener():

public void addListener(String dataId, String group, Listener listener) throws NacosException {      worker.addTenantListeners(dataId, group, Arrays.asList(listener));  }

還是交給了 ClientWorker

ClientWorker.addTenantListeners()

public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {      group = null2defaultGroup(group);      String tenant = agent.getTenant();      CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);      for (Listener listener : listeners) {          cache.addListener(listener);      }  }

ClientWorker 把監聽者交給了 CacheData 完成了註冊。

匯總系統運行中更新配置的流程:

  1. 啟動時把本地更新 Listener 註冊到 CacheData。
  2. ClientWorker 長輪詢同步服務端的更新配置。
  3. 2中獲取到更新後的配置,重置 CacheData 內容。
  4. CacheData 回調1中註冊上來的 Listener.receiveConfigInfo()
  5. Listener 最終通知spring刷新事件,完成Context刷新屬性值。

總結

Nacos Config Client 和 Nacos Config Server 採用定時長輪詢http請求訪問配置更新,這樣設計 Nacos Config Server 和 Config Client 結構簡單。Server 也沒有長連接模式Client過多的壓力。

我的部落格即將同步至騰訊雲+社區,邀請大家一同入駐:https://cloud.tencent.com/developer/support-plan?invite_code=16l9glm94a1q9