Redis實現快取與分散式鎖

  • 2020 年 10 月 22 日
  • 筆記

快取與分散式鎖

哪些數據適合放入快取

  • 即時性、數據一致性要求不高的
  • 訪問量大且更新頻率不高的數據

選擇redis做為快取中間件

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

問題記錄與分析

產生堆外記憶體溢出:OutOfDirectMemoryError

  1. springboot2.0 以後默認使用lettuce作為操作redis的客戶端,它使用netty進行網路通訊。
  2. lettuce的bug導致netty堆外記憶體溢出

解決方案: 切換到jedis(或者升級lettuce)

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
  <exclusions>
    <exclusion>
      <groupId>io.lettuce</groupId>
      <artifactId>lettuce-core</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
</dependency>

高並發下快取失效問題-快取穿透

快取穿透:

指查詢一個一定不存在的數據,由於快取是不命中,將要去查詢資料庫,但是資料庫也沒有該記錄,我們將這次查詢的null寫入快取,這將導致這個不存在的數據每次請求都要到存儲層去查詢,失去了快取的意義

風險:

利用不存在數據進行攻擊,資料庫瞬時壓力增大,最終導致崩潰

解決:

null結果快取,並加入短暫過期時間

高並發下快取失效問題-快取雪崩

快取雪崩:

快取雪崩是指我們設置快取時key採用了相同的過期時間,導致快取在某一時刻同時失效,請求全部轉發到DB,DB瞬時壓力過重雪崩。

解決:

原有的失效時間基礎上增加一個隨機值,比如1-5min隨機,這樣每一個快取的過期時間的重複率就會降低,就很難引發集體失效的事件。

高並發下快取失效問題-快取擊穿

快取擊穿:

  • 對於一些設置了過期時間的key,如果這些key可能會在某些時間點被超高並發地訪問,是一種非常」熱點「的數據。
  • 如果這個key在大量請求同時進來前正好失效,那麼所有key的數據查詢都落到db,我們稱之為快取擊穿。

解決:

加鎖,大量並發只讓一個請求去查,其他請求等待,查到以後釋放鎖,其他請求獲取到鎖,先查快取,就會有數據,不用去db。

加鎖實踐:

springboot所有的組件在容器中都是單例的,可以使用synchronized(this),JUC(Lock)等解決單體應用中的問題,但是分散式系統中,要想鎖住所有數據,就必須使用分散式鎖

通過分析 分散式鎖必須保證加鎖(佔位+過期時間)和刪除鎖(判斷+刪除)的原子性。

加鎖可以使用redis setnx ex命令來操作,但是刪除鎖的時候 ,要先判斷再刪除,想把這兩步操作做成原則性的,需要採用redis+lusj腳本的方式來操作。

public Map<String, List<Catalog2Vo>> getCatalogJsonWithRedisLock() {
        // 1. 佔分散式鎖
        String uuid = UUID.randomUUID().toString();
        // 設置鎖和設置過期時間必須是原子性的 不能通過redis的兩條命令設置,這裡的命令等價於redis命令setnx ex
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
        if (lock) {
            log.info("獲取分散式鎖成功 ...");
            Map<String, List<Catalog2Vo>> dataFromDb;
            try {
                dataFromDb = getDataFromDb();
            } finally {
                // 刪除鎖 必須判斷是當前鎖 再刪除,所以,為了保證原子性操作 需要採取redis+Lua腳本 完成
                String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
                // 執行腳本
                redisTemplate.execute(new DefaultRedisScript<>(luaScript, Long.class),
                        Collections.singletonList("lock"), uuid);
            }
            return dataFromDb;
        } else {
            // 加鎖失敗
            log.info("加鎖失敗,獲取分散式鎖 等待重試");

            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return getCatalogJsonWithRedisLock();
        }
    }

Redisson

配置單個redis

@Configuration
public class MyRedissonConfig {
    @Bean(destroyMethod = "shutdown")
    public RedissonClient redisson() throws IOException {
        // 1. 創建配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        // 2. 根據配置創建出Redisson實例
        return Redisson.create(config);
    }
}

測試:

@ResponseBody
    @GetMapping("/hello")
    public String hello() {
        // 1. 獲取一把鎖 ,只要鎖的名字一樣,就是同一把鎖
        RLock lock = redisson.getLock("my-lock");

        // 2. 加鎖
        lock.lock(); // 阻塞式等待
        // 鎖的自動續期:如果業務超長,運行期間自動給鎖續上新的30s 不用擔心業務時間長,鎖自動過期被刪掉
        // 加鎖的業務只要運行完成,就不會給當前鎖續期,即使不手動解鎖,鎖默認在30s以後自動刪除
        try {
            System.out.println("加鎖成功 執行業務 ..." + Thread.currentThread().getId());
            Thread.sleep(30000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 3. 解鎖
            System.out.println("釋放鎖 ..." + Thread.currentThread().getId());
            lock.unlock();
        }
        return "hello";
    }

lock.lock()

  • 如果我們傳遞了時間,就發送給redis執行腳本,進行占鎖,默認超時就是我們指定的時間
  • 如果我們未指定鎖的超時時間,就使用30*1000 (看門狗的默認時間:LockWatchdogTimeout),只要佔鎖成功,就會啟動一個定時任務:重新給鎖設置過期時間,新的過期時間就是看門狗的默認時間。這個定時任務執行間隔(internalLockLeaseTime)為: (看門狗時間/3)

源程式碼:

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
  if (leaseTime != -1) {
    return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
  }
  RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
  ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
    if (e != null) {
      return;
    }

    // lock acquired
    if (ttlRemaining == null) {
      scheduleExpirationRenewal(threadId);
    }
  });
  return ttlRemainingFuture;
}

傳遞時間時執行的方法:執行lua腳本

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

定時任務來做續期

private void renewExpiration() {
  ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  if (ee == null) {
    return;
  }

  Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
    @Override
    public void run(Timeout timeout) throws Exception {
      ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
      if (ent == null) {
        return;
      }
      Long threadId = ent.getFirstThreadId();
      if (threadId == null) {
        return;
      }

      RFuture<Boolean> future = renewExpirationAsync(threadId);
      future.onComplete((res, e) -> {
        if (e != null) {
          log.error("Can't update lock " + getName() + " expiration", e);
          return;
        }

        if (res) {
          // reschedule itself
          renewExpiration();
        }
      });
    }
  }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

  ee.setTimeout(task);
}

最佳實踐是傳時間,省去了整個續期的操作,給定合理的過期時間即可。

讀寫鎖測試:

/**
 * 測試讀寫鎖 - 寫
 */
@GetMapping("/write")
@ResponseBody
public String writeValue() {
  RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
  RLock rLock = lock.writeLock();
  String s = "";
  try {
    rLock.lock();
    s = UUID.randomUUID().toString();
    // 模擬業務時長
    Thread.sleep(30000);
    redisTemplate.opsForValue().set("writeValue", s);
  } catch (InterruptedException e) {
    e.printStackTrace();
  } finally {
    rLock.unlock();
  }
  return s;
}

/**
  * 測試讀寫鎖 - 讀
  * <p>
  * 保證一定能讀到最新的數據,修改期間,寫鎖是一個排他鎖(互斥鎖),讀鎖是一個共享鎖
  * 寫鎖沒釋放 讀就必須等待
  */
@GetMapping("/read")
@ResponseBody
public String readValue() {
  RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
  RLock rLock = lock.readLock();
  rLock.lock();
  String writeValue = "";
  try {
    writeValue = redisTemplate.opsForValue().get("writeValue");
  } catch (Exception e) {
    e.printStackTrace();
  } finally {
    rLock.unlock();
  }
  return writeValue;
}

總結:

  • 讀+讀 :相當於無鎖,並發讀,只會在redis中記錄好,所有當前的讀鎖,他們都會同時加鎖成功
  • 寫+讀 :等待寫鎖釋放
  • 寫+寫 :阻塞方式
  • 讀+寫 :有讀鎖,寫也需要等待

即只要有寫的存在,都必須等待

訊號量測試:

/**
  * 測試訊號量
  * <p>
  * 模擬車庫停車
  * 車位 3  測試的時候先在redis中先設置當前車位數  set park 3
  */
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
  RSemaphore semaphore = redisson.getSemaphore("park");
  // acquire()是阻塞的,當沒有車位時會一直等到有釋放時才返回
  // 如果不想阻塞 可以使用 tryAcquire() 會返回一個布爾值
  semaphore.acquire(); // 獲取一個訊號,獲取一個值,即佔一個車位
  return "ok";
}

/**
  * 出庫
  */
@GetMapping("/go")
@ResponseBody
public String go() throws InterruptedException {
  RSemaphore semaphore = redisson.getSemaphore("park");
  semaphore.release(); // 釋放一個訊號,即空出一個車位
  return "ok";
}

閉鎖測試:

/**
  * 測試閉鎖
  * <p>
  * 模擬學校關閉大門 只要當5個班級人都走完了 才可以關閉大門
  */
@GetMapping("/lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
  RCountDownLatch door = redisson.getCountDownLatch("door");
  door.trySetCount(5);
  door.await(); //等待閉鎖都完成
  return "放假嘍!關大門了哈";
}

@GetMapping("/outDoor/{id}")
@ResponseBody
public String outDoor(@PathVariable("id") Long id) {
  // 這裡只是模擬 不用考慮真實場景
  RCountDownLatch door = redisson.getCountDownLatch("door");
  door.countDown(); // 計數減一
  return id + "班的人都走完了";
}

快取數據一致性

  • 雙寫模式
  • 失效模式

無論是雙寫模式還是失效模式,都會導致快取的不一致問題。即多個實例同時更新會出事。怎麼辦?

  1. 如果是用戶緯度數據(訂單數據、用戶數據),這種並發幾率非常小,不用考慮這個問題,快取數據加

上過期時間,每隔一段時間觸發讀的主動更新即可

  1. 如果是菜單,商品介紹等基礎數據,也可以去使用canal訂閱binlog的方式。

  2. 快取數據+過期時間也足夠解決大部分業務對於快取的要求。

  3. 通過加鎖保證並發讀寫,寫寫的時候按順序排好隊。讀讀無所謂。所以適合使用讀寫鎖。(業務不關心

臟數據,允許臨時臟數據可忽略);

總結:

  • 我們能放入快取的數據本就不應該是實時性、一致性要求超高的。所以快取數據的時候加上過期時間,保

證每天拿到當前最新數據即可。

  • 我們不應該過度設計,增加系統的複雜性

  • 遇到實時性、一致性要求高的數據,就應該查資料庫,即使慢點。

整合springcache

整合springcache,簡化快取開發

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-cache</artifactId>
</dependency>

指定使用redis來快取

spring.cache.type=redis

spring提供的幾個註解:

  • @Cacheable: Triggers cache population:觸發將數據保存到快取的操作
  • @CacheEvict: Triggers cache eviction:觸發將數據從快取中刪除的操作
  • @CachePut: Updates the cache without interfering with the method execution:不影響方法執行更新快取
  • @Caching: Regroups multiple cache operations to be applied on a method:組合以上多個操作
  • @CacheConfig: Shares some common cache-related settings at class-level:在類級別共享快取的相同配置

測試:

  1. 開啟快取功能:啟動類上加上 @EnableCaching註解
  2. 只需要使用註解就能完成快取

使用快取後的默認行為:

  • 如果快取中有數據,則方法不會調用,即直接返回快取中的數據
  • key默認自動生成:快取的名字::SimpleKey []
  • 快取的value值,默認使用jdk序列化機制,將序列化後的數據存到redis
  • 默認ttl時間:-1,即用不過期

以上默認行為導致的結果與我們實際需求不同,所有我們可以自定義這些配置:

自定義:

  • 指定生成的key : 通過key屬性指定,接收一個spEL表達式
  • 指定快取數據的過期時間:配置文件中配置
  • 將保存的value數據轉為json格式

原理:CacheAutoConfiguration -> RedisCacheConfiguration -> 自動配置了RedisCacheManager -> 初始化所有的快取 -> 每個快取決定使用什麼配置 -> 如果redisCacheConfiguration有就用已有的,沒有就使用默認配置

所以,想要改快取的配置,只需要給容器中注入一個 RedisCacheConfiguration即可

就會應用到當前RedisCacheManager管理的所有快取分區中;

可以參考源碼中默認配置來自己寫一個RedisCacheConfiguration:

源碼中的默認配置:

 * <dd>{@link org.springframework.data.redis.serializer.StringRedisSerializer}</dd>
 * <dt>value serializer</dt>
 * <dd>{@link org.springframework.data.redis.serializer.JdkSerializationRedisSerializer}</dd>

從這注釋中可以看出 k採用的是字元串序列化,v採用的是jdk序列化

自定義配置:

@EnableCaching
@Configuration
@EnableConfigurationProperties(CacheProperties.class)
public class MyRedisCacheConfig {
    @Bean
    RedisCacheConfiguration configuration(CacheProperties cacheProperties) {
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
        config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
        config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer()));
        CacheProperties.Redis redisProperties = cacheProperties.getRedis();
        if (redisProperties.getTimeToLive() != null) {
            config = config.entryTtl(redisProperties.getTimeToLive());
        }
        if (redisProperties.getKeyPrefix() != null) {
            config = config.prefixCacheNameWith(redisProperties.getKeyPrefix());
        }
        if (!redisProperties.isCacheNullValues()) {
            config = config.disableCachingNullValues();
        }
        if (!redisProperties.isUseKeyPrefix()) {
            config = config.disableKeyPrefix();
        }
        return config;
    }
}

redis快取的其他配置

spring.cache.type=redis
# 單位為毫秒
spring.cache.redis.time-to-live=3600000
# 快取的key 加前綴, 可以用來區分redis中的值哪些是快取用的數據
# 如果這裡配置了前綴 就代替了默認前綴 之前的默認前綴 快取名字::
spring.cache.redis.key-prefix=CHCHE_
# 是否使用配置的前綴
spring.cache.redis.use-key-prefix=true
# 是否快取空值,開啟 防止快取穿透
spring.cache.redis.cache-null-values=true

快取數據測試:

@Cacheable(value = {"category"}, key = "#root.method.name")
@Override
public List<CategoryEntity> getLevelOne() { ... }

刪除快取測試:

// 分類數據更新的時候 觸發刪除快取 指定快取分區 再指定key  注意這裡的key 接收的是一個spEL表達式,如果是普通字元串 需要裡面加單引號
@CacheEvict(value = "category", key = "'getLevelOne'")

如果一個要刪除多個快取,就可以使用@Caching,它可以組合其他註解

@Caching(evict = {
  @CacheEvict(value = "category", key = "'getLevelOne'"),
  @CacheEvict(value = "category", key = "'getCatalogJson'")
})

或者可以指定刪除某個快取分區下的所有快取,這也是使用快取分區的好處

@CacheEvict(value = "category", allEntries = true)

所以我們約定,存儲同一類型的數據 使用同一個快取分區

且為了方便管理 配置文件中,不知道自定義前綴,就使用默認的 分區名為前綴

Spring-Cache 的不足:

使用@Cacheable時可以指定sync = true解決快取擊穿問題,但是不是分散式鎖。