實戰派 | Java項目中玩轉Redis6.0客戶端快取!
原創:微信公眾號
碼農參上
,歡迎分享,轉載請保留出處。
哈嘍大家好啊,我是Hydra。
在前面的文章中,我們介紹了Redis6.0中的新特性客戶端快取client-side caching
,通過telnet連接模擬客戶端,測試了三種客戶端快取的工作模式,這篇文章我們就來點硬核實戰,看看客戶端快取在java項目中應該如何落地。
鋪墊
首先介紹一下今天要使用到的工具Lettuce
,它是一個可伸縮執行緒安全的redis客戶端。多個執行緒可以共享同一個RedisConnection
,利用nio框架Netty
來高效地管理多個連接。
放眼望向現在常用的redis客戶端開發工具包,雖然能用的不少,但是目前率先擁抱redis6.0,支援客戶端快取功能的卻不多,而lettuce就是其中的領跑者。
我們先在項目中引入最新版本的依賴,下面正式開始實戰環節:
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.8.RELEASE</version>
</dependency>
實戰
在項目中應用lettuce,開啟並使用客戶端快取功能,只需要下面這一段程式碼:
public static void main(String[] args) throws InterruptedException {
// 創建 RedisClient 連接資訊
RedisURI redisURI= RedisURI.builder()
.withHost("127.0.0.1")
.withPort(6379)
.build();
RedisClient client = RedisClient.create(redisURI);
StatefulRedisConnection<String, String> connect = client.connect();
Map<String, String> map = new HashMap<>();
CacheFrontend<String,String> frontend=ClientSideCaching.enable(CacheAccessor.forMap(map),
connect, TrackingArgs.Builder.enabled().noloop());
String key="user";
while (true){
String value = frontend.get(key);
System.out.println(value);
TimeUnit.SECONDS.sleep(10);
}
}
上面的程式碼主要完成了幾項工作:
- 通過
RedisURI
配置redis連接的標準資訊,並建立連接 - 創建用於充當本地快取的
Map
,開啟客戶端快取功能,建立一個快取訪問器CacheFrontend
- 在循環中使用
CacheFrontend
,不斷查詢同一個key對應的值並列印
啟動上面的程式,控制台會不斷的列印user
對應的快取,在啟動一段時間後,我們在其他的客戶端修改user
對應的值,運行的結果如下:
可以看到,在其他客戶端修改了key所對應的值後,列印結果也發生了變化。但是到這裡,我們也不知道lettuce
是不是真的使用了客戶端快取,雖然結果正確,但是說不定是它每次都重新執行了get
命令呢?
所以我們下面來看看源碼,分析一下具體的程式碼執行流程。
分析
在上面的程式碼中,最關鍵的類就是CacheFrontend
了,我們再來仔細看一下上面具體實例化時的語句:
CacheFrontend<String,String> frontend=ClientSideCaching.enable(
CacheAccessor.forMap(map),
connect,
TrackingArgs.Builder.enabled().noloop()
);
首先調用了ClientSideCaching
的enable()
方法,我們看一下它的源碼:
解釋一下傳入的3個參數:
CacheAccessor
:一個定義對客戶端快取進行訪問介面,上面調用它的forMap
方法返回的是一個MapCacheAccessor
,它的底層使用的我們自定義的Map
來存放本地快取,並且提供了get
、put
、evict
等方法操作Map
StatefulRedisConnection
:使用到的redis連接TrackingArgs
:客戶端快取的參數配置,使用noloop
後不會接收當前連接修改key後的通知
向redis服務端發送開啟tracking
的命令後,繼續向下調用create()
方法:
這個過程中實例化了一個重要對象,它就是實現了RedisCache
介面的DefaultRedisCache
對象,實際向redis執行查詢時的get
請求、寫入的put
請求,都是由它來完成。
實例化完成後,繼續向下調用同名的create()
方法:
在這個方法中,實例化了ClientSideCaching
對象,注意一下傳入的兩個參數,通過前面的介紹也很好理解它們的分工:
- 當本地快取存在時,直接從
CacheAccessor
中讀取 - 當本地快取不存在時,使用
RedisCache
從服務端讀取
需要額外注意一下的是返回前的兩行程式碼,先看第一句(行號114的那行)。
這裡向RedisCache
添加了一個監聽,當監聽到類型為invalidate
的作廢消息時,拿到要作廢的key,傳遞給消費者。一般情況下,keys
中只會有一個元素。
消費時會遍歷當前ClientSideCaching
的消費者列表invalidationListeners
:
而這個列表中的所有,就是在上面的第二行程式碼中(行號115的那行)添加的,看一下方法的定義:
而實際傳入的方法引用則是下面MapCacheAccessor
的evict()
方法,也就是說,當收到key作廢的消息後,會移除掉本地快取Map
中快取的這個數據。
客戶端快取的作廢邏輯我們梳理清楚了,再來看看它是何時寫入的,直接看ClientSideCaching
的get()
方法:
可以看到,get
方法會先從本地快取MapCacheAccessor
中嘗試獲取,如果取到則直接返回,如果沒有再使用RedisCache
讀取redis中的快取,並將返回的結果存入到MapCacheAccessor
中。
圖解
源碼看到這裡,是不是基本邏輯就串聯起來了,我們再畫兩張圖來梳理一下這個流程。先看get
的過程:
再來看一下通知客戶端快取失效的過程:
怎麼樣,配合這兩張圖再理解一下,是不是很完美?
其實也不是…回憶一下我們之前使用兩級快取Caffeine+Redis
時,當時使用的通知機制,會在修改redis快取後通知所有主機修改本地快取,修改成為最新的值。目前的lettuce看來,顯然不滿足這一功能,只能做到作廢刪除快取但是不會主動更新。
擴展
那麼,如果想實現本地客戶端快取的實時更新,我們應該如何在現在的基礎上進行擴展呢?仔細想一下的話,思路也很簡單:
- 首先,移除掉
lettuce
的客戶端快取本身自帶的作廢消息監聽器 - 然後,添加我們自己的作廢消息監聽器
回顧一下上面源碼分析的圖,在調用DefaultRedisCache
的addInvalidationListener()
方法時,其實是調用的是StatefulRedisConnection
的addListener()
方法,也就是說,這個監聽器其實是添加在redis連接上的。
如果我們再看一下這個方法源碼的話,就會發現,在它的附近還有一個對應的removeListener()
方法,一看就是我們要找的東西,準備用它來移除消息監聽。
不過再仔細看看,這個方法是要傳參數的啊,我們明顯不知道現在裡面已經存在的PushListener
有什麼,所以沒法直接使用,那麼無奈只能再接著往下看看這個pushHandler
是什麼玩意…
通過注釋可以知道,這個PushHandler
就是一個用來操作PushListener
的處理工具,雖然我們不知道具體要移除的PushListener
是哪一個,但是驚喜的是,它提供了一個getPushListeners()
方法,可以獲取當前所有的監聽器。
這樣一來就簡單了,我上來直接清除掉這個集合中的所有監聽器,問題就迎刃而解了~
不過,在StatefulRedisConnectionImpl
中的pushHandler
是一個私有對象,也沒有對外進行暴露,想要操作起來還是需要費上一點功夫的。下面,我們就在分析的結果上進行程式碼的修改。
魔改
首先,我們需要自定義一個工具類,它的主要功能是操作監聽器,所以就命名為ListenerChanger
好了。它要完成的功能主要有三個:
- 移除原有的全部消息監聽
- 添加新的自定義消息監聽
- 更新本地快取
MapCacheAccessor
中的數據
首先定義構造方法,需要傳入StatefulRedisConnection
和CacheAccessor
作為參數,在後面的方法中會用到,並且創建一個RedisCommands
,用於後面向redis服務端發送get
命令請求。
public class ListenerChanger<K, V> {
private StatefulRedisConnection<K, V> connection;
private CacheAccessor<K, V> mapCacheAccessor;
private RedisCommands<K, V> command;
public ListenerChanger(StatefulRedisConnection<K, V> connection,
CacheAccessor<K, V> mapCacheAccessor) {
this.connection = connection;
this.mapCacheAccessor = mapCacheAccessor;
this.command = connection.sync();
}
//其他方法先省略……
}
移除監聽
前面說過,pushHandler
是一個私有對象,我們無法直接獲取和操作,所以只能先使用反射獲得。PushHandler
中的監聽器列表存儲在一個CopyOnWriteArrayList
中,我們直接使用迭代器移除掉所有內容即可。
public void removeAllListeners() {
try {
Class connectionClass = StatefulRedisConnectionImpl.class;
Field pushHandlerField = connectionClass.getDeclaredField("pushHandler");
pushHandlerField.setAccessible(true);
PushHandler pushHandler = (PushHandler) pushHandlerField.get(this.connection);
CopyOnWriteArrayList<PushListener> pushListeners
= (CopyOnWriteArrayList) pushHandler.getPushListeners();
Iterator<PushListener> it = pushListeners.iterator();
while (it.hasNext()) {
PushListener listener = it.next();
pushListeners.remove(listener);
}
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
}
添加監聽
這裡我們模仿DefaultRedisCache
中addInvalidationListener()
方法的寫法,添加一個監聽器,除了最後處理的程式碼基本一致。對於監聽到的要作廢的keys
集合,另外啟動一個執行緒更新本地數據。
public void addNewListener() {
this.connection.addListener(new PushListener() {
@Override
public void onPushMessage(PushMessage message) {
if (message.getType().equals("invalidate")) {
List<Object> content = message.getContent(StringCodec.UTF8::decodeKey);
List<K> keys = (List<K>) content.get(1);
System.out.println("modifyKeys:"+keys);
// start a new thread to update cacheAccessor
new Thread(()-> updateMap(keys)).start();
}
}
});
}
本地更新
使用RedisCommands
重新從redis服務端獲取最新的數據,並更新本地快取mapCacheAccessor
中的數據。
private void updateMap(List<K> keys){
for (K key : keys) {
V newValue = this.command.get(key);
System.out.println("newValue:"+newValue);
mapCacheAccessor.put(key, newValue);
}
}
至於為什麼執行這個方法時額外啟動了一個新執行緒,是因為我在測試中發現,當在PushListener
的onPushMessage
方法中執行RedisCommands
的get()
方法時,會一直取不到值,但是像這樣新啟動一個執行緒就沒有問題。
測試
下面,我們來寫一段測試程式碼,來測試上面的改動。
public static void main(String[] args) throws InterruptedException {
// 省略之前創建連接程式碼……
Map<String, String> map = new HashMap<>();
CacheAccessor<String, String> mapCacheAccessor = CacheAccessor.forMap(map);
CacheFrontend<String, String> frontend = ClientSideCaching.enable(mapCacheAccessor,
connect,
TrackingArgs.Builder.enabled().noloop());
ListenerChanger<String, String> listenerChanger
= new ListenerChanger<>(connect, mapCacheAccessor);
// 移除原有的listeners
listenerChanger.removeAllListeners();
// 添加新的監聽器
listenerChanger.addNewListener();
String key = "user";
while (true) {
String value = frontend.get(key);
System.out.println(value);
TimeUnit.SECONDS.sleep(30);
}
}
可以看到,程式碼基本上在之前的基礎上沒有做什麼改動,只是在創建完ClientSideCaching
後,執行了我們自己實現的ListenerChanger
的兩個方法。先移除所有監聽器、再添加新的監聽器。下面我們以debug模式啟動測試程式碼,簡單看一下程式碼的執行邏輯。
首先,在未執行移除操作前,pushHandler
中的監聽器列表中有一個監聽器:
移除後,監聽器列表為空:
在添加完自定義監聽器、並且執行完第一次查詢操作後,在另外一個redis客戶端中修改user
的值,這時PushListener
會收到作廢類型的消息監聽:
啟動一個新執行緒,查詢redis中user
對應的最新值,並放入cacheAccessor
中:
當循環中CacheFrontend
的get()
方法再被執行時,會直接從cacheAccessor
中取到刷新後的值,不需要再次去訪問redis服務端了:
總結
到這裡,我們基於lettuce
的客戶端快取的基本使用、以及在這個基礎上進行的魔改就基本完成了。可以看到,lettuce
客戶端已經在底層封裝了一套比較成熟的API,能讓我們在將redis升級到6.0以後,開箱即用式地使用客戶端快取這一新特性。在使用中,不需要我們關注底層原理,也不用做什麼業務邏輯的改造,總的來說,使用起來還是挺香的。
那麼,這次的分享就到這裡,我是Hydra,下篇文章再見。
推薦閱讀
作者簡介,
碼農參上
,一個熱愛分享的公眾號,有趣、深入、直接,與你聊聊技術。歡迎添加好友,進一步交流。