如何把 Caffeine Cache 用得如絲般順滑?
一、關於 Caffeine Cache
在推薦服務中,雖然允許少量請求因計算超時等原因返回默認列表。但從運營指標來說,越高的「完算率」意味著越完整的演算法效果呈現,也意味著越高的商業收益。(完算率類比影片的完播率,成功完成整個推薦線上流程計算的請求次數/總請求次數)
為了能夠儘可能快地完成計算,多級快取方案已經成為推薦線上服務的標配。其中本地快取顯得尤為重要,而 Caffeine Cache 就是近幾年脫穎而出的高性能本地快取庫。Caffeine Cache 已經在 Spring Boot 2.0 中取代了 Google Guava 成為默認快取框架,足見其成熟和可靠。
關於 Caffeine 的介紹文章有很多,不再累述,可閱讀文末的參考資料了解 Caffeine 的簡述、性能基準測試結果、基本 API 用法和 Window-TinyLFU 快取演算法原理等。雖然接觸 Caffeine 的時間不長,但其簡潔的 API 和如絲般順滑的非同步載入能力簡直不要太好用。而本菜鳥在使用的過程中也踩了一些坑,使用不當甚至快取也能卡得和磁碟 IO 一樣慢。
經過一番學習嘗試,總算了解到 Caffeine Cache 如絲般順滑的奧秘,總結下來分享一下。
二、Caffeine Cache 配置套路
使用 Caffeine Cache,除了 Spring 中常見的 @EnableCache、@Cacheable 等註解外,直接使用 Caffeine.newBuilder().build() 方法創建 LoadingCache 也是推薦服務常用的方式。
我們先來看看 Caffeine#builder 都有哪些配置套路:
2.1 追問三連
2.1.1 ObjectPool
當然可以,光腳的不怕穿鞋的,上線後別走……
2.1.2 expireAfterWrite、expireAfterAccess 都配置?
雖然 expireAfterWrite 和 expireAfterAccess 同時配置不報錯,但 access 包含了 write,所以選一個就好了親。
2.1.3 reference-based 驅逐有啥特點?
只要配置上都會使用 == 來比較對象相等,而不是 equals;還有一個非常重要的配置,也是決定快取如絲般順滑的秘訣:刷新策略 refreshAfterWrite。該配置使得 Caffeine 可以在數據載入後超過給定時間時刷新數據。下文詳解。
機智如我在 Builder 上也能踩坑
和 lombok 的 builder 不同,Caffeine#builder 的策略調用兩次將會導致運行時異常!這是因為 Caffeine 構建時每個策略都保存了已設置的標記位,所以重複設置並不是覆蓋而是直接拋異常:
public Caffeine<K, V> maximumWeight(@NonNegative long maximumWeight) {
requireState(this.maximumWeight == UNSET_INT,
"maximum weight was already set to %s", this.maximumWeight);
requireState(this.maximumSize == UNSET_INT,
"maximum size was already set to %s", this.maximumSize);
this.maximumWeight = maximumWeight;
requireArgument(maximumWeight >= 0, "maximum weight must not be negative");
return this;
}
比如上述程式碼,maximumWeight() 調用兩次的話就會拋出異常並提示 maximum weight was already set to xxx。
三、Caffeine Cache 精華
3.1 get 方法都做了什麼?
首先在實現類 LocalLoadingCache<K, V> 中可以看到;
default @Nullable V get(K key) {
return cache().computeIfAbsent(key, mappingFunction());
}
但突然發現這個 get 方法沒有實現類!Why?我們跟蹤 cache() 方法就可以發現端倪:
public BoundedLocalCache<K, V> cache() {
return cache;
}
public UnboundedLocalCache<K, V> cache() {
return cache;
}
根據調用 Caffeine.newBuilder().build() 的過程,決定了具體生成的是 BoundedLocalCache 還是 UnboundedLocalCache;
判定 BoundedLocalCache 的條件如下:
public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(
@NonNull CacheLoader<? super K1, V1> loader) {
requireWeightWithWeigher();
@SuppressWarnings("unchecked")
Caffeine<K1, V1> self = (Caffeine<K1, V1>) this;
return isBounded() || refreshes()
? new BoundedLocalCache.BoundedLocalLoadingCache<>(self, loader)
: new UnboundedLocalCache.UnboundedLocalLoadingCache<>(self, loader);
}
其中的 isBounded()、refreshes() 方法分別如下:
boolean isBounded() {
return (maximumSize != UNSET_INT)
|| (maximumWeight != UNSET_INT)
|| (expireAfterAccessNanos != UNSET_INT)
|| (expireAfterWriteNanos != UNSET_INT)
|| (expiry != null)
|| (keyStrength != null)
|| (valueStrength != null);
}
boolean refreshes() {
// 調用了 refreshAfter 就會返回 false
return refreshNanos != UNSET_INT;
}
可以看到一般情況下常規的配置都是 BoundedLocalCache。所以我們以它為例繼續看 BoundedLocalCache#computeIfAbsent 方法吧:
public @Nullable V computeIfAbsent(K key,
Function<? super K, ? extends V> mappingFunction,
boolean recordStats, boolean recordLoad) {
// 常用的 LoadingCache#get 方法 recordStats、recordLoad 都為 true
// mappingFunction 即 builder 中傳入的 CacheLoader 實例包裝
requireNonNull(key);
requireNonNull(mappingFunction);
// 默認的 ticker read 返回的是 System.nanoTime();
// 關於其他的 ticker 見文末參考文獻,可以讓使用者自定義超時的計時方式
long now = expirationTicker().read();
// data 是 ConcurrentHashMap<Object, Node<K, V>>
// key 根據程式碼目前都是 LookupKeyReference 對象
// 可以發現 LookupKeyReference 保存的是 System.identityHashCode(key) 結果
// 關於 identityHashCode 和 hashCode 的區別可閱讀文末參考資料
Node<K, V> node = data.get(nodeFactory.newLookupKey(key));
if (node != null) {
V value = node.getValue();
if ((value != null) && !hasExpired(node, now)) {
// isComputingAsync 中將會判斷當前是否為非同步類的快取實例
// 是的話再判斷 node.getValue 是否完成。BoundedLocaCache 總是返回 false
if (!isComputingAsync(node)) {
// 此處在 BoundedLocaCache 中也是直接 return 不會執行
tryExpireAfterRead(node, key, value, expiry(), now);
setAccessTime(node, now);
}
// 非同步驅逐任務提交、非同步刷新操作
// CacheLoader#asyncReload 就在其中的 refreshIfNeeded 方法被調用
afterRead(node, now, recordStats);
return value;
}
}
if (recordStats) {
// 記錄快取的載入成功、失敗等統計資訊
mappingFunction = statsAware(mappingFunction, recordLoad);
}
// 這裡2.8.0版本不同實現類生成的都是 WeakKeyReference
Object keyRef = nodeFactory.newReferenceKey(key, keyReferenceQueue());
// 本地快取沒有,使用載入函數讀取到快取
return doComputeIfAbsent(key, keyRef, mappingFunction,
new long[] { now }, recordStats);
}
上文中 hasExpired 判斷數據是否過期,看程式碼就很明白了:是通過 builder 的配置 + 時間計算來判斷的。
boolean hasExpired(Node<K, V> node, long now) {
return
(expiresAfterAccess() &&
(now - node.getAccessTime() >= expiresAfterAccessNanos()))
| (expiresAfterWrite() &&
(now - node.getWriteTime() >= expiresAfterWriteNanos()))
| (expiresVariable() &&
(now - node.getVariableTime() >= 0));
}
繼續看程式碼,doComputeIfAbsent 方法主要內容如下:
@Nullable V doComputeIfAbsent(K key, Object keyRef,
Function<? super K, ? extends V> mappingFunction,
long[] now, boolean recordStats) {
@SuppressWarnings("unchecked")
V[] oldValue = (V[]) new Object[1];
@SuppressWarnings("unchecked")
V[] newValue = (V[]) new Object[1];
@SuppressWarnings("unchecked")
K[] nodeKey = (K[]) new Object[1];
@SuppressWarnings({"unchecked", "rawtypes"})
Node<K, V>[] removed = new Node[1];
int[] weight = new int[2]; // old, new
RemovalCause[] cause = new RemovalCause[1];
// 對 data 這個 ConcurrentHashMap 調用 compute 方法,計算 key 對應的值
// compute 方法的執行是原子的,並且會對 key 加鎖
// JDK 注釋說明 compute 應該短而快並且不要在其中更新其他的 key-value
Node<K, V> node = data.compute(keyRef, (k, n) -> {
if (n == null) {
// 沒有值的時候調用 builder 傳入的 CacheLoader#load 方法
// mappingFunction 是在 LocalLoadingCache#newMappingFunction 中創建的
newValue[0] = mappingFunction.apply(key);
if (newValue[0] == null) {
return null;
}
now[0] = expirationTicker().read();
// builder 沒有指定 weigher 時,這裡默認為 SingletonWeigher,總是返回 1
weight[1] = weigher.weigh(key, newValue[0]);
n = nodeFactory.newNode(key, keyReferenceQueue(),
newValue[0], valueReferenceQueue(), weight[1], now[0]);
setVariableTime(n, expireAfterCreate(key, newValue[0], expiry(), now[0]));
return n;
}
// 有值的時候對 node 實例加同步塊
synchronized (n) {
nodeKey[0] = n.getKey();
weight[0] = n.getWeight();
oldValue[0] = n.getValue();
// 設置驅逐原因,如果數據有效直接返回
if ((nodeKey[0] == null) || (oldValue[0] == null)) {
cause[0] = RemovalCause.COLLECTED;
} else if (hasExpired(n, now[0])) {
cause[0] = RemovalCause.EXPIRED;
} else {
return n;
}
// 默認的配置 writer 是 CacheWriter.disabledWriter(),無操作;
// 自己定義的 CacheWriter 一般用於驅逐數據時得到回調進行外部數據源操作
// 詳情可以參考文末的資料
writer.delete(nodeKey[0], oldValue[0], cause[0]);
newValue[0] = mappingFunction.apply(key);
if (newValue[0] == null) {
removed[0] = n;
n.retire();
return null;
}
weight[1] = weigher.weigh(key, newValue[0]);
n.setValue(newValue[0], valueReferenceQueue());
n.setWeight(weight[1]);
now[0] = expirationTicker().read();
setVariableTime(n, expireAfterCreate(key, newValue[0], expiry(), now[0]));
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);
return n;
}
});
// 剩下的程式碼主要是調用 afterWrite、notifyRemoval 等方法
// 進行後置操作,後置操作中將會再次嘗試快取驅逐
// ...
return newValue[0];
}
看完上面的程式碼,遇到這些問題也就心裡有數了。
3.2 快取的數據什麼時候淘汰?
顯式調用 invalid 方法時;弱引用、軟引用可回收時;get 方法老值存在且已完成非同步載入後調用 afterRead。
get 方法老值不存在,調用 doComputeIfAbsent 載入完數據後調用 afterWrite。
3.3 CacheLoader#load和 CacheLoader#asyncReload 有什麼區別?
首先 CacheLoader#load 方法是必須提供的,快取調用時將是同步操作(回顧上文 data.compute 方法),會阻塞當前執行緒。
而 CacheLoader#asyncReload 需要配合builder#refreshAfterWrite 使用這樣將在computeIfAbsent->afterRead->refreshIfNeeded 中調用,並非同步更新到 data 對象上;並且,load 方法沒有傳入oldValue,而 asyncReload 方法提供了oldValue,這意味著如果觸發 load 操作時,快取是不能保證 oldValue 是否存在的(可能是首次,也可能是已失效)。
3.4 載入數據耗時較長,對性能的影響是什麼?
CacheLoader#load 耗時長,將會導致快取運行過程中查詢數據時阻塞等待載入,當多個執行緒同時查詢同一個 key 時,業務請求可能阻塞,甚至超時失敗;
CacheLoader#asyncReload 耗時長,在時間周期滿足的情況下,即使耗時長,對業務的影響也較小
3.5 說好的如絲般順滑呢?
首要前提是外部數據查詢能保證單次查詢的性能(一次查詢天長地久那加本地快取也於事無補);然後,我們在構建 LoadingCache 時,配置 refreshAfterWrite 並在 CacheLoader 實例上定義 asyncReload 方法;
靈魂追問:只有以上兩步就夠了嗎?
機智的我突然覺得事情並不簡單。還有一個時間設置的問題,我們來看看:
如果 expireAfterWrite 周期 < refreshAfterWrite 周期會如何?此時查詢失效數據時總是會調用 load 方法,refreshAfterWrite 根本沒用!
如果 CacheLoader#asyncReload 有額外操作,導致它自身實際執行查詢耗時超過 expireAfterWrite 又會如何?還是 CacheLoader#load 生效,refreshAfterWrite 還是沒用!
所以絲滑的正確打開方式,是 refreshAfterWrite 周期明顯小於 expireAfterWrite 周期,並且 CacheLoader#asyncReload 本身也有較好的性能,才能如絲般順滑地載入數據。此時就會發現業務不斷進行 get 操作,根本感知不到數據載入時的卡頓!
3.6 用本地快取會不會出現快取穿透?怎麼防止?
computeIfAbsent 和 doComputeIfAbsent 方法可以看出如果載入結果是 null,那麼每次從快取查詢,都會觸發 mappingFunction.apply,進一步調用 CacheLoader#load。從而流量會直接打到後端資料庫,造成快取穿透。
防止的方法也比較簡單,在業務可接受的情況下,如果未能查詢到結果,則返回一個非 null 的「假對象」到本地快取中。
靈魂追問:如果查不到,new 一個對象返回行不行?
key 範圍不大時可以,builder 設置了 size-based 驅逐策略時可以,但都存在消耗較多記憶體的風險,可以定義一個默認的 PLACE_HOLDER 靜態對象作為引用。
靈魂追問:都用同一個假對象引用真的大丈夫(沒問題)?
這麼大的坑本菜鳥怎麼能錯過!快取中存的是對象引用,如果業務 get 後修改了對象的內容,那麼其他執行緒再次獲取到這個對象時,將會得到修改後的值!鬼知道那個深夜定位出這個問題的我有多興奮(蒼蠅搓手)。
當時快取中保存的是 List
靈魂追問:那怎麼解決快取被意外修改的問題呢?怎麼 copy 一個對象呢?
So easy,就在 get 的時候 copy 一下對象就好了。
靈魂追問4:怎麼 copy 一個對象?……停!咱們以後有機會再來說說這個淺拷貝和深拷貝,以及常見的拷貝工具吧,聚焦聚焦……
3.7 某次載入數據失敗怎麼辦,還能用之前的快取值嗎?
根據 CacheLoader#load和 CacheLoader#asyncReload 的參數區別,我們可以發現:
應該在 asyncReload 中來處理,如果查詢資料庫異常,則可以返回 oldValue 來繼續使用之前的快取;否則只能通過 load 方法中返回預留空對象來解決。使用哪一種方法需要根據具體的業務場景來決定。
【踩坑】返回 null 將導致 Caffeine 認為該值不需要快取,下次查詢還會繼續調用 load 方法,快取並沒生效。
3.8 多個執行緒同時 get 一個本地快取不存在的值,會如何?
根據程式碼可以知道,已經進入 doComputeIfAbsent 的執行緒將阻塞在 data.compute 方法上;
比如短時間內有 N 個執行緒同時 get 相同的 key 並且 key 不存在,則這 N 個執行緒最終都會反覆執行 compute 方法。但只要 data 中該 key 的值更新成功,其他進入 computeIfAbsent 的執行緒都可直接獲得結果返回,不會出現阻塞等待載入;
所以,如果一開始就有大量請求進入 doComputeIfAbsent 阻塞等待數據,就會造成短時間請求掛起、超時的問題。由此在大流量場景下升級服務時,需要考慮在接入流量前對快取進行預熱(我查我自己,嗯),防止瞬時請求太多導致大量請求掛起或超時。
靈魂追問:如果一次 load 耗時 100ms,一開始有 10 個執行緒冷啟動,最終等待時間會是 1s 左右嗎?
其實……要看情況,回顧一下 data.compute 裡面的程式碼:
if (n == null) {
// 這部分程式碼其他後續執行緒進入後已經有值,不再執行
}
synchronized (n) {
// ...
if ((nodeKey[0] == null) || (oldValue[0] == null)) {
cause[0] = RemovalCause.COLLECTED;
} else if (hasExpired(n, now[0])) {
cause[0] = RemovalCause.EXPIRED;
} else {
// 未失效時在這裡返回,不會觸發 load 函數
return n;
}
// ...
}
所以,如果 load 結果不是 null,那麼只第一個執行緒花了 100ms,後續執行緒會儘快返回,最終時長應該只比 100ms 多一點。但如果 load 結果返回 null(快取穿透),相當於沒有查到數據,於是後續執行緒還會再次執行 load,最終時間就是 1s 左右。
以上就是本菜鳥目前總結的內容,如有疏漏歡迎指出。在學習源碼的過程中,Caffeine Cache 還使用了其他編碼小技巧,咱們下次有空接著聊。
三、參考資料
2.Caffeine Cache-高性能Java本地快取組件
6.System.identityHashCode(obj)與obj.hashcode
作者:vivo 互聯網伺服器團隊-Li Haoxuan