Nacos配置中心原理
- 2019 年 12 月 30 日
- 筆記
使用示例
@Before public void init() throws NacosException { Properties properties = new Properties(); properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:" + 8848); properties.put(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT, "20000"); properties.put(PropertyKeyConst.CONFIG_RETRY_TIME, "3000"); properties.put(PropertyKeyConst.MAX_RETRY, "5"); configService = NacosFactory.createConfigService(properties); } @Test public void test() throws InterruptedException, NacosException { configService.addListener("test", "DEFAULT_GROUP", new Listener() { @Override public Executor getExecutor() { return null; } @Override public void receiveConfigInfo(String configInfo) { System.out.println(configInfo); } }); TimeUnit.SECONDS.sleep(100); }
NacosConfigService
這是Nacos給客戶端提供的API,可以通過該API:增、刪、蓋、查配置資訊,還可以通過該API給配置添加Listener
創建ConfigService
// NacosFactory#createConfigService public static ConfigService createConfigService(Properties properties) throws NacosException { return ConfigFactory.createConfigService(properties); } // ConfigFactory#createConfigService public static ConfigService createConfigService(Properties properties) throws NacosException { try { Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService"); Constructor constructor = driverImplClass.getConstructor(Properties.class); // 反射創建對象 ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties); return vendorImpl; } catch (Throwable e) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e); } }
通過反射的機制創建了一個NacosConfigService
實例,它的構造函數如下
public NacosConfigService(Properties properties) throws NacosException { String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE); if (StringUtils.isBlank(encodeTmp)) { encode = Constants.ENCODE; } else { encode = encodeTmp.trim(); } initNamespace(properties); agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); agent.start(); worker = new ClientWorker(agent, configFilterChainManager, properties); }
-
MetricsHttpAgent
主要是用來記錄一些Metric資訊,內部持有一個ServerHttpAgent
對象,所以主要邏輯還是關注ServerHttpAgent
; -
ClientWorker
是一個非常核心的類,裡面封裝了客戶端從服務端主動PULL配置資訊的關鍵邏輯,這個在下面重點介紹
ServerHttpAgent
public ServerHttpAgent(Properties properties) throws NacosException { serverListMgr = new ServerListManager(properties); init(properties); }
ServerHttpAgent
內部主要就是封裝了一個HTTP交互的通用方法,比如GET
,PUT
,DELETE
等方法,在ServerHttpAgent
構造函數中,主要做了兩件事情:
- 創建
ServerListManager
對象,這個對象的主要作用就是根據Properties
解析出服務端的地址,然後維護在一個List<String>
中 -
init
方法中主要做了三件事情:初始化編碼格式,如果沒有就默認UTF-8;初始化accessKey
和secretKey
,這個應該是用來驗證客戶端的身份的;初始化重試次數,如果沒傳默認為3
在ServerListManager
對象中,還有一個比較重要的屬性isFixed
,這個屬性的主要作用就是標識伺服器的地址資訊是不是固定的,比如,如果伺服器的地址是通過Properties
傳入,那isFixed
的值為true
為什麼要介紹這個屬性呢,因為在NacosConfigService
的構造函數中,調用了MetricsHttpAgent#start
方法,而這個方法的內部調用鏈如下
MetricsHttpAgent#start => ServerHttpAgent#start => ServerListManager#start
ServerListManager#start
方法主要做了什麼事情呢?如果isFixed
的值為true,就直接返回。否則先執行initServerlistRetryTimes
次GetServerListTask#run
方法獲取,該方法用於更新伺服器地址資訊,如果執行initServerlistRetryTimes
次 之後還是沒有獲取到伺服器地址列表資訊,則直接拋出異常,否則開啟一個定時任務,每30s更新一次伺服器地址列表資訊。
至此,在NacosConfigService
構造函數中,只剩下ClientWorker
相關的邏輯了
ClientWorker
前面已經提到過,ClientWorker
是一個非常核心的類,裡面封裝了客戶端從服務端主動pull
配置資訊的關鍵邏輯。注意這裡很明確指出了ClientWorker
是通過pull模式
從服務端獲取配置資訊的,而我們在使用的時候通常會給它添加Listener
,這 會讓我們以為它是push模式
,這是一點需要注意的地方。
而它實現的pull模式
的關鍵點在於兩個執行緒池,這兩個執行緒池在ClientWorker
的構造函數中初始化。其中有一個執行緒池executor
裡面只有一個執行緒,它的作用就是讓另一個執行緒池啟動。
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { init(properties); executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); }
-
init(properties)
方法主要就是初始化一些timeout的參數 -
executor執行緒池
裡面只有一個執行緒,它的唯一作用就是讓另一個執行緒池開始執行,每10s中執行一次 -
executorService執行緒池
用於更新配置資訊,核心任務LongPollingRunnable
ClientWorker#checkConfigInfo
方法主要作用是更新配置資訊,目前已經獲取到的配置資訊會快取到一個Map<String, CacheData>
中,然後對map中的數據分批次,一個批次默認是3000條數據,每個批次的數據對應一個執行緒負責更新,如下:
public void checkConfigInfo() { // 分任務 int listenerSize = cacheMap.get().size(); // 向上取整為批數 int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // 要判斷任務是否在執行 這塊需要好好想想。 任務列表現在是無序的。變化過程可能有問題 executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }
從LongPollingRunnable
的名字可以看出,客戶端主要是通過長輪詢的方式去更新配置資訊
LongPollingRunnable
public void run() { // 本批次號的數據 List<CacheData> cacheDatas = new ArrayList<CacheData>(); // 本批次號中 isInitializing=true 的數據,CacheData首次出現在Map中並且是首次check更新時,isInitializing的值才為true List<String> inInitializingCacheList = new ArrayList<String>(); try { // check failover config for (CacheData cacheData : cacheMap.get().values()) { if (cacheData.getTaskId() == taskId) { cacheDatas.add(cacheData); try { // 涉及到FailoverFile checkLocalConfig(cacheData); // 如果有更新,需要更新Listener的MD5值,並執行Listener邏輯 if (cacheData.isUseLocalConfigInfo()) { cacheData.checkListenerMd5(); } } catch (Exception e) { LOGGER.error("get local config info error", e); } } } // check server config // 找到isUseLocalConfig=false的數據,然後將每個符合條件的CacheData的 `group + dataId + tenant` 拼成一個字元串傳給服務端校驗,然後服務端會返回一個需要更新的`List<String>`,該列表裡面的每個元素代表一個CacheData的key List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { // 根據需要更新的key列表,從服務端獲取配置資訊 String content = getServerConfig(dataId, group, tenant, 3000L); CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(content); } catch (NacosException ioe) { } } // 這部分程式碼是在沒看懂 for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); // 重新提交自己 executorService.execute(this); } catch (Throwable e) { // If the rotation training task is abnormal, the next execution time of the task will be punished LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } }
LongPollingRunnable#run
方法比較長:
- 從map中取出批次號符與該執行緒對應批次號相同的數據,因為每條配置資訊
CacheData
都維護了一個批次號資訊,然後每個LongPollingRunnable
也對應一個批次號資訊,只需要負責更新批次號相同的數據
,不同的由別的LongPollingRunnable
去更新,這部分據放在變數cacheDatas
中 - 如果客戶端和服務端部署在同一個節點,此時客戶端可以直接從本地文件中獲取配置資訊,避免遠程交互,設置這部分數據的
isUseLocalConfig=true
,並更新CacheData
的值,同時更新CacheData
的MD5值 - 如果步驟2可以直接根據本地文件更新值
isUseLocalConfig==true
,此時執行cacheData#checkListenerMd5
方法,該方法會拿CacheData
的MD5值和Listener
的MD5值對比,如果不一樣就更新Listener
的MD5值並回調Listener
的相關方法 - 排除
isUseLocalConfig=true
的配置資訊,然後將每個符合條件的配置資訊的group + dataId +tenant
拼成一個字元串傳給服務端校驗,然後服務端會返回一個需要更新的List<String>
,該列表裡面的每個元素代表一個CacheData
的key - 根據上一步驟返回的key列表,從服務端拉取配置資訊,然後更新
CacheData
的值 -
cacheData.isInitializing()
代表該條配置資訊首次出現在map中並且是首次更新,inInitializingCacheList
代表本批次中isUseLocalConfig==false && isInitializing == true
- 最後那個遍歷不太清楚啥意思,
isInitializing == false || isInitializing == true
?
配置監聽器
上面提到ClientWorker
中有一個Map,那裡面的數據從哪裡來呢?為什麼要存這部分數據呢?答案是在為CacheData
添加Listener
的時候,會初始化一條CacheData
數據,並添加到Map中
添加監聽
ConfigService configService = NacosFactory.createConfigService(properties); configService.addListener("test", "DEFAULT_GROUP", new Listener() { @Override public Executor getExecutor() { return null; } @Override public void receiveConfigInfo(String configInfo) { System.out.println(configInfo); } });
源碼
// NacosConfigService#addListener public void addListener(String dataId, String group, Listener listener) throws NacosException { worker.addTenantListeners(dataId, group, Arrays.asList(listener)); } // ClientWorker#addTenantListeners public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException { group = null2defaultGroup(group); String tenant = agent.getTenant(); CacheData cache = addCacheDataIfAbsent(dataId, group, tenant); for (Listener listener : listeners) { cache.addListener(listener); } }
內部還是委託給ClientWorker
來實現,添加監時主要做了以下幾件事情
- 將
dataId group tenant
封裝成key,從ClientWorker
的Map快取中獲取CacheData
1.1. 如果從快取中拿到了數據,將CacheData
的isInitializing
屬性設置為true 1.2. 如果從快取中沒拿到數據,則創建一個CacheData
,同時判斷是否開啟遠程同步,即enableRemoteSyncConfig
屬性的值是否為true。如果開啟了,則從服務端獲取該配置的值,並設置到CacheData
中 - 將最新的
CacheData
設置到快取中 - 將
Listener
添加到CacheData
的CopyOnWriteArrayList<ManagerListenerWrap> listeners
列表中
觸發監聽
其實在上面已經提到過,當配置有更新時,會觸發Listener
的回調邏輯,這部分邏輯在CacheData#checkListenerMd5
方法中
void checkListenerMd5() { // `ManagerListenerWrap`只是一個包裝類,裡面維護了`Listener`和對應`CacheData`的MD5值 for (ManagerListenerWrap wrap : listeners) { // 如果不一樣,說明配置有變更,此時更新ManagerListenerWrap的MD5值,並執行Listener的回調 if (!md5.equals(wrap.lastCallMd5)) { safeNotifyListener(dataId, group, content, md5, wrap); } } } private void safeNotifyListener(final String dataId, final String group, final String content, final String md5, final ManagerListenerWrap listenerWrap) { final Listener listener = listenerWrap.listener; Runnable job = new Runnable() { @Override public void run() { try { ConfigResponse cr = new ConfigResponse(); cr.setDataId(dataId); cr.setGroup(group); cr.setContent(content); configFilterChainManager.doFilter(null, cr); String contentTmp = cr.getContent(); listener.receiveConfigInfo(contentTmp); listenerWrap.lastCallMd5 = md5; } finally { Thread.currentThread().setContextClassLoader(myClassLoader); } } }; final long startNotify = System.currentTimeMillis(); try { if (null != listener.getExecutor()) { // 如果配置了執行緒池,交給執行緒池執行 listener.getExecutor().execute(job); } else { job.run(); } } catch (Throwable t) { } }
-
ManagerListenerWrap
只是一個包裝類,裡面維護了Listener
和對應CacheData
的MD5值 - 判斷
ManagerListenerWrap
和當前CacheData
的MD5值是否相同,如果不一樣,說明配置有變更,此時需要更新ManagerListenerWrap
的MD5值,並執行Listener
的回調 - 更新
ManagerListenerWrap
的MD5值和執行Listener
的回調的邏輯都在safeNotifyListener
方法中,同時會判斷是否為Listener
配置了執行緒池如,沒有就直接執行,有就交給執行緒池執行
創建配置
ConfigService configService = NacosFactory.createConfigService(properties); configService.publishConfig("one", "LSZ", "大美女");
源碼
private boolean publishConfigInner(String tenant, String dataId, String group, String tag, String appName, String betaIps, String content) throws NacosException { ...... 一大坨構造參數的程式碼 ...... HttpResult result = null; try { // 推送到服務端 result = agent.httpPost(url, headers, params, encode, POST_TIMEOUT); } catch (IOException ioe) { return false; } ...... }
獲取配置
ConfigService configService = NacosFactory.createConfigService(properties); configService.getConfig("testlsz", "lszgroup", 1000000);
源碼
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException { ...... // 優先使用本地配置 String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); if (content != null) { return content; } try { // 本地沒有從伺服器獲取 content = worker.getServerConfig(dataId, group, tenant, timeoutMs); cr.setContent(content); configFilterChainManager.doFilter(null, cr); content = cr.getContent(); return content; } catch (NacosException ioe) { } ...... }
刪除配置
ConfigService configService = NacosFactory.createConfigService(properties); configService.removeConfig("testlsz", "lszgroup");
源碼
private boolean removeConfigInner(String tenant, String dataId, String group, String tag) throws NacosException { ...... 一大坨構造參數的程式碼 ...... HttpResult result = null; try { // 直接發送http請求 result = agent.httpDelete(url, null, params, encode, POST_TIMEOUT); } catch (IOException ioe) { LOGGER.warn("[remove] error, " + dataId + ", " + group + ", " + tenant + ", msg: " + ioe.toString()); return false; } ...... }
本地文件
在Naco中涉及到兩個文件,FailoverFile
和SnapshotFile
-
FailoverFile
為容災文件,當本地和資料庫裡面數據不一致的時候會去使用,一般不會用; -
SnapshotFile
為配置的快照,當獲取不到伺服器上的配置的時候,會讀取本地快照;FailoverFile
在客戶端不會自動生成,它是在服務端生成的,當更新了一條配置之後,就會反應到這個文件中。所以如果想在客戶端使用到這個功能,需要手工將文件添加到客戶端,然後客戶端就不會去讀取服務端的配置了,也許某些場景下可以用到
SnapshotFile
SnapshotFile
文件位置:userhomenacosconfigfixed-127.0.0.1_8848_nacossnapshot{group}{dataId}
當客戶端從服務端獲取配置之後,會將該資訊寫入快照文件中,核心程式碼就在ClientWorker#getServerConfig
中
public String getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException { HttpResult result = null; try { // 從服務端獲取配置資訊 result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout); } catch (IOException e) { throw new NacosException(NacosException.SERVER_ERROR, e); } switch (result.code) { case HttpURLConnection.HTTP_OK: // 將配置更新到本地文件 LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content); return result.content; case HttpURLConnection.HTTP_NOT_FOUND: // 根據 dataId、group、tenant 將本地的文件刪除 LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null); return null; case HttpURLConnection.HTTP_CONFLICT: { throw new NacosException(NacosException.CONFLICT,"data being modified, dataId=" + dataId + ", group=" + group + ",tenant=" + tenant); } case HttpURLConnection.HTTP_FORBIDDEN: { LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(result.code, result.content); } default: { LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId, group, tenant, result.code); throw new NacosException(result.code, "http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } } }
FailoverFile
FailoverFile
文件位置:userhomenacosconfigfixed-127.0.0.1_8848_nacosdataconfig-data{group}{dataId}
對FailoverFile
文件的判斷主要是在ClientWorker#checkLocalConfig
方法,看了幾篇文章,都說這個方法的校驗是為了在服務端掛了的時候,可以直接從客戶端獲取文件,這明顯是不對的。剛剛在上面也提到過,FailoverFile
的主要作用是容災,而且這個文件在客戶端不會自動生成,想要使用這個功能必須手動添加
private void checkLocalConfig(CacheData cacheData) { final String dataId = cacheData.dataId; final String group = cacheData.group; final String tenant = cacheData.tenant; // 注意這個,容災文件,這個文件在客戶端是不會自動生成的 File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant); // 沒有 -> 有 if (!cacheData.isUseLocalConfigInfo() && path.exists()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); String md5 = MD5.getInstance().getMD5String(content); cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); LOGGER.warn("[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); return; } // 有 -> 沒有。不通知業務監聽器,從server拿到配置後通知。 if (cacheData.isUseLocalConfigInfo() && !path.exists()) { cacheData.setUseLocalConfigInfo(false); LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); return; } // 有變更 /** * isUseLocalConfig=true && && path.exists() && cacheData.getLocalConfigInfoVersion() != path.lastModified() * 說明配置有更新,所以此時需要更新 CacheData */ if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path.lastModified()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); String md5 = MD5.getInstance().getMD5String(content); cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); LOGGER.warn("[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); } }
有關於獲取配置的優化
從上面已經知道,有一個執行緒池會不停的執行LongPollingRunnable#run
方法,這個方法主要作用就是從服務端拉取最新的配置資訊,如果直接按照正常的做法來做,直接根據dataId group tenant
去拉取就好了,如果每次都直接去伺服器來配置資訊,但這樣會有一些性能問題:
- 配置資訊變動的可能性很小,如果每次都需要全量去拉取,拉取的資訊基本都是一樣的,這很浪費資源;
- 如果從服務端拉取數據的頻率太高,會太耗性能;如果拉取的頻率太低,數據發生變更之後客戶端響應不及時;
針對上面幾個問題,Nacos做了以下幾個優化
- 只拉取改動過的配置資訊:客戶端先通過一個HTTP請求發送一個
key列表
給服務端,服務端返回發生了變更的Key列表
,大部分時候,這可以過濾掉絕大部分沒有配置項; - 通過HTTP長輪詢減少少客戶端和服務端的交互頻率,但這必然要面對一個數據響應不實時問問題,怎麼解決?
配置實時更新
先推薦一篇文章:Nacos配置實時更新原理分析 這篇文章已經寫的非常詳細了,不過那篇文章有點長,這裡總結一下,為了自己以後看的時候方便。
通過HTTP長輪詢較少客戶端和服務端的交互頻率,但這必然要面對一個數據響應不實時問問題,怎麼解決?
在客戶端向服務端拉取配置資訊之前,需要先向服務端發送一個配置Key列表
,然後服務端返回一個發生了變更的配置Key列表
ClientWorker#checkUpdateConfigStr
// timeout默認30s超時 HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout);
請求的服務端地址: /v1/cs/configs/listener
ConfigController#listener
@RequestMapping(value = "/listener", method = RequestMethod.POST) public void listener(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { .... // do long-polling inner.doPollingConfig(request, response, clientMd5Map, probeModify.length()); }
// ConfigServletInner#doPollingConfig ,暫且只關注長輪詢
// 長輪詢 if (LongPollingService.isSupportLongPolling(request)) { longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize); return HttpServletResponse.SC_OK + ""; }
LongPollingService#addLongPollingClient
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) { int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); /** * 提前500ms返迴響應,為避免客戶端超時 add delay time for LoadBalance */ long timeout = Math.max(10000, Long.parseLong(str) - delayTime); String ip = RequestUtil.getRemoteIp(req); // 一定要由HTTP執行緒調用,否則離開後容器會立即發送響應, 開啟Servlet非同步支援 final AsyncContext asyncContext = req.startAsync(); // AsyncContext.setTimeout()的超時時間不準,所以只能自己控制 asyncContext.setTimeout(0L); // 向執行緒池提交了一個任務,所以核心邏輯在 ClientLongPolling 中, scheduler.execute(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); }
ClientLongPolling#run
public void run() { asyncTimeoutFuture = scheduler.schedule(new Runnable() { @Override public void run() { try { getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis()); /** * 刪除訂閱關係 */ allSubs.remove(ClientLongPolling.this); if (isFixedPolling()) { // 根據客戶端傳過來的key列表,找到該發生了變更的配置的key列表 List<String> changedGroups = MD5Util.compareMd5((HttpServletRequest)asyncContext.getRequest(), (HttpServletResponse)asyncContext.getResponse(), clientMd5Map); if (changedGroups.size() > 0) { // 返回給客戶端 sendResponse(changedGroups); } else { // 沒有就返回空 sendResponse(null); } } else { sendResponse(null); } } catch (Throwable t) { LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause()); } } }, timeoutTime, TimeUnit.MILLISECONDS); allSubs.add(this); }
- 在run方法裡面又創建了一個任務,延遲 29.5s 後執行
- 將
ClientLongPolling
添加到allSubs
中 - 延遲時間到了之後,執行任務,先將
ClientLongPolling
從allSubs
中移除,然後通過AsyncContext
將結果寫回客戶端
allSubs
數據結構如下,可以把allSubs
當作是一個訂閱者列表,當配置發生變成的時候,會發布一個事件,然後這些訂閱者會得到相應,然後再執行相應的功能
Queue<ClientLongPolling> allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
修改配置
延遲 29.5s 執行,要是在這段時間內,數據發生了變更怎麼辦,難道要客戶端 29.5s 之後才知道嗎?肯定不是的。可以看看當數據發生變更時,會涉及到什麼介面
ConfigController#publishConfig
請求的服務端地址: /v1/cs/configs
// 持久化 persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false); // 觸發事件 EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
EventDispatcher#fireEvent
static public void fireEvent(Event event) { for (AbstractEventListener listener : getEntry(event.getClass()).listeners) { try { listener.onEvent(event); } catch (Exception e) { log.error(e.toString(), e); } } } static public abstract class AbstractEventListener { public AbstractEventListener() { EventDispatcher.addEventListener(this); } } // EventDispatcher#addEventListener static public void addEventListener(AbstractEventListener listener) { for (Class<? extends Event> type : listener.interest()) { getEntry(type).listeners.addIfAbsent(listener); } }
注意觀察AbstractEventListener
的構造函數,在其構造函數中涉及到Listener
的註冊過程,而AbstractEventListener
有以下幾個子類:
AsyncNotifyService -> ConfigDataChangeEvent LongPollingService -> LocalDataChangeEvent MockListener 計數用的
剛剛看上面發布的是一個ConfigDataChangeEvent
事件,所以會先執行AsyncNotifyService#onEvent
方法。該方法中會先獲取服務端所有IP列表,依次通過Http通知對象/v1/cs/communication/dataChange?dataId=xx&group=xx,接收到請求後, 會dump出來所有config資訊,同時回調LocalDataChangeEvent事件,然後執行LongPollingService#onEvent
方法。
LongPollingService#onEvent
public void onEvent(Event event) { if (isFixedPolling()) { } else { if (event instanceof LocalDataChangeEvent) { LocalDataChangeEvent evt = (LocalDataChangeEvent)event; scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps)); } } }
提交了一個任務,關鍵邏輯在DataChangeTask#run
方法中
public void run() { try { ConfigService.getContentBetaMd5(groupKey); for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) { // ClientLongPolling 作為訂閱者,配置類似於Topic,更新配置就是發布了一個事件 ClientLongPolling clientSub = iter.next(); // 找到訂閱了 當前發生了變更配置項 的ClientLongPolling if (clientSub.clientMd5Map.containsKey(groupKey)) { getRetainIps().put(clientSub.ip, System.currentTimeMillis()); // 刪除訂閱關係 iter.remove(); // 向客戶端回寫數據 clientSub.sendResponse(Arrays.asList(groupKey)); } } } catch (Throwable t) { LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause()); } }
- 找到訂閱了 當前發生了變更配置項 的ClientLongPolling
- 向客戶端回寫數據
這裡會有一個問題,如果DataChangeTask
任務完成了向客戶端寫數據,此時ClientLongPolling
中的調度任務又開始執行了怎麼辦呢?這時任務都被移除了,肯定會報錯啊
很簡單,只要在進行"推送"操作之前,先將原來等待執行的調度任務取消掉就可以了,如下:
void sendResponse(List<String> changedGroups) { /** * 取消超時任務 */ if (null != asyncTimeoutFuture) { asyncTimeoutFuture.cancel(false); } generateResponse(changedGroups); }
這樣,就達到了一個類似於數據"推送"的效果,如果一直沒有更新,客戶端等待時間接近 30s,如果在等待期間有數據發生變更,幾乎可以實時的返回給客戶端