構建高效且可伸縮的結果快取
- 2022 年 2 月 22 日
- 筆記
你好呀,我是歪歪。
我來填坑來啦。

上周發布了《當Synchronized遇到這玩意兒,有個大坑,要注意!》這篇文章。
文章的最後,我提到了《Java並發編程實戰》的第 5.6 節的內容,說大家可以去看看。
我不知道有幾個同學去看了,但是我知道絕大部分同學都沒去看的,所以這篇文章我也給大家安排一下,怎麼去比較好的實現一個快取功能。
感受一下大師的程式碼方案演進的過程。
需求
這不都二月中旬了嘛,馬上就要出考研成績了,我就拿這個來舉個例子吧。
需求很簡單:從快取中查詢,查不到則從資料庫獲取,並放到快取中去,供下次使用。
核心程式碼大概就是這樣的:
Integer score = map.get("why");
if(score == null){
score = loadFormDB("why");
map.put("why",score);
}
有了核心程式碼,所以我把程式碼補全之後應該是這樣的:
public class ScoreQueryService {
private final Map<String, Integer> SCORE_CACHE = new HashMap<>();
public Integer query(String userName) throws InterruptedException {
Integer result = SCORE_CACHE.get(userName);
if (result == null) {
result = loadFormDB(userName);
SCORE_CACHE.put(userName, result);
}
return result;
}
private Integer loadFormDB(String userName) throws InterruptedException {
System.out.println("開始查詢userName=" + userName + "的分數");
//模擬耗時
TimeUnit.SECONDS.sleep(1);
return ThreadLocalRandom.current().nextInt(380, 420);
}
}
然後搞一個 main 方法測試一下:
public class MainTest {
public static void main(String[] args) throws InterruptedException {
ScoreQueryService scoreQueryService = new ScoreQueryService();
Integer whyScore = scoreQueryService.query("why");
System.out.println("whyScore = " + whyScore);
whyScore = scoreQueryService.query("why");
System.out.println("whyScore = " + whyScore);
}
}
把程式碼兒跑起來:

好傢夥,第一把就跑了個 408 分,我考研要是真能考到這個分數,怕是做夢都得笑醒。

Demo 很簡單,但是請你注意,我要開始變形了。
首先把 main 方法修改為這樣:
public class MainTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
ScoreQueryService scoreQueryService = new ScoreQueryService();
for (int i = 0; i < 3; i++) {
executorService.execute(()->{
try {
Integer why = scoreQueryService.query("why");
System.out.println("why = " + why);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
利用執行緒池提交任務,模擬同一時間發起三次查詢請求,由於 loadFormDB 方法裡面有模擬耗時的操作,那麼這三個請求都不會從快取中獲取到數據。
具體是不是這樣的呢?
看一下運行結果:

輸出三次,得到了三個不同的分數,說明確實執行了三次 loadFormDB 方法。
好,同學們,那麼問題就來了。
很明顯,在這個場景下,我只想要一個執行緒執行 loadFormDB 方法就行了,那麼應該怎麼操作呢?
看到這個問題的瞬間,不知道你的腦袋裡面有沒有電光火石般的想起快取問題三連擊:快取雪崩、快取擊穿、快取穿透。
畢竟應對快取擊穿的解決方案之一就是只需要一個請求執行緒去做構建快取,其他的執行緒就輪詢等著。
然後腦海裡面自然而然的就浮現出了 Redis 分散式鎖的解決方案,甚至還想到了應該用 setNX 命令來保證只有一個執行緒放行成功。嘴角漏出一絲不易察覺的笑容,甚至想要關閉這篇文章。
不好意思,收起你的笑容,不能用 Redis,不能用第三方組件,只能用 JDK 的東西。

別問為什麼,問就是沒有引入。
這個時候你怎麼辦?
初始方案
聽說不能用第三方組件之後,你也一點不慌,大喊一聲:鍵來。

拿著鍵盤只需要啪啪啪三下就寫完了程式碼:

加上一個 synchronized 關鍵字就算完事,甚至你還記得程式設計師的自我修養,完成了一波自測,發現確實沒有問題:

loadFromDB 方法只執行了一次。
但是,朋友,你有沒有想過你這個鎖的粒度有點太大了啊。
直接把整個方法給鎖了。
本來一個好好的並行方法,你給咔一下,搞成串列的了:

而且你這是無差別亂殺啊,比如上面這個示意圖,你要是說當第二次查詢 why 的成績的時候,把這個請求給攔下來,可以理解。
但是你同時也把第一次查詢 mx 的成績給攔截了。弄得 mx 同學一臉懵逼,搞不清啥情況。
注意,這個時候自然而然就會想到縮小鎖的粒度,把鎖的範圍從全局修改為局部,拿出比如用 why 對象作為鎖的這一類解決方案。
比如偽程式碼改成這樣:
Integer score = map.get("why");
if(score == null){
synchronized("why"){
score = loadFormDB("why");
map.put("why",score);
}
}
如果到這裡你還沒反應過來,那麼我再換個例子。
假設我這裡的查詢條件變 Integer 類型的編號呢?
比如我的編號是 200,是不是偽程式碼就變成了這樣:
Integer score = map.get(200);
if(score == null){
synchronized(200){
score = loadFormDB(200);
map.put(200,score);
}
}
看到這裡你要是還沒反應過來的話我只能大喊一聲:你個假讀者!之前發的文章肯定沒看吧?
之前的《當Synchronized遇到這玩意兒,有個大坑,要注意!》這篇文章不全篇都在說這個事兒嗎?
你要不知道問題是啥,你就去翻一下。
這篇文章肯定也不會往這個方向去寫。不能死盯著 synchronize 不放,不然思路打不開。

我們這裡不能用 synchronized 這個玩意。
但是你仔細一看,如果不用 synchronized 的話,這個 map 也不行啊:
private final Map<String, Integer> SCORE_CACHE = new HashMap<>();
這是個 HashMap,不是執行緒安全的呀。
怎麼辦?
演進唄。
演進一下

這一步非常簡單,和最開始的程式相比,只是把 HashMap 替換為 ConcurrentHashMap。
然後就啥也沒幹了。
是不是感覺有點懵逼,甚至感覺有一定被耍了的感覺?
有就對了,因為這一步改變就是書裡面的一個方案,我第一次看到的時候反正也是感覺有點懵逼:

我真沒騙你,不信我拍照給你看:
.png)
這個方案和初始方案比,唯一的優點就是並發度上來了,因為 ConcurrentHashMap 是執行緒安全的。

但是,整個方案作為快取來說,從上面的示意圖也能看出,就是一句話:卵用沒有。

因為根本就不能滿足「相同的請求下,如果快取中沒有,只有一個請求執行緒執行 loadFormDB 方法」這個需求,比如 why 的短時間內的兩次查詢操作就執行兩次 loadFormDB 方法。
它的毛病在哪兒呢?
如果多個執行緒都是查 why 這個人成績的前提下,如果一個執行緒去執行 loadFormDB 方法了,而另外的執行緒根本感知不到有執行緒在執行該方法,那麼它們衝進來後一看:我去,快取裡面壓根沒有啊?那我也去執行 loadFormDB 方法吧。
完犢子了,重複執行了。
那麼在 JDK 原生的方法裡面有沒有一種機制來表示已經有一個請求查詢 why 成績的執行緒在執行 loadFormDB 方法了,那麼其他的查詢 why 成績的執行緒就等這個結果就行了,沒必要自己也去執行一遍。
這個時候就考驗你的知識儲備了。
你想到了什麼?

繼續演進
FutureTask 是非同步編程裡面的一個非常重要的組成部分。

比如執行緒池的應用中,當你使用 submit 的方式提交任務時,它的返回類型就是 Future:

反正基於 Future 這個東西,可以玩出花兒來。
比如我們的這個場景中,如果要用到 FutureTask,那麼我們的 Map 就需要修改為這樣:
Map<String, Future
> SCORE_CACHE = new ConcurrentHashMap<>();
通過維護姓名和 Future 的關係來達到我們的目的。
Future 本身就代表一個任務,對於快取維護這個需求來說,這個任務到底是在執行中還是已經執行完成了它並不關心,這個「它」指的是 SCORE_CACHE 這個 Map。
對於 Map 來說,只要有個任務放進來就行了。
而任務到底執行完成沒有,應該是從 Map 裡面 get 到對應 Future 的這個執行緒關心的。
它怎麼關心?
通過調用 Future.get() 方法。
整個程式碼寫出來就是這樣的:
public class ScoreQueryService {
private final Map<String, Future<Integer>> SCORE_CACHE = new ConcurrentHashMap<>();
public Integer query(String userName) throws Exception {
Future<Integer> future = SCORE_CACHE.get(userName);
if (future == null) {
Callable<Integer> callable = () -> loadFormDB(userName);
FutureTask futureTask = new FutureTask<>(callable);
future = futureTask;
SCORE_CACHE.put(userName, futureTask);
futureTask.run();
}
return future.get();
}
private Integer loadFormDB(String userName) throws InterruptedException {
System.out.println("開始查詢userName=" + userName + "的分數");
//模擬耗時
TimeUnit.SECONDS.sleep(1);
return ThreadLocalRandom.current().nextInt(380, 420);
}
}
怕你不熟悉 futureTask ,所以簡單解釋一下關於 futureTask 的四行程式碼,但是我還是強烈建議你把這個東西掌握了,畢竟說它是非同步編程的基石之一也不為過。
基石還是得拿捏明白,否則就容易被面試官拿捏。
Callable<Integer> callable = () -> loadFormDB(userName);
FutureTask futureTask = new FutureTask<>(callable);
futureTask.run();
return future.get();
首先我構建了一個 Callable 作為 FutureTask 構造函數的入參。

構造函數上面的描述翻譯過來就是:創建一個 FutureTask,運行時將執行給定的 Callable。
「運行時」指的就是 futureTask.run()
這一行程式碼,而「給定的 Callable 」就是 loadFormDB 任務。
也就是說調用了 futureTask.run()
之後,才有可能會執行到 loadFormDB 方法。
然後調用 future.get()
就是獲取 Callable 的結果 ,即獲取 loadFormDB 方法的結果。如果該方法還沒有運行結束,就死等。
對於這個方案,書上是這樣說的:

主要關注我劃線的部分,我一句句的說
它只有一個缺陷,即仍然存在兩個執行緒計算出相同值的漏洞。
這句話其實很好理解,因為程式碼裡面始終有一個「①獲取-②判斷-③放置」的動作。

這個動作就不是原子性的,所以有一定的幾率兩個執行緒都衝進來,然後發現快取中沒有,就都走到 if 分支裡面去了。
但是標號為 ① 和 ② 的地方,從需求實現的角度來說,肯定是必不可少的。
能想辦法的地方也就只有標號為 ③ 的地方了。
到底啥辦法呢?
不著急,下一小節說,我先把後半句話給解釋了:
這個漏洞的發生概率要遠小於 Memoizer2 中發生的概率。
Memoizer2 就是指前面用 ConcurrentHashMap 替換 HashMap 後的方案。
那麼為什麼引入 Future 之後的這個方案,觸發剛剛說到的 bug 的概率比之前的方案小呢?
答案就藏在這兩行程式碼裡面:

之前是要把業務邏輯執行完成,拿到返回值之後才能維護到快取裡面。
現在是先維護快取,然後再執行業務邏輯,節約了執行業務邏輯的時間。
而一般來說最耗時的地方就是業務邏輯的執行,所以這個「遠小於」就是這樣來的。
那怎麼辦呢?
接著演進呀。
最終版
書裡面,針對上面那個「若沒有則添加」這個非原子性的動作的時候,提到了 map 的一個方法:
.png)
Map 的 putIfAbsent,這個方法就厲害了。帶你看一下:

首先從標號為 ① 的地方我們可以知道,這個方法傳進來的 key 如果還沒有與一個值相關聯(或被映射為null),則將其與給定的值進行映射並返回 null ,否則返回當前值。
如果我們只關心返回值的話,那就是:如果有就返回對應的值,如果沒有就返回 null。
標號為 ② 的地方說的是啥呢?
它說默認的實現沒有對這個方法的同步性或原子性做出保證。如果你要提供原子性保證,那麼就請覆蓋此方法,自己去寫。
所以,我們接著就要關注一下 ConcurrentHashMap 的這個方法是怎麼搞得了:


還是通過 synchronized 方法來保證了原子性,當操作的是同一個 key 的時候保證只有一個執行緒去執行 put 的操作。
所以書中給出的最終實現,是這樣的:
public class ScoreQueryService {
public static final Map<String, Future<Integer>> SCORE_CACHE = new ConcurrentHashMap<>();
public Integer query(String userName) throws Exception {
while (true) {
Future<Integer> future = SCORE_CACHE.get(userName);
if (future == null) {
Callable<Integer> callable = () -> loadFormDB(userName);
FutureTask futureTask = new FutureTask<>(callable);
future = SCORE_CACHE.putIfAbsent(userName, futureTask);
//如果為空說明之前這個 key 在 map 裡面不存在
if (future == null) {
future = futureTask;
futureTask.run();
}
}
try {
return future.get();
} catch (CancellationException e) {
System.out.println("查詢userName=" + userName + "的任務被移除");
SCORE_CACHE.remove(userName, future);
} catch (Exception e) {
throw e;
}
}
}
private Integer loadFormDB(String userName) throws InterruptedException {
System.out.println("開始查詢userName=" + userName + "的分數");
//模擬耗時
TimeUnit.SECONDS.sleep(5);
return ThreadLocalRandom.current().nextInt(380, 420);
}
}
與前一個方案,有三個不一樣的地方。
-
第一個是採用了 putIfAbsent 替換 put 方法。 -
第二個是加入了 while(true) 循環。 -
第三個是 future.get() 拋出 CancellationException 異常後執行了清除快取的動作。
第一個沒啥說的,前面已經解釋了。
第二個和第三個,說實話當他們組合在一起用的時候,我沒看的太明白。
首先,從程式上講,這兩個是相輔相成的程式碼,因為 while(true) 循環我理解只有 future.get() 拋出 CancellationException 異常的時候才會起到作用。
拋出 CancellationException 異常,說明當前的這個任務被其他地方調用了 cancel 方法,而由於 while(true) 的存在,且當前的這個任務被 remove 了,所以 if 條件成功,就會再次構建一個一樣的任務,然後繼續執行:

也就是說移除的任務和放進去的任務是一模一樣的。
那是不是就不用移除?
沒轉過彎的話沒關係,我先給你上個程式碼看看,你就明白了:

其中 ScoreQueryService 的程式碼我前面已經給了,就不截圖了。
可以看到這次只往執行緒池裡面扔了一個任務,然後接著把快取裡面的任務拿出來,調用 cancel 方法取消掉。
這個程式的輸出結果是這樣的:

所以,由於 while(true) 的存在,導致 cancel 方法失效。
然後我前面說:移除的任務和放進去的任務是一模一樣的。那是不是就不用移除?
表現在程式碼裡面就是這樣的:

不知道作者為啥要專門搞個移除的動作,經過這一波分析,這一行程式碼完全是可以注釋掉的嘛。
但是…
對嗎?

這是不對的,老鐵。如果這行程式碼被注釋了,那麼程式的輸出就是這樣的:

變成一個死循環了。
為什麼變成死循環了?
因為 FutureTask 這個玩意是有生命周期的:

被 cancelled 之後,生命周期就完成了,所以如果不從快取裡面移走那就芭比Q了,取出來的始終是被取消的這個,那麼就會一直拋出異常,然後繼續循環。
死循環就是這樣來的。
所以移除的動作必須得有, while(true) 就看你的需求了,加上就是 cannel 方法「失效」,去掉就是可以調用 cannel 方法。
關於 FutureTask 如果你不熟悉的話,我寫過兩篇文章,你可以看看。
《Doug Lea在J.U.C包裡面寫的BUG又被網友發現了。》
接著,我們再驗證一下最終程式碼是否運行正常:

三個執行緒最終查出來的分數是一樣的,沒毛病。
如果你想觀察一下阻塞的情況,那麼可以把睡眠時間拉長一點:

然後,把程式碼跑起來,看堆棧資訊:


一個執行緒在 sleep,另外兩個執行緒執行到了 FutureTask 的 get 方法。
sleep 的好理解,為什麼另外兩個執行緒阻塞在 get 方法上呢?
很簡單,因為另外兩個執行緒返回的 future 不是 null,這是由 putIfAbsent 方法的特性決定的:

好了,書中給出的最終方案的程式碼也解釋完了。
但是書裡面還留下了兩個「坑」:
.png)
一個是不支援快取過期機制。
一個是不支援快取淘汰機制。
等下再說,先說說我的另一個方案。
還有一個方案
其實我也還有一個方案,拿出來給大家看看:
public class ScoreQueryService2 {
public static final Map<String, Future<Integer>> SCORE_CACHE = new ConcurrentHashMap<>();
public Integer query(String userName) throws Exception {
while (true) {
Future<Integer> future = SCORE_CACHE.get(userName);
if (future == null) {
Callable<Integer> callable = () -> loadFormDB(userName);
FutureTask futureTask = new FutureTask<>(callable);
FutureTask<Integer> integerFuture = (FutureTask) SCORE_CACHE.computeIfAbsent(userName, key -> futureTask);
future = integerFuture;
integerFuture.run();
}
try {
return future.get();
} catch (CancellationException e) {
SCORE_CACHE.remove(userName, future);
} catch (Exception e) {
throw e;
}
}
}
private Integer loadFormDB(String userName) throws InterruptedException {
System.out.println("開始查詢userName=" + userName + "的分數");
//模擬耗時
TimeUnit.SECONDS.sleep(1);
return ThreadLocalRandom.current().nextInt(380, 420);
}
}
和書中給出的方案差異點在於用 computeIfAbsent 代替了 putIfAbsent:
V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction)
computeIfAbsent,首先它也是一個執行緒安全的方法,這個方法會檢查 Map 中的 Key,如果發現 Key 不存在或者對應的值是 null,則調用 Function 來產生一個值,然後將其放入 Map,最後返回這個值;否則的話返回 Map 已經存在的值。
putIfAbsent,如果 Key 不存在或者對應的值是 null,則將 Value 設置進去,然後返回 null;否則只返回 Map 當中對應的值,而不做其他操作。
所以這二者的區別之一在於返回值上。
用了 computeIfAbsent 之後,每次返回的都是同一個 FutureTask,但是由於 FutureTask 的生命周期,或者說是狀態扭轉的存在,即使三個執行緒都調用了它的 run 方法,這個 FutureTask 也只會執行成功一次。
可以看一下,這個 run 方法的源碼,一進來就是狀態和當前操作執行緒的判斷:

所以執行完一次 run 方法之後,再次調用 run 方法並不會真的執行。
但是從程式實現的優雅角度來說,還是 putIfAbsent 方法更好。
坑怎麼辦?
前面不是說最終的方案有兩個坑嘛:
-
一個是不支援快取過期機制。 -
一個是不支援快取淘汰機制。
在使用 ConcurrentHashMap 的前提下,這兩個特性如果要支援的話,需要進行對應的開發,比如引入定時任務來解決,想想就覺得麻煩。
同時也我想到了 spring-cache,我知道這裡面有 ConcurrentHashMap 作為快取的實現方案。
我想看看這個組件裡面是怎麼解決這兩個問題的。
二話不說,我先把程式碼拉下來看看:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
由於 spring-cache 也不是本文重點,所以我就直接說關鍵地方的源碼了。
至於是怎麼找到這裡來的,就不詳細介紹了,以後安排文章詳細解釋。
另外我不得不說一句:spring-cache 這玩意真的是優雅的一比,不論是源碼還是設計模式的應用,都非常的好。

首先,我們可以看到 @Cacheable 註解裡面有一個參數叫做 sycn,默認值是 false:

關於這個參數,官網上的解釋是這樣的:
//docs.spring.io/spring-framework/docs/current/reference/html/integration.html#cache-annotations-cacheable-cache-resolver

就是針對我們前面提到的快取如何維護的情況的一個處理方案。使用方法也很簡單。
該功能對應的核心部分的源碼在這個位置:
org.springframework.cache.interceptor.CacheAspectSupport#execute(org.springframework.cache.interceptor.CacheOperationInvoker, java.lang.reflect.Method, org.springframework.cache.interceptor.CacheAspectSupport.CacheOperationContexts)

在上面這個方法中會判斷是不是 sync=true 的方法,如果是則進入到 if 分支裡面。
接著會執行到下面這個重要的方法:
org.springframework.cache.interceptor.CacheAspectSupport#handleSynchronizedGet

在這個方法裡面,入參 cache 是一個抽象類,Spring 提供了六種默認的實現:

而我關心的是 ConcurrentMapCache 實現,點進去一看,好傢夥,這方法我熟啊:
org.springframework.cache.concurrent.ConcurrentMapCache#get

computeIfAbsent 方法,我們不是剛剛才說了嘛。但是我左翻右翻就是找不到設置過期時間和淘汰策略的地方。
於是,我又去翻官網了,發現答案就直接寫在官網上的:
//docs.spring.io/spring-framework/docs/current/reference/html/integration.html#cache-specific-config

這裡說了,官方提供的是一個快取的抽象,而不是具體的實現。而快取過期和淘汰機制不屬於抽象的範圍內。
為什麼呢?
比如拿 ConcurrentHashMap 來說,假設我提供了快取過期和淘汰機制的抽象,那你說 ConcurrentHashMap 怎麼去實現這個抽象方法?
實現不了,因為它本來就不支援這個機制。
所以官方認為這樣的功能應該由具體的快取實現類去實現而不是提供抽象方法。
這裡也就回復了前面的最終方案引申出的這兩個問題:
-
一個是不支援快取過期機制。 -
一個是不支援快取淘汰機制。
別問,問就是原生的方法裡面是支援不了的。如果要實現自己去寫程式碼,或者換一個快取方案。
再說兩個點
最後,再補充兩個點。
第一個點是之前的《當Synchronized遇到這玩意兒,有個大坑,要注意!》這篇文章裡面,有一個地方寫錯了。

框起來的地方是我後面加上來的。

上周的文章發出去後,大概有十來個讀者給我回饋這個問題。
我真的特別的開心,因為真的有人把我的示例程式碼拿去跑了,且認真思考了,然後來和我討論,幫我指正我寫的不對的地方。
再給大家分享一下我的這篇文章《當我看技術文章的時候,我在想什麼?》
裡面表達了我對於看技術部落格的態度:
看技術文章的時候多想一步,有時候會有更加深刻的理解。
帶著懷疑的眼光去看部落格,帶著求證的想法去證偽。
多想想 why,總是會有收穫的。
第二個點是這樣的。
關於 ConcurrentHashMap 的 computeIfAbsent 我其實也專門寫過文章的:《震驚!ConcurrentHashMap裡面也有死循環,作者留下的「彩蛋」了解一下?》
老讀者應該是讀到過這篇文章的。
之前在 seata 官網上閑逛的時候,看到了這篇部落格:
//seata.io/zh-cn/blog/seata-dsproxy-deadlock.html
名字叫做《ConcurrentHashMap導致的Seata死鎖問題》,我就隨便這麼點進去一看:

這裡提到的這篇文章,就是我寫的。
在 seata 官網上偶遇自己的文章是一種很神奇的體驗。
四捨五入,我也算是給 seata 有過貢獻的男人。
而且你看這篇文章其實也提到了我之前寫過的很多文章,這些知識都通過一個小小的點串起來了,由點到線,由線到面,這也是我堅持寫作的原因。
共勉之。
最後,呼應一下文章的開頭部分,考研馬上要查分了,我知道我的讀者裡面還是有不少是今年考研的。
如果你看到了這裡,那麼下面這個圖送給你:

本文已收錄至個人部落格,更多原創好文,歡迎大家來玩:
//www.whywhy.vip/