實戰派 | Java項目中玩轉Redis6.0客戶端快取!

原創:微信公眾號 碼農參上,歡迎分享,轉載請保留出處。

哈嘍大家好啊,我是Hydra。

在前面的文章中,我們介紹了Redis6.0中的新特性客戶端快取client-side caching,通過telnet連接模擬客戶端,測試了三種客戶端快取的工作模式,這篇文章我們就來點硬核實戰,看看客戶端快取在java項目中應該如何落地。

鋪墊

首先介紹一下今天要使用到的工具Lettuce,它是一個可伸縮執行緒安全的redis客戶端。多個執行緒可以共享同一個RedisConnection,利用nio框架Netty來高效地管理多個連接。

放眼望向現在常用的redis客戶端開發工具包,雖然能用的不少,但是目前率先擁抱redis6.0,支援客戶端快取功能的卻不多,而lettuce就是其中的領跑者。

我們先在項目中引入最新版本的依賴,下面正式開始實戰環節:

<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>6.1.8.RELEASE</version>
</dependency>

實戰

在項目中應用lettuce,開啟並使用客戶端快取功能,只需要下面這一段程式碼:

public static void main(String[] args) throws InterruptedException {
    // 創建 RedisClient 連接資訊
    RedisURI redisURI= RedisURI.builder()
            .withHost("127.0.0.1")
            .withPort(6379)
            .build();
    RedisClient client = RedisClient.create(redisURI);
    StatefulRedisConnection<String, String> connect = client.connect();
    
    Map<String, String> map = new HashMap<>();
    CacheFrontend<String,String> frontend=ClientSideCaching.enable(CacheAccessor.forMap(map),
            connect, TrackingArgs.Builder.enabled().noloop());

    String key="user";
    while (true){
        String value = frontend.get(key);
        System.out.println(value);
        TimeUnit.SECONDS.sleep(10);
    }
}

上面的程式碼主要完成了幾項工作:

  • 通過RedisURI配置redis連接的標準資訊,並建立連接
  • 創建用於充當本地快取的Map,開啟客戶端快取功能,建立一個快取訪問器CacheFrontend
  • 在循環中使用CacheFrontend,不斷查詢同一個key對應的值並列印

啟動上面的程式,控制台會不斷的列印user對應的快取,在啟動一段時間後,我們在其他的客戶端修改user對應的值,運行的結果如下:

可以看到,在其他客戶端修改了key所對應的值後,列印結果也發生了變化。但是到這裡,我們也不知道lettuce是不是真的使用了客戶端快取,雖然結果正確,但是說不定是它每次都重新執行了get命令呢?

所以我們下面來看看源碼,分析一下具體的程式碼執行流程。

分析

在上面的程式碼中,最關鍵的類就是CacheFrontend了,我們再來仔細看一下上面具體實例化時的語句:

CacheFrontend<String,String> frontend=ClientSideCaching.enable(
        CacheAccessor.forMap(map),
        connect,
        TrackingArgs.Builder.enabled().noloop()
);

首先調用了ClientSideCachingenable()方法,我們看一下它的源碼:

解釋一下傳入的3個參數:

  • CacheAccessor:一個定義對客戶端快取進行訪問介面,上面調用它的forMap方法返回的是一個MapCacheAccessor,它的底層使用的我們自定義的Map來存放本地快取,並且提供了getputevict等方法操作Map
  • StatefulRedisConnection:使用到的redis連接
  • TrackingArgs:客戶端快取的參數配置,使用noloop後不會接收當前連接修改key後的通知

向redis服務端發送開啟tracking的命令後,繼續向下調用create()方法:

這個過程中實例化了一個重要對象,它就是實現了RedisCache介面的DefaultRedisCache對象,實際向redis執行查詢時的get請求、寫入的put請求,都是由它來完成。

實例化完成後,繼續向下調用同名的create()方法:

在這個方法中,實例化了ClientSideCaching對象,注意一下傳入的兩個參數,通過前面的介紹也很好理解它們的分工:

  • 當本地快取存在時,直接從CacheAccessor中讀取
  • 當本地快取不存在時,使用RedisCache從服務端讀取

需要額外注意一下的是返回前的兩行程式碼,先看第一句(行號114的那行)。

這裡向RedisCache添加了一個監聽,當監聽到類型為invalidate的作廢消息時,拿到要作廢的key,傳遞給消費者。一般情況下,keys中只會有一個元素。

消費時會遍歷當前ClientSideCaching的消費者列表invalidationListeners

而這個列表中的所有,就是在上面的第二行程式碼中(行號115的那行)添加的,看一下方法的定義:

而實際傳入的方法引用則是下面MapCacheAccessorevict()方法,也就是說,當收到key作廢的消息後,會移除掉本地快取Map中快取的這個數據。

客戶端快取的作廢邏輯我們梳理清楚了,再來看看它是何時寫入的,直接看ClientSideCachingget()方法:

可以看到,get方法會先從本地快取MapCacheAccessor中嘗試獲取,如果取到則直接返回,如果沒有再使用RedisCache讀取redis中的快取,並將返回的結果存入到MapCacheAccessor中。

圖解

源碼看到這裡,是不是基本邏輯就串聯起來了,我們再畫兩張圖來梳理一下這個流程。先看get的過程:

再來看一下通知客戶端快取失效的過程:

怎麼樣,配合這兩張圖再理解一下,是不是很完美?

其實也不是…回憶一下我們之前使用兩級快取Caffeine+Redis時,當時使用的通知機制,會在修改redis快取後通知所有主機修改本地快取,修改成為最新的值。目前的lettuce看來,顯然不滿足這一功能,只能做到作廢刪除快取但是不會主動更新。

擴展

那麼,如果想實現本地客戶端快取的實時更新,我們應該如何在現在的基礎上進行擴展呢?仔細想一下的話,思路也很簡單:

  • 首先,移除掉lettuce的客戶端快取本身自帶的作廢消息監聽器
  • 然後,添加我們自己的作廢消息監聽器

回顧一下上面源碼分析的圖,在調用DefaultRedisCacheaddInvalidationListener()方法時,其實是調用的是StatefulRedisConnectionaddListener()方法,也就是說,這個監聽器其實是添加在redis連接上的。

如果我們再看一下這個方法源碼的話,就會發現,在它的附近還有一個對應的removeListener()方法,一看就是我們要找的東西,準備用它來移除消息監聽。

不過再仔細看看,這個方法是要傳參數的啊,我們明顯不知道現在裡面已經存在的PushListener有什麼,所以沒法直接使用,那麼無奈只能再接著往下看看這個pushHandler是什麼玩意…

通過注釋可以知道,這個PushHandler就是一個用來操作PushListener的處理工具,雖然我們不知道具體要移除的PushListener是哪一個,但是驚喜的是,它提供了一個getPushListeners()方法,可以獲取當前所有的監聽器。

這樣一來就簡單了,我上來直接清除掉這個集合中的所有監聽器,問題就迎刃而解了~

不過,在StatefulRedisConnectionImpl中的pushHandler是一個私有對象,也沒有對外進行暴露,想要操作起來還是需要費上一點功夫的。下面,我們就在分析的結果上進行程式碼的修改。

魔改

首先,我們需要自定義一個工具類,它的主要功能是操作監聽器,所以就命名為ListenerChanger好了。它要完成的功能主要有三個:

  • 移除原有的全部消息監聽
  • 添加新的自定義消息監聽
  • 更新本地快取MapCacheAccessor中的數據

首先定義構造方法,需要傳入StatefulRedisConnectionCacheAccessor作為參數,在後面的方法中會用到,並且創建一個RedisCommands,用於後面向redis服務端發送get命令請求。

public class ListenerChanger<K, V> {
    private StatefulRedisConnection<K, V> connection;
    private CacheAccessor<K, V> mapCacheAccessor;
    private RedisCommands<K, V> command;

    public ListenerChanger(StatefulRedisConnection<K, V> connection,
                           CacheAccessor<K, V> mapCacheAccessor) {
        this.connection = connection;
        this.mapCacheAccessor = mapCacheAccessor;
        this.command = connection.sync();
    }
    
    //其他方法先省略……
}

移除監聽

前面說過,pushHandler是一個私有對象,我們無法直接獲取和操作,所以只能先使用反射獲得。PushHandler中的監聽器列表存儲在一個CopyOnWriteArrayList中,我們直接使用迭代器移除掉所有內容即可。

public void removeAllListeners() {
    try {
        Class connectionClass = StatefulRedisConnectionImpl.class;
        Field pushHandlerField = connectionClass.getDeclaredField("pushHandler");
        pushHandlerField.setAccessible(true);
        PushHandler pushHandler = (PushHandler) pushHandlerField.get(this.connection);

        CopyOnWriteArrayList<PushListener> pushListeners
                = (CopyOnWriteArrayList) pushHandler.getPushListeners();
        Iterator<PushListener> it = pushListeners.iterator();
        while (it.hasNext()) {
            PushListener listener = it.next();
            pushListeners.remove(listener);
        }
    } catch (NoSuchFieldException | IllegalAccessException e) {
        e.printStackTrace();
    }
}

添加監聽

這裡我們模仿DefaultRedisCacheaddInvalidationListener()方法的寫法,添加一個監聽器,除了最後處理的程式碼基本一致。對於監聽到的要作廢的keys集合,另外啟動一個執行緒更新本地數據。

public void addNewListener() {
    this.connection.addListener(new PushListener() {
        @Override
        public void onPushMessage(PushMessage message) {
            if (message.getType().equals("invalidate")) {
                List<Object> content = message.getContent(StringCodec.UTF8::decodeKey);
                List<K> keys = (List<K>) content.get(1);
                System.out.println("modifyKeys:"+keys);

                // start a new thread to update cacheAccessor
                new Thread(()-> updateMap(keys)).start();
            }
        }
    });
}

本地更新

使用RedisCommands重新從redis服務端獲取最新的數據,並更新本地快取mapCacheAccessor中的數據。

private void updateMap(List<K> keys){
    for (K key : keys) {
        V newValue = this.command.get(key);
        System.out.println("newValue:"+newValue);
        mapCacheAccessor.put(key, newValue);
    }
}

至於為什麼執行這個方法時額外啟動了一個新執行緒,是因為我在測試中發現,當在PushListeneronPushMessage方法中執行RedisCommandsget()方法時,會一直取不到值,但是像這樣新啟動一個執行緒就沒有問題。

測試

下面,我們來寫一段測試程式碼,來測試上面的改動。

public static void main(String[] args) throws InterruptedException {
	// 省略之前創建連接程式碼……
    
    Map<String, String> map = new HashMap<>();
    CacheAccessor<String, String> mapCacheAccessor = CacheAccessor.forMap(map);
    CacheFrontend<String, String> frontend = ClientSideCaching.enable(mapCacheAccessor,
            connect,
            TrackingArgs.Builder.enabled().noloop());

    ListenerChanger<String, String> listenerChanger
            = new ListenerChanger<>(connect, mapCacheAccessor);
    // 移除原有的listeners
    listenerChanger.removeAllListeners();
    // 添加新的監聽器
    listenerChanger.addNewListener();

    String key = "user";
    while (true) {
        String value = frontend.get(key);
        System.out.println(value);
        TimeUnit.SECONDS.sleep(30);
    }
}

可以看到,程式碼基本上在之前的基礎上沒有做什麼改動,只是在創建完ClientSideCaching後,執行了我們自己實現的ListenerChanger的兩個方法。先移除所有監聽器、再添加新的監聽器。下面我們以debug模式啟動測試程式碼,簡單看一下程式碼的執行邏輯。

首先,在未執行移除操作前,pushHandler中的監聽器列表中有一個監聽器:

移除後,監聽器列表為空:

在添加完自定義監聽器、並且執行完第一次查詢操作後,在另外一個redis客戶端中修改user的值,這時PushListener會收到作廢類型的消息監聽:

啟動一個新執行緒,查詢redis中user對應的最新值,並放入cacheAccessor中:

當循環中CacheFrontendget()方法再被執行時,會直接從cacheAccessor中取到刷新後的值,不需要再次去訪問redis服務端了:

總結

到這裡,我們基於lettuce的客戶端快取的基本使用、以及在這個基礎上進行的魔改就基本完成了。可以看到,lettuce客戶端已經在底層封裝了一套比較成熟的API,能讓我們在將redis升級到6.0以後,開箱即用式地使用客戶端快取這一新特性。在使用中,不需要我們關注底層原理,也不用做什麼業務邏輯的改造,總的來說,使用起來還是挺香的。

那麼,這次的分享就到這裡,我是Hydra,下篇文章再見。

推薦閱讀

引入『客戶端快取』,Redis6算是把快取玩明白了…

作者簡介,碼農參上,一個熱愛分享的公眾號,有趣、深入、直接,與你聊聊技術。歡迎添加好友,進一步交流。