Spring Cloud Eureka源碼分析之三級快取的設計原理及源碼分析
- 2021 年 12 月 16 日
- 筆記
- Eureka三級快取, Eureka源碼
Eureka Server 為了提供響應效率,提供了兩層的快取結構,將 Eureka Client 所需要的註冊資訊,直接存儲在快取結構中,實現原理如下圖所示。
第一層快取:readOnlyCacheMap,本質上是 ConcurrentHashMap,依賴定時從 readWriteCacheMap 同步數據,默認時間為 30 秒。
readOnlyCacheMap : 是一個 CurrentHashMap 只讀快取,這個主要是為了供客戶端獲取註冊資訊時使用,其快取更新,依賴於定時器的更新,通過和 readWriteCacheMap 的值做對比,如果數據不一致,則以 readWriteCacheMap 的數據為準。
第二層快取:readWriteCacheMap,本質上是 Guava 快取。
readWriteCacheMap:readWriteCacheMap 的數據主要同步於存儲層。當獲取快取時判斷快取中是否沒有數據,如果不存在此數據,則通過 CacheLoader 的 load 方法去載入,載入成功之後將數據放入快取,同時返回數據。
readWriteCacheMap 快取過期時間,默認為 180 秒,當服務下線、過期、註冊、狀態變更,都會來清除此快取中的數據。
Eureka Client 獲取全量或者增量的數據時,會先從一級快取中獲取;如果一級快取中不存在,再從二級快取中獲取;如果二級快取也不存在,這時候先將存儲層的數據同步到快取中,再從快取中獲取。
通過 Eureka Server 的二層快取機制,可以非常有效地提升 Eureka Server 的響應時間,通過數據存儲層和快取層的數據切割,根據使用場景來提供不同的數據支援。
多級快取的意義
這裡為什麼要設計多級快取呢?原因很簡單,就是當存在大規模的服務註冊和更新時,如果只是修改一個ConcurrentHashMap數據,那麼勢必因為鎖的存在導致競爭,影響性能。
而Eureka又是AP模型,只需要滿足最終可用就行。所以它在這裡用到多級快取來實現讀寫分離。註冊方法寫的時候直接寫記憶體註冊表,寫完表之後主動失效讀寫快取。
獲取註冊資訊介面先從只讀快取取,只讀快取沒有再去讀寫快取取,讀寫快取沒有再去記憶體註冊表裡取(不只是取,此處較複雜)。並且,讀寫快取會更新回寫只讀快取
- responseCacheUpdateIntervalMs : readOnlyCacheMap 快取更新的定時器時間間隔,默認為30秒
- responseCacheAutoExpirationInSeconds : readWriteCacheMap 快取過期時間,默認為 180 秒 。
快取初始化
readWriteCacheMap使用的是LoadingCache對象,它是guava中提供的用來實現記憶體快取的一個api。創建方式如下
LoadingCache<Long, String> cache = CacheBuilder.newBuilder()
//快取池大小,在快取項接近該大小時, Guava開始回收舊的快取項
.maximumSize(10000)
//設置時間對象沒有被讀/寫訪問則對象從記憶體中刪除(在另外的執行緒裡面不定期維護)
.expireAfterAccess(10, TimeUnit.MINUTES)
//移除監聽器,快取項被移除時會觸發
.removalListener(new RemovalListener <Long, String>() {
@Override
public void onRemoval(RemovalNotification<Long, String> rn) {
//執行邏輯操作
}
})
.recordStats()//開啟Guava Cache的統計功能
.build(new CacheLoader<String, Object>() {
@Override
public Object load(String key) {
//從 SQL或者NoSql 獲取對象
}
});//CacheLoader類 實現自動載入
其中,CacheLoader是用來實現快取自動載入的功能,當觸發readWriteCacheMap.get(key)
方法時,就會回調CacheLoader.load
方法,根據key去服務註冊資訊中去查找實例數據進行快取
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
this.registry = registry;
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key); //注意這裡
return value;
}
});
而快取的載入,是基於generatePayload
方法完成的,程式碼如下。
private Value generatePayload(Key key) {
Stopwatch tracer = null;
try {
String payload;
switch (key.getEntityType()) {
case Application:
boolean isRemoteRegionRequested = key.hasRegions();
if (ALL_APPS.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeAllAppsWithRemoteRegionTimer.start();
payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeAllAppsTimer.start();
payload = getPayLoad(key, registry.getApplications());
}
} else if (ALL_APPS_DELTA.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
payload = getPayLoad(key,
registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeDeltaAppsTimer.start();
versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
payload = getPayLoad(key, registry.getApplicationDeltas());
}
} else {
tracer = serializeOneApptimer.start();
payload = getPayLoad(key, registry.getApplication(key.getName()));
}
break;
case VIP:
case SVIP:
tracer = serializeViptimer.start();
payload = getPayLoad(key, getApplicationsForVip(key, registry));
break;
default:
logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
payload = "";
break;
}
return new Value(payload);
} finally {
if (tracer != null) {
tracer.stop();
}
}
}
此方法接受一個 Key
類型的參數,返回一個 Value
類型。 其中 Key
中重要的欄位有:
KeyType
,表示payload文本格式,有 JSON 和 XML 兩種值。EntityType
,表示快取的類型,有Application
,VIP
,SVIP
三種值。entityName
,表示快取的名稱,可能是單個應用名,也可能是ALL_APPS
或ALL_APPS_DELTA
。
Value
則有一個 String
類型的payload和一個 byte
數組,表示gzip壓縮後的位元組。
快取同步
在ResponseCacheImpl
這個類的構造實現中,初始化了一個定時任務,這個定時任務每個
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
//省略...
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
}
默認每30s從readWriteCacheMap更新有差異的數據同步到readOnlyCacheMap中
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
for (Key key : readOnlyCacheMap.keySet()) { //遍歷只讀集合
if (logger.isDebugEnabled()) {
logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType());
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) { //判斷差異資訊,如果有差異,則更新
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
} finally {
CurrentRequestVersion.remove();
}
}
}
};
}
快取失效
在AbstractInstanceRegistry.register這個方法中,當完成服務資訊保存後,會調用invalidateCache
失效快取
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
//....
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
//....
}
最終調用ResponseCacheImpl.invalidate方法,完成快取的失效機制
public void invalidate(Key... keys) {
for (Key key : keys) {
logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
readWriteCacheMap.invalidate(key);
Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
for (Key keysWithRegion : keysWithRegions) {
logger.debug("Invalidating the response cache key : {} {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
readWriteCacheMap.invalidate(keysWithRegion);
}
}
}
}
版權聲明:本部落格所有文章除特別聲明外,均採用 CC BY-NC-SA 4.0 許可協議。轉載請註明來自
Mic帶你學架構
!
如果本篇文章對您有幫助,還請幫忙點個關注和贊,您的堅持是我不斷創作的動力。歡迎關注「跟著Mic學架構」公眾號公眾號獲取更多技術乾貨!