Redis實現快取與分散式鎖
- 2020 年 10 月 22 日
- 筆記
快取與分散式鎖
哪些數據適合放入快取
- 即時性、數據一致性要求不高的
- 訪問量大且更新頻率不高的數據
選擇redis做為快取中間件
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
問題記錄與分析
產生堆外記憶體溢出:OutOfDirectMemoryError
- springboot2.0 以後默認使用lettuce作為操作redis的客戶端,它使用netty進行網路通訊。
- lettuce的bug導致netty堆外記憶體溢出
解決方案: 切換到jedis(或者升級lettuce)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
高並發下快取失效問題-快取穿透
快取穿透:
指查詢一個一定不存在的數據,由於快取是不命中,將要去查詢資料庫,但是資料庫也沒有該記錄,我們將這次查詢的null寫入快取,這將導致這個不存在的數據每次請求都要到存儲層去查詢,失去了快取的意義
風險:
利用不存在數據進行攻擊,資料庫瞬時壓力增大,最終導致崩潰
解決:
null結果快取,並加入短暫過期時間
高並發下快取失效問題-快取雪崩
快取雪崩:
快取雪崩是指我們設置快取時key採用了相同的過期時間,導致快取在某一時刻同時失效,請求全部轉發到DB,DB瞬時壓力過重雪崩。
解決:
原有的失效時間基礎上增加一個隨機值,比如1-5min隨機,這樣每一個快取的過期時間的重複率就會降低,就很難引發集體失效的事件。
高並發下快取失效問題-快取擊穿
快取擊穿:
- 對於一些設置了過期時間的key,如果這些key可能會在某些時間點被超高並發地訪問,是一種非常」熱點「的數據。
- 如果這個key在大量請求同時進來前正好失效,那麼所有key的數據查詢都落到db,我們稱之為快取擊穿。
解決:
加鎖,大量並發只讓一個請求去查,其他請求等待,查到以後釋放鎖,其他請求獲取到鎖,先查快取,就會有數據,不用去db。
加鎖實踐:
springboot所有的組件在容器中都是單例的,可以使用synchronized(this),JUC(Lock)等解決單體應用中的問題,但是分散式系統中,要想鎖住所有數據,就必須使用分散式鎖
通過分析 分散式鎖必須保證加鎖(佔位+過期時間)和刪除鎖(判斷+刪除)的原子性。
加鎖可以使用redis setnx ex命令來操作,但是刪除鎖的時候 ,要先判斷再刪除,想把這兩步操作做成原則性的,需要採用redis+lusj腳本的方式來操作。
public Map<String, List<Catalog2Vo>> getCatalogJsonWithRedisLock() {
// 1. 佔分散式鎖
String uuid = UUID.randomUUID().toString();
// 設置鎖和設置過期時間必須是原子性的 不能通過redis的兩條命令設置,這裡的命令等價於redis命令setnx ex
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
if (lock) {
log.info("獲取分散式鎖成功 ...");
Map<String, List<Catalog2Vo>> dataFromDb;
try {
dataFromDb = getDataFromDb();
} finally {
// 刪除鎖 必須判斷是當前鎖 再刪除,所以,為了保證原子性操作 需要採取redis+Lua腳本 完成
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 執行腳本
redisTemplate.execute(new DefaultRedisScript<>(luaScript, Long.class),
Collections.singletonList("lock"), uuid);
}
return dataFromDb;
} else {
// 加鎖失敗
log.info("加鎖失敗,獲取分散式鎖 等待重試");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return getCatalogJsonWithRedisLock();
}
}
Redisson
配置單個redis
@Configuration
public class MyRedissonConfig {
@Bean(destroyMethod = "shutdown")
public RedissonClient redisson() throws IOException {
// 1. 創建配置
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
// 2. 根據配置創建出Redisson實例
return Redisson.create(config);
}
}
測試:
@ResponseBody
@GetMapping("/hello")
public String hello() {
// 1. 獲取一把鎖 ,只要鎖的名字一樣,就是同一把鎖
RLock lock = redisson.getLock("my-lock");
// 2. 加鎖
lock.lock(); // 阻塞式等待
// 鎖的自動續期:如果業務超長,運行期間自動給鎖續上新的30s 不用擔心業務時間長,鎖自動過期被刪掉
// 加鎖的業務只要運行完成,就不會給當前鎖續期,即使不手動解鎖,鎖默認在30s以後自動刪除
try {
System.out.println("加鎖成功 執行業務 ..." + Thread.currentThread().getId());
Thread.sleep(30000);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 3. 解鎖
System.out.println("釋放鎖 ..." + Thread.currentThread().getId());
lock.unlock();
}
return "hello";
}
lock.lock()
- 如果我們傳遞了時間,就發送給redis執行腳本,進行占鎖,默認超時就是我們指定的時間
- 如果我們未指定鎖的超時時間,就使用30*1000 (看門狗的默認時間:LockWatchdogTimeout),只要佔鎖成功,就會啟動一個定時任務:重新給鎖設置過期時間,新的過期時間就是看門狗的默認時間。這個定時任務執行間隔(internalLockLeaseTime)為: (看門狗時間/3)
源程式碼:
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
傳遞時間時執行的方法:執行lua腳本
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
定時任務來做續期
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
最佳實踐是傳時間,省去了整個續期的操作,給定合理的過期時間即可。
讀寫鎖測試:
/**
* 測試讀寫鎖 - 寫
*/
@GetMapping("/write")
@ResponseBody
public String writeValue() {
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
RLock rLock = lock.writeLock();
String s = "";
try {
rLock.lock();
s = UUID.randomUUID().toString();
// 模擬業務時長
Thread.sleep(30000);
redisTemplate.opsForValue().set("writeValue", s);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return s;
}
/**
* 測試讀寫鎖 - 讀
* <p>
* 保證一定能讀到最新的數據,修改期間,寫鎖是一個排他鎖(互斥鎖),讀鎖是一個共享鎖
* 寫鎖沒釋放 讀就必須等待
*/
@GetMapping("/read")
@ResponseBody
public String readValue() {
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
RLock rLock = lock.readLock();
rLock.lock();
String writeValue = "";
try {
writeValue = redisTemplate.opsForValue().get("writeValue");
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return writeValue;
}
總結:
- 讀+讀 :相當於無鎖,並發讀,只會在redis中記錄好,所有當前的讀鎖,他們都會同時加鎖成功
- 寫+讀 :等待寫鎖釋放
- 寫+寫 :阻塞方式
- 讀+寫 :有讀鎖,寫也需要等待
即只要有寫的存在,都必須等待
訊號量測試:
/**
* 測試訊號量
* <p>
* 模擬車庫停車
* 車位 3 測試的時候先在redis中先設置當前車位數 set park 3
*/
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
RSemaphore semaphore = redisson.getSemaphore("park");
// acquire()是阻塞的,當沒有車位時會一直等到有釋放時才返回
// 如果不想阻塞 可以使用 tryAcquire() 會返回一個布爾值
semaphore.acquire(); // 獲取一個訊號,獲取一個值,即佔一個車位
return "ok";
}
/**
* 出庫
*/
@GetMapping("/go")
@ResponseBody
public String go() throws InterruptedException {
RSemaphore semaphore = redisson.getSemaphore("park");
semaphore.release(); // 釋放一個訊號,即空出一個車位
return "ok";
}
閉鎖測試:
/**
* 測試閉鎖
* <p>
* 模擬學校關閉大門 只要當5個班級人都走完了 才可以關閉大門
*/
@GetMapping("/lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.trySetCount(5);
door.await(); //等待閉鎖都完成
return "放假嘍!關大門了哈";
}
@GetMapping("/outDoor/{id}")
@ResponseBody
public String outDoor(@PathVariable("id") Long id) {
// 這裡只是模擬 不用考慮真實場景
RCountDownLatch door = redisson.getCountDownLatch("door");
door.countDown(); // 計數減一
return id + "班的人都走完了";
}
快取數據一致性
- 雙寫模式
- 失效模式
無論是雙寫模式還是失效模式,都會導致快取的不一致問題。即多個實例同時更新會出事。怎麼辦?
- 如果是用戶緯度數據(訂單數據、用戶數據),這種並發幾率非常小,不用考慮這個問題,快取數據加
上過期時間,每隔一段時間觸發讀的主動更新即可
-
如果是菜單,商品介紹等基礎數據,也可以去使用canal訂閱binlog的方式。
-
快取數據+過期時間也足夠解決大部分業務對於快取的要求。
-
通過加鎖保證並發讀寫,寫寫的時候按順序排好隊。讀讀無所謂。所以適合使用讀寫鎖。(業務不關心
臟數據,允許臨時臟數據可忽略);
總結:
- 我們能放入快取的數據本就不應該是實時性、一致性要求超高的。所以快取數據的時候加上過期時間,保
證每天拿到當前最新數據即可。
-
我們不應該過度設計,增加系統的複雜性
-
遇到實時性、一致性要求高的數據,就應該查資料庫,即使慢點。
整合springcache
整合springcache,簡化快取開發
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
指定使用redis來快取
spring.cache.type=redis
spring提供的幾個註解:
@Cacheable
: Triggers cache population:觸發將數據保存到快取的操作@CacheEvict
: Triggers cache eviction:觸發將數據從快取中刪除的操作@CachePut
: Updates the cache without interfering with the method execution:不影響方法執行更新快取@Caching
: Regroups multiple cache operations to be applied on a method:組合以上多個操作@CacheConfig
: Shares some common cache-related settings at class-level:在類級別共享快取的相同配置
測試:
- 開啟快取功能:啟動類上加上 @EnableCaching註解
- 只需要使用註解就能完成快取
使用快取後的默認行為:
- 如果快取中有數據,則方法不會調用,即直接返回快取中的數據
- key默認自動生成:快取的名字::SimpleKey []
- 快取的value值,默認使用jdk序列化機制,將序列化後的數據存到redis
- 默認ttl時間:-1,即用不過期
以上默認行為導致的結果與我們實際需求不同,所有我們可以自定義這些配置:
自定義:
- 指定生成的key : 通過key屬性指定,接收一個spEL表達式
- 指定快取數據的過期時間:配置文件中配置
- 將保存的value數據轉為json格式
原理:CacheAutoConfiguration
-> RedisCacheConfiguration
-> 自動配置了RedisCacheManager
-> 初始化所有的快取 -> 每個快取決定使用什麼配置 -> 如果redisCacheConfiguration
有就用已有的,沒有就使用默認配置
所以,想要改快取的配置,只需要給容器中注入一個 RedisCacheConfiguration
即可
就會應用到當前RedisCacheManager
管理的所有快取分區中;
可以參考源碼中默認配置來自己寫一個RedisCacheConfiguration:
源碼中的默認配置:
* <dd>{@link org.springframework.data.redis.serializer.StringRedisSerializer}</dd>
* <dt>value serializer</dt>
* <dd>{@link org.springframework.data.redis.serializer.JdkSerializationRedisSerializer}</dd>
從這注釋中可以看出 k採用的是字元串序列化,v採用的是jdk序列化
自定義配置:
@EnableCaching
@Configuration
@EnableConfigurationProperties(CacheProperties.class)
public class MyRedisCacheConfig {
@Bean
RedisCacheConfiguration configuration(CacheProperties cacheProperties) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer()));
CacheProperties.Redis redisProperties = cacheProperties.getRedis();
if (redisProperties.getTimeToLive() != null) {
config = config.entryTtl(redisProperties.getTimeToLive());
}
if (redisProperties.getKeyPrefix() != null) {
config = config.prefixCacheNameWith(redisProperties.getKeyPrefix());
}
if (!redisProperties.isCacheNullValues()) {
config = config.disableCachingNullValues();
}
if (!redisProperties.isUseKeyPrefix()) {
config = config.disableKeyPrefix();
}
return config;
}
}
redis快取的其他配置
spring.cache.type=redis
# 單位為毫秒
spring.cache.redis.time-to-live=3600000
# 快取的key 加前綴, 可以用來區分redis中的值哪些是快取用的數據
# 如果這裡配置了前綴 就代替了默認前綴 之前的默認前綴 快取名字::
spring.cache.redis.key-prefix=CHCHE_
# 是否使用配置的前綴
spring.cache.redis.use-key-prefix=true
# 是否快取空值,開啟 防止快取穿透
spring.cache.redis.cache-null-values=true
快取數據測試:
@Cacheable(value = {"category"}, key = "#root.method.name")
@Override
public List<CategoryEntity> getLevelOne() { ... }
刪除快取測試:
// 分類數據更新的時候 觸發刪除快取 指定快取分區 再指定key 注意這裡的key 接收的是一個spEL表達式,如果是普通字元串 需要裡面加單引號
@CacheEvict(value = "category", key = "'getLevelOne'")
如果一個要刪除多個快取,就可以使用@Caching,它可以組合其他註解
@Caching(evict = {
@CacheEvict(value = "category", key = "'getLevelOne'"),
@CacheEvict(value = "category", key = "'getCatalogJson'")
})
或者可以指定刪除某個快取分區下的所有快取,這也是使用快取分區的好處
@CacheEvict(value = "category", allEntries = true)
所以我們約定,存儲同一類型的數據 使用同一個快取分區
且為了方便管理 配置文件中,不知道自定義前綴,就使用默認的 分區名為前綴
Spring-Cache 的不足:
使用@Cacheable時可以指定sync = true解決快取擊穿問題,但是不是分散式鎖。