基於Spring接口,集成Caffeine+Redis兩級緩存
原創:微信公眾號
碼農參上
,歡迎分享,轉載請保留出處。
在上一篇文章Redis+Caffeine兩級緩存,讓訪問速度縱享絲滑中,我們介紹了3種整合Caffeine
和Redis
作為兩級緩存使用的方法,雖然說能夠實現功能,但實現手法還是太粗糙了,並且遺留了一些問題沒有處理。本文將在上一篇的基礎上,圍繞兩個方面進行進一步的改造:
JSR107
定義了緩存使用規範,spring中提供了基於這個規範的接口,所以我們可以直接使用spring中的接口進行Caffeine
和Redis
兩級緩存的整合改造- 在分佈式環境下,如果一台主機的本地緩存進行修改,需要通知其他主機修改本地緩存,解決分佈式環境下本地緩存一致性問題
好了,在明確了需要的改進問題後,下面我們開始正式修改。
改造
在上篇文章的v3
版本中,我們使用自定義註解的方式實現了兩級緩存通過一個註解管理的功能。本文我們換一種方式,直接通過擴展spring提供的接口來實現這個功能,在進行整合之前,我們需要簡單了解一下JSR107
緩存規範。
JSR107 規範
在JSR107
緩存規範中定義了5個核心接口,分別是CachingProvider
,CacheManager
,Cache
, Entry
和Expiry
,參考下面這張圖,可以看到除了Entry
和Expiry
以外,從上到下都是一對多的包含關係。
從上面這張圖我們可以看出,一個應用可以創建並管理多個CachingProvider
,同樣一個CachingProvider
也可以管理多個CacheManager
,緩存管理器CacheManager
中則維護了多個Cache
。
Cache
是一個類似Map
的數據結構,Entry
就是其中存儲的每一個key-value
數據對,並且每個Entry
都有一個過期時間Expiry
。而我們在使用spring集成第三方的緩存時,只需要實現Cache
和CacheManager
這兩個接口就可以了,下面分別具體來看一下。
Cache
spring中的Cache
接口規範了緩存組件的定義,包含了緩存的各種操作,實現具體緩存操作的管理。例如我們熟悉的RedisCache
、EhCacheCache
等,都實現了這個接口。
在Cache
接口中,定義了get
、put
、evict
、clear
等方法,分別對應緩存的存入、取出、刪除、清空操作。不過我們這裡不直接使用Cache
接口,上面這張圖中的AbstractValueAdaptingCache
是一個抽象類,它已經實現了Cache
接口,是spring在Cache
接口的基礎上幫助我們進行了一層封裝,所以我們直接繼承這個類就可以。
繼承AbstractValueAdaptingCache
抽象類後,除了創建Cache
的構造方法外,還需要實現下面的幾個方法:
// 在緩存中實際執行查找的操作,父類的get()方法會調用這個方法
protected abstract Object lookup(Object key);
// 通過key獲取緩存值,如果沒有找到,會調用valueLoader的call()方法
public <T> T get(Object key, Callable<T> valueLoader);
// 將數據放入緩存中
public void put(Object key, Object value);
// 刪除緩存
public void evict(Object key);
// 清空緩存中所有數據
public void clear();
// 獲取緩存名稱,一般在CacheManager創建時指定
String getName();
// 獲取實際使用的緩存
Object getNativeCache();
因為要整合RedisTemplate
和Caffeine
的Cache
,所以這些都需要在緩存的構造方法中傳入,除此之外構造方法中還需要再傳出緩存名稱cacheName
,以及在配置文件中實際配置的一些緩存參數。先看一下構造方法的實現:
public class DoubleCache extends AbstractValueAdaptingCache {
private String cacheName;
private RedisTemplate<Object, Object> redisTemplate;
private Cache<Object, Object> caffeineCache;
private DoubleCacheConfig doubleCacheConfig;
protected DoubleCache(boolean allowNullValues) {
super(allowNullValues);
}
public DoubleCache(String cacheName,RedisTemplate<Object, Object> redisTemplate,
Cache<Object, Object> caffeineCache,
DoubleCacheConfig doubleCacheConfig){
super(doubleCacheConfig.getAllowNull());
this.cacheName=cacheName;
this.redisTemplate=redisTemplate;
this.caffeineCache=caffeineCache;
this.doubleCacheConfig=doubleCacheConfig;
}
//...
}
抽象父類的構造方法中只有一個boolean
類型的參數allowNullValues
,表示是否允許緩存對象為null
。除此之外,AbstractValueAdaptingCache
中還定義了兩個包裝方法來配合這個參數進行使用,分別是toStoreValue
和fromStoreValue
,特殊用途是用於在緩存null
對象時進行包裝、以及在獲取時進行解析並返回。
我們之後會在CacheManager
中調用後面這個自己實現的構造方法,來實例化Cache
對象,參數中DoubleCacheConfig
是使用@ConfigurationProperties
讀取的yml配置文件封裝的數據對象,會在後面使用。
當一個方法添加了@Cacheable
註解時,執行時會先調用父類AbstractValueAdaptingCache
中的get(key)
方法,它會再調用我們自己實現的lookup
方法。在實際執行查找操作的lookup
方法中,我們的邏輯仍然是先查找Caffeine
、沒有找到時再查找Redis
:
@Override
protected Object lookup(Object key) {
// 先從caffeine中查找
Object obj = caffeineCache.getIfPresent(key);
if (Objects.nonNull(obj)){
log.info("get data from caffeine");
return obj;
}
//再從redis中查找
String redisKey=this.name+":"+ key;
obj = redisTemplate.opsForValue().get(redisKey);
if (Objects.nonNull(obj)){
log.info("get data from redis");
caffeineCache.put(key,obj);
}
return obj;
}
如果lookup
方法的返回結果不為null
,那麼就會直接返回結果給調用方。如果返回為null
時,就會執行原方法,執行完成後調用put
方法,將數據放入緩存中。接下來我們實現put
方法:
@Override
public void put(Object key, Object value) {
if(!isAllowNullValues() && Objects.isNull(value)){
log.error("the value NULL will not be cached");
return;
}
//使用 toStoreValue(value) 包裝,解決caffeine不能存null的問題
caffeineCache.put(key,toStoreValue(value));
// null對象只存在caffeine中一份就夠了,不用存redis了
if (Objects.isNull(value))
return;
String redisKey=this.cacheName +":"+ key;
Optional<Long> expireOpt = Optional.ofNullable(doubleCacheConfig)
.map(DoubleCacheConfig::getRedisExpire);
if (expireOpt.isPresent()){
redisTemplate.opsForValue().set(redisKey,toStoreValue(value),
expireOpt.get(), TimeUnit.SECONDS);
}else{
redisTemplate.opsForValue().set(redisKey,toStoreValue(value));
}
}
上面我們對於是否允許緩存空對象進行了判斷,能夠緩存空對象的好處之一就是可以避免緩存穿透。需要注意的是,Caffeine
中是不能直接緩存null
的,因此可以使用父類提供的toStoreValue()
方法,將它包裝成一個NullValue
類型。在取出對象時,如果是NullValue
,也不用我們自己再去調用fromStoreValue()
將這個包裝類型還原,父類的get
方法中已經幫我們做好了。
另外,上面在put
方法中緩存空對象時,只在Caffeine
緩存中一份即可,可以不用在Redis
中再存一份。
緩存的刪除方法evict()
和清空方法clear()
的實現就比較簡單了,直接刪除一跳或全部數據即可:
@Override
public void evict(Object key) {
redisTemplate.delete(this.cacheName +":"+ key);
caffeineCache.invalidate(key);
}
@Override
public void clear() {
Set<Object> keys = redisTemplate.keys(this.cacheName.concat(":*"));
for (Object key : keys) {
redisTemplate.delete(String.valueOf(key));
}
caffeineCache.invalidateAll();
}
獲取緩存cacheName
和實際緩存的方法實現:
@Override
public String getName() {
return this.cacheName;
}
@Override
public Object getNativeCache() {
return this;
}
最後,我們再來看一下帶有兩個參數的get
方法,為什麼把這個方法放到最後來說呢,因為如果我們只是使用註解來管理緩存的話,那麼這個方法不會被調用到,簡單看一下實現:
@Override
public <T> T get(Object key, Callable<T> valueLoader) {
ReentrantLock lock=new ReentrantLock();
try{
lock.lock();//加鎖
Object obj = lookup(key);
if (Objects.nonNull(obj)){
return (T)obj;
}
//沒有找到
obj = valueLoader.call();
put(key,obj);//放入緩存
return (T)obj;
}catch (Exception e){
log.error(e.getMessage());
}finally {
lock.unlock();
}
return null;
}
方法的實現比較容易理解,還是先調用lookup
方法尋找是否已經緩存了對象,如果沒有找到那麼就調用Callable
中的call
方法進行獲取,並在獲取完成後存入到緩存中去。至於這個方法如何使用,具體代碼我們放在後面使用這一塊再看。
需要注意的是,這個方法的接口注釋中強調了需要我們自己來保證方法同步,因此這裡使用了ReentrantLock
進行了加鎖操作。到這裡,Cache
的實現就完成了,下面我們接着看另一個重要的接口CacheManager
。
CacheManager
從名字就可以看出,CacheManager
是一個緩存管理器,它可以被用來管理一組Cache
。在上一篇文章的v2版本中,我們使用的CaffeineCacheManager
就實現了這個接口,除此之外還有RedisCacheManager
、EhCacheCacheManager
等也都是通過這個接口實現。
下面我們要自定義一個類實現CacheManager
接口,管理上面實現的DoubleCache
作為spring中的緩存使用。接口中需要實現的方法只有下面兩個:
//根據cacheName獲取Cache實例,不存在時進行創建
Cache getCache(String name);
//返回管理的所有cacheName
Collection<String> getCacheNames();
在自定義的緩存管理器中,我們要使用ConcurrentHashMap
維護一組不同的Cache
,再定義一個構造方法,在參數中傳入已經在spring中配置好的RedisTemplate
,以及相關的緩存配置參數:
public class DoubleCacheManager implements CacheManager {
Map<String, Cache> cacheMap = new ConcurrentHashMap<>();
private RedisTemplate<Object, Object> redisTemplate;
private DoubleCacheConfig dcConfig;
public DoubleCacheManager(RedisTemplate<Object, Object> redisTemplate,
DoubleCacheConfig doubleCacheConfig) {
this.redisTemplate = redisTemplate;
this.dcConfig = doubleCacheConfig;
}
//...
}
然後實現getCache
方法,邏輯很簡單,先根據name
從Map
中查找對應的Cache
,如果找到則直接返回,這個參數name
就是上一篇文章中提到的cacheName
,CacheManager
根據它實現不同Cache
的隔離。
如果沒有根據名稱找到緩存的話,那麼新建一個DoubleCache
對象,並放入Map
中。這裡使用的ConcurrentHashMap
的putIfAbsent()
方法放入,避免重複創建Cache
以及造成Cache
內數據的丟失。具體代碼如下:
@Override
public Cache getCache(String name) {
Cache cache = cacheMap.get(name);
if (Objects.nonNull(cache)) {
return cache;
}
cache = new DoubleCache(name, redisTemplate, createCaffeineCache(), dcConfig);
Cache oldCache = cacheMap.putIfAbsent(name, cache);
return oldCache == null ? cache : oldCache;
}
在上面創建DoubleCache
對象的過程中,需要先創建一個Caffeine
的Cache
對象作為參數傳入,這一過程主要是根據實際項目的配置文件中的具體參數進行初始化,代碼如下:
private com.github.benmanes.caffeine.cache.Cache createCaffeineCache(){
Caffeine<Object, Object> caffeineBuilder = Caffeine.newBuilder();
Optional<DoubleCacheConfig> dcConfigOpt = Optional.ofNullable(this.dcConfig);
dcConfigOpt.map(DoubleCacheConfig::getInit)
.ifPresent(init->caffeineBuilder.initialCapacity(init));
dcConfigOpt.map(DoubleCacheConfig::getMax)
.ifPresent(max->caffeineBuilder.maximumSize(max));
dcConfigOpt.map(DoubleCacheConfig::getExpireAfterWrite)
.ifPresent(eaw->caffeineBuilder.expireAfterWrite(eaw,TimeUnit.SECONDS));
dcConfigOpt.map(DoubleCacheConfig::getExpireAfterAccess)
.ifPresent(eaa->caffeineBuilder.expireAfterAccess(eaa,TimeUnit.SECONDS));
dcConfigOpt.map(DoubleCacheConfig::getRefreshAfterWrite)
.ifPresent(raw->caffeineBuilder.refreshAfterWrite(raw,TimeUnit.SECONDS));
return caffeineBuilder.build();
}
getCacheNames
方法很簡單,直接返回Map
的keySet
就可以了,代碼如下:
@Override
public Collection<String> getCacheNames() {
return cacheMap.keySet();
}
配置&使用
在application.yml
文件中配置緩存的參數,代碼中使用@ConfigurationProperties
接收到DoubleCacheConfig
類中:
doublecache:
allowNull: true
init: 128
max: 1024
expireAfterWrite: 30 #Caffeine過期時間
redisExpire: 60 #Redis緩存過期時間
配置自定義的DoubleCacheManager
作為默認的緩存管理器:
@Configuration
public class CacheConfig {
@Autowired
DoubleCacheConfig doubleCacheConfig;
@Bean
public DoubleCacheManager cacheManager(RedisTemplate<Object,Object> redisTemplate,
DoubleCacheConfig doubleCacheConfig){
return new DoubleCacheManager(redisTemplate,doubleCacheConfig);
}
}
Service
中的代碼還是老樣子,不需要在代碼中手動操作緩存,只要直接在方法上使用@Cache
相關註解即可:
@Service @Slf4j
@AllArgsConstructor
public class OrderServiceImpl implements OrderService {
private final OrderMapper orderMapper;
@Cacheable(value = "order",key = "#id")
public Order getOrderById(Long id) {
Order myOrder = orderMapper.selectOne(new LambdaQueryWrapper<Order>()
.eq(Order::getId, id));
return myOrder;
}
@CachePut(cacheNames = "order",key = "#order.id")
public Order updateOrder(Order order) {
orderMapper.updateById(order);
return order;
}
@CacheEvict(cacheNames = "order",key = "#id")
public void deleteOrder(Long id) {
orderMapper.deleteById(id);
}
//沒有註解,使用get(key,callable)方法
public Order getOrderById2(Long id) {
DoubleCacheManager cacheManager = SpringContextUtil.getBean(DoubleCacheManager.class);
Cache cache = cacheManager.getCache("order");
Order order =(Order) cache.get(id, (Callable<Object>) () -> {
log.info("get data from database");
Order myOrder = orderMapper.selectOne(new LambdaQueryWrapper<Order>()
.eq(Order::getId, id));
return myOrder;
});
return order;
}
}
注意最後這個沒有添加任何註解的方法,只有以這種方式調用時才會執行我們在DoubleCache
中自己實現的get(key,callable)
方法。到這裡,基於JSR107
規範和spring接口的兩級緩存改造就完成了,下面我們看一下遺漏的第二個問題。
分佈式環境改造
前面我們說了,在分佈式環境下,可能會存在各個主機上一級緩存不一致的問題。當一台主機修改了本地緩存後,其他主機是沒有感知的,仍然保持了之前的緩存,那麼這種情況下就可能取到臟數據。既然我們在項目中已經使用了Redis
,那麼就可以使用它的發佈/訂閱功能來使各個節點的緩存進行同步。
定義消息體
在使用Redis
發送消息前,需要先定義一個消息對象。其中的數據包括消息要作用於的Cache
名稱、操作類型、數據以及發出消息的源主機標識:
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CacheMassage implements Serializable {
private static final long serialVersionUID = -3574997636829868400L;
private String cacheName;
private CacheMsgType type; //標識更新或刪除操作
private Object key;
private Object value;
private String msgSource; //源主機標識,用來避免重複操作
}
定義一個枚舉來標識消息的類型,是要進行更新還是刪除操作:
public enum CacheMsgType {
UPDATE,
DELETE;
}
消息體中的msgSource
是添加的一個消息源主機的標識,添加這個是為了避免收到當前主機發送的消息後,再進行重複操作,也就是說收到本機發出的消息直接丟掉什麼都不做就可以了。源主機標識這裡使用的是主機ip加項目端口的方式,獲取方法如下:
public static String getMsgSource() throws UnknownHostException {
String host = InetAddress.getLocalHost().getHostAddress();
Environment env = SpringContextUtil.getBean(Environment.class);
String port = env.getProperty("server.port");
return host+":"+port;
}
這樣消息體的定義就完成了,之後只要調用redisTemplate
的convertAndSend
方法就可以把這個對象發佈到指定的主題上了。
Redis消息配置
要使用Redis
的消息監聽功能,需要配置兩項內容:
MessageListenerAdapter
:消息監聽適配器,可以在其中指定自定義的監聽代理類,並且可以自定義使用哪個方法處理監聽邏輯RedisMessageListenerContainer
: 一個可以為消息監聽器提供異步行為的容器,並且提供消息轉換和分派等底層功能
@Configuration
public class MessageConfig {
public static final String TOPIC="cache.msg";
@Bean
RedisMessageListenerContainer container(MessageListenerAdapter listenerAdapter,
RedisConnectionFactory redisConnectionFactory){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic(TOPIC));
return container;
}
@Bean
MessageListenerAdapter adapter(RedisMessageReceiver receiver){
return new MessageListenerAdapter(receiver,"receive");
}
}
在上面的監聽適配器MessageListenerAdapter
中,我們傳入了一個自定義的RedisMessageReceiver
接收並處理消息,並指定使用它的receive
方法來處理監聽到的消息,下面我們就來看看它如何接收消息並消費。
消息消費邏輯
定義一個類RedisMessageReceiver
來接收並消費消息,需要在它的方法中實現以下功能:
- 反序列化接收到的消息,轉換為前面定義的
CacheMassage
類型對象 - 根據消息的主機標識判斷這條消息是不是本機發出的,如果是那麼直接丟棄,只有接收到其他主機發出的消息才進行處理
- 使用
cacheName
得到具體使用的那一個DoubleCache
實例 - 根據消息的類型判斷要執行的是更新還是刪除操作,調用對應的方法
@Slf4j @Component
@AllArgsConstructor
public class RedisMessageReceiver {
private final RedisTemplate redisTemplate;
private final DoubleCacheManager manager;
//接收通知,進行處理
public void receive(String message) throws UnknownHostException {
CacheMassage msg = (CacheMassage) redisTemplate
.getValueSerializer().deserialize(message.getBytes());
log.info(msg.toString());
//如果是本機發出的消息,那麼不進行處理
if (msg.getMsgSource().equals(MessageSourceUtil.getMsgSource())){
log.info("收到本機發出的消息,不做處理");
return;
}
DoubleCache cache = (DoubleCache) manager.getCache(msg.getCacheName());
if (msg.getType()== CacheMsgType.UPDATE) {
cache.updateL1Cache(msg.getKey(),msg.getValue());
log.info("更新本地緩存");
}
if (msg.getType()== CacheMsgType.DELETE) {
log.info("刪除本地緩存");
cache.evictL1Cache(msg.getKey());
}
}
}
在上面的代碼中,調用了DoubleCache
中更新一級緩存方法updateL1Cache
、刪除一級緩存方法evictL1Cache
,我們會後面在DoubleCache
中進行添加。
修改DoubleCache
在DoubleCache
中先添加上面提到的兩個方法,由CacheManager
獲取到具體緩存後調用,進行一級緩存的更新或刪除操作:
// 更新一級緩存
public void updateL1Cache(Object key,Object value){
caffeineCache.put(key,value);
}
// 刪除一級緩存
public void evictL1Cache(Object key){
caffeineCache.invalidate(key);
}
好了,完事具備只欠東風,我們要在什麼場合發送消息呢?答案是在DoubleCache
中存入緩存的put
方法和移除緩存的evict
方法中。首先修改put
方法,方法中前面的邏輯不變,在最後添加發送消息通知其他節點更新一級緩存的邏輯:
public void put(Object key, Object value) {
// 省略前面的不變代碼...
//發送信息通知其他節點更新一級緩存
CacheMassage cacheMassage
= new CacheMassage(this.cacheName, CacheMsgType.UPDATE,
key,value, MessageSourceUtil.getMsgSource());
redisTemplate.convertAndSend(MessageConfig.TOPIC,cacheMassage);
}
然後修改evict
方法,同樣保持前面的邏輯不變,在最後添加發送消息的代碼:
public void evict(Object key) {
// 省略前面的不變代碼...
//發送信息通知其他節點刪除一級緩存
CacheMassage cacheMassage
= new CacheMassage(this.cacheName, CacheMsgType.DELETE,
key,null, MessageSourceUtil.getMsgSource());
redisTemplate.convertAndSend(MessageConfig.TOPIC,cacheMassage);
}
適配分佈式環境的改造工作到此結束,下面進行一下簡單的測試工作。
測試
我們可以用idea
的Allow parallel run
功能同時啟動兩個一樣的springboot項目,來模擬分佈式環境下的兩台主機,注意在啟動參數中添加-Dserver.port
參數來啟動到不同端口。
首先測試更新操作,使用接口修改某一個主機的本地緩存,可以看到發出消息的主機在收到消息後,直接丟棄不做任何處理:
查看另一台主機的日誌,收到消息並更新了本地緩存:
再看一下緩存的刪除情況,同樣本地刪除後再收到消息不做處理:
看另一台主機收到消息後,會刪除本地的一級緩存:
可以看到,分佈式環境下本地緩存通過Redis
消息的發佈訂閱機制保證了一級緩存的一致性。
另外,如果更加嚴謹一些的話,其實還應該處理一下緩存更新失敗的情況,這裡留個坑以後再填。簡單說一下思路,我們應該在代碼中捕獲緩存更新失敗的異常,然後刪除二級緩存、本機以及其他主機的一級緩存,再等待下一次訪問時直接拉取最新的數據進行緩存。同樣,要想實現緩存失效同時作用於所有單機節點的本地緩存這一功能,也可以使用上面的發佈訂閱來實現。
總結
好了,這次縫縫補補的填坑之旅到這裡就要結束了。可以看到使用基於JSR107
規範的spring接口進行修改後,代碼看起來舒服了很多,並且支持直接使用spring的@Cache
相關註解。如果想在項目中使用的話,自己封裝一個簡單的starter
就可以了,使用起來也非常簡單。
那麼,這次的分享就到這裡,我是Hydra,下篇文章再見。
本文及上一篇文章的示例代碼已合併上傳到了Hydra的Github上,公眾號【碼農參上】後台回復緩存獲取鏈接,本文代碼在項目的v4 module中,歡迎小夥伴們來給個star啊~
作者簡介,碼農參上,一個熱愛分享的公眾號,有趣、深入、直接,與你聊聊技術。個人微信DrHydra9,歡迎添加好友,進一步交流。