Redis实现缓存与分布式锁

  • 2020 年 10 月 22 日
  • 筆記

缓存与分布式锁

哪些数据适合放入缓存

  • 即时性、数据一致性要求不高的
  • 访问量大且更新频率不高的数据

选择redis做为缓存中间件

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

问题记录与分析

产生堆外内存溢出:OutOfDirectMemoryError

  1. springboot2.0 以后默认使用lettuce作为操作redis的客户端,它使用netty进行网络通信。
  2. lettuce的bug导致netty堆外内存溢出

解决方案: 切换到jedis(或者升级lettuce)

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
  <exclusions>
    <exclusion>
      <groupId>io.lettuce</groupId>
      <artifactId>lettuce-core</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
</dependency>

高并发下缓存失效问题-缓存穿透

缓存穿透:

指查询一个一定不存在的数据,由于缓存是不命中,将要去查询数据库,但是数据库也没有该记录,我们将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义

风险:

利用不存在数据进行攻击,数据库瞬时压力增大,最终导致崩溃

解决:

null结果缓存,并加入短暂过期时间

高并发下缓存失效问题-缓存雪崩

缓存雪崩:

缓存雪崩是指我们设置缓存时key采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。

解决:

原有的失效时间基础上增加一个随机值,比如1-5min随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

高并发下缓存失效问题-缓存击穿

缓存击穿:

  • 对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常”热点“的数据。
  • 如果这个key在大量请求同时进来前正好失效,那么所有key的数据查询都落到db,我们称之为缓存击穿。

解决:

加锁,大量并发只让一个请求去查,其他请求等待,查到以后释放锁,其他请求获取到锁,先查缓存,就会有数据,不用去db。

加锁实践:

springboot所有的组件在容器中都是单例的,可以使用synchronized(this),JUC(Lock)等解决单体应用中的问题,但是分布式系统中,要想锁住所有数据,就必须使用分布式锁

通过分析 分布式锁必须保证加锁(占位+过期时间)和删除锁(判断+删除)的原子性。

加锁可以使用redis setnx ex命令来操作,但是删除锁的时候 ,要先判断再删除,想把这两步操作做成原则性的,需要采用redis+lusj脚本的方式来操作。

public Map<String, List<Catalog2Vo>> getCatalogJsonWithRedisLock() {
        // 1. 占分布式锁
        String uuid = UUID.randomUUID().toString();
        // 设置锁和设置过期时间必须是原子性的 不能通过redis的两条命令设置,这里的命令等价于redis命令setnx ex
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
        if (lock) {
            log.info("获取分布式锁成功 ...");
            Map<String, List<Catalog2Vo>> dataFromDb;
            try {
                dataFromDb = getDataFromDb();
            } finally {
                // 删除锁 必须判断是当前锁 再删除,所以,为了保证原子性操作 需要采取redis+Lua脚本 完成
                String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
                // 执行脚本
                redisTemplate.execute(new DefaultRedisScript<>(luaScript, Long.class),
                        Collections.singletonList("lock"), uuid);
            }
            return dataFromDb;
        } else {
            // 加锁失败
            log.info("加锁失败,获取分布式锁 等待重试");

            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return getCatalogJsonWithRedisLock();
        }
    }

Redisson

配置单个redis

@Configuration
public class MyRedissonConfig {
    @Bean(destroyMethod = "shutdown")
    public RedissonClient redisson() throws IOException {
        // 1. 创建配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        // 2. 根据配置创建出Redisson实例
        return Redisson.create(config);
    }
}

测试:

@ResponseBody
    @GetMapping("/hello")
    public String hello() {
        // 1. 获取一把锁 ,只要锁的名字一样,就是同一把锁
        RLock lock = redisson.getLock("my-lock");

        // 2. 加锁
        lock.lock(); // 阻塞式等待
        // 锁的自动续期:如果业务超长,运行期间自动给锁续上新的30s 不用担心业务时间长,锁自动过期被删掉
        // 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除
        try {
            System.out.println("加锁成功 执行业务 ..." + Thread.currentThread().getId());
            Thread.sleep(30000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 3. 解锁
            System.out.println("释放锁 ..." + Thread.currentThread().getId());
            lock.unlock();
        }
        return "hello";
    }

lock.lock()

  • 如果我们传递了时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
  • 如果我们未指定锁的超时时间,就使用30*1000 (看门狗的默认时间:LockWatchdogTimeout),只要占锁成功,就会启动一个定时任务:重新给锁设置过期时间,新的过期时间就是看门狗的默认时间。这个定时任务执行间隔(internalLockLeaseTime)为: (看门狗时间/3)

源代码:

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
  if (leaseTime != -1) {
    return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
  }
  RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
  ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
    if (e != null) {
      return;
    }

    // lock acquired
    if (ttlRemaining == null) {
      scheduleExpirationRenewal(threadId);
    }
  });
  return ttlRemainingFuture;
}

传递时间时执行的方法:执行lua脚本

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

定时任务来做续期

private void renewExpiration() {
  ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  if (ee == null) {
    return;
  }

  Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
    @Override
    public void run(Timeout timeout) throws Exception {
      ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
      if (ent == null) {
        return;
      }
      Long threadId = ent.getFirstThreadId();
      if (threadId == null) {
        return;
      }

      RFuture<Boolean> future = renewExpirationAsync(threadId);
      future.onComplete((res, e) -> {
        if (e != null) {
          log.error("Can't update lock " + getName() + " expiration", e);
          return;
        }

        if (res) {
          // reschedule itself
          renewExpiration();
        }
      });
    }
  }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

  ee.setTimeout(task);
}

最佳实践是传时间,省去了整个续期的操作,给定合理的过期时间即可。

读写锁测试:

/**
 * 测试读写锁 - 写
 */
@GetMapping("/write")
@ResponseBody
public String writeValue() {
  RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
  RLock rLock = lock.writeLock();
  String s = "";
  try {
    rLock.lock();
    s = UUID.randomUUID().toString();
    // 模拟业务时长
    Thread.sleep(30000);
    redisTemplate.opsForValue().set("writeValue", s);
  } catch (InterruptedException e) {
    e.printStackTrace();
  } finally {
    rLock.unlock();
  }
  return s;
}

/**
  * 测试读写锁 - 读
  * <p>
  * 保证一定能读到最新的数据,修改期间,写锁是一个排他锁(互斥锁),读锁是一个共享锁
  * 写锁没释放 读就必须等待
  */
@GetMapping("/read")
@ResponseBody
public String readValue() {
  RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
  RLock rLock = lock.readLock();
  rLock.lock();
  String writeValue = "";
  try {
    writeValue = redisTemplate.opsForValue().get("writeValue");
  } catch (Exception e) {
    e.printStackTrace();
  } finally {
    rLock.unlock();
  }
  return writeValue;
}

总结:

  • 读+读 :相当于无锁,并发读,只会在redis中记录好,所有当前的读锁,他们都会同时加锁成功
  • 写+读 :等待写锁释放
  • 写+写 :阻塞方式
  • 读+写 :有读锁,写也需要等待

即只要有写的存在,都必须等待

信号量测试:

/**
  * 测试信号量
  * <p>
  * 模拟车库停车
  * 车位 3  测试的时候先在redis中先设置当前车位数  set park 3
  */
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
  RSemaphore semaphore = redisson.getSemaphore("park");
  // acquire()是阻塞的,当没有车位时会一直等到有释放时才返回
  // 如果不想阻塞 可以使用 tryAcquire() 会返回一个布尔值
  semaphore.acquire(); // 获取一个信号,获取一个值,即占一个车位
  return "ok";
}

/**
  * 出库
  */
@GetMapping("/go")
@ResponseBody
public String go() throws InterruptedException {
  RSemaphore semaphore = redisson.getSemaphore("park");
  semaphore.release(); // 释放一个信号,即空出一个车位
  return "ok";
}

闭锁测试:

/**
  * 测试闭锁
  * <p>
  * 模拟学校关闭大门 只要当5个班级人都走完了 才可以关闭大门
  */
@GetMapping("/lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
  RCountDownLatch door = redisson.getCountDownLatch("door");
  door.trySetCount(5);
  door.await(); //等待闭锁都完成
  return "放假喽!关大门了哈";
}

@GetMapping("/outDoor/{id}")
@ResponseBody
public String outDoor(@PathVariable("id") Long id) {
  // 这里只是模拟 不用考虑真实场景
  RCountDownLatch door = redisson.getCountDownLatch("door");
  door.countDown(); // 计数减一
  return id + "班的人都走完了";
}

缓存数据一致性

  • 双写模式
  • 失效模式

无论是双写模式还是失效模式,都会导致缓存的不一致问题。即多个实例同时更新会出事。怎么办?

  1. 如果是用户纬度数据(订单数据、用户数据),这种并发几率非常小,不用考虑这个问题,缓存数据加

上过期时间,每隔一段时间触发读的主动更新即可

  1. 如果是菜单,商品介绍等基础数据,也可以去使用canal订阅binlog的方式。

  2. 缓存数据+过期时间也足够解决大部分业务对于缓存的要求。

  3. 通过加锁保证并发读写,写写的时候按顺序排好队。读读无所谓。所以适合使用读写锁。(业务不关心

脏数据,允许临时脏数据可忽略);

总结:

  • 我们能放入缓存的数据本就不应该是实时性、一致性要求超高的。所以缓存数据的时候加上过期时间,保

证每天拿到当前最新数据即可。

  • 我们不应该过度设计,增加系统的复杂性

  • 遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。

整合springcache

整合springcache,简化缓存开发

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-cache</artifactId>
</dependency>

指定使用redis来缓存

spring.cache.type=redis

spring提供的几个注解:

  • @Cacheable: Triggers cache population:触发将数据保存到缓存的操作
  • @CacheEvict: Triggers cache eviction:触发将数据从缓存中删除的操作
  • @CachePut: Updates the cache without interfering with the method execution:不影响方法执行更新缓存
  • @Caching: Regroups multiple cache operations to be applied on a method:组合以上多个操作
  • @CacheConfig: Shares some common cache-related settings at class-level:在类级别共享缓存的相同配置

测试:

  1. 开启缓存功能:启动类上加上 @EnableCaching注解
  2. 只需要使用注解就能完成缓存

使用缓存后的默认行为:

  • 如果缓存中有数据,则方法不会调用,即直接返回缓存中的数据
  • key默认自动生成:缓存的名字::SimpleKey []
  • 缓存的value值,默认使用jdk序列化机制,将序列化后的数据存到redis
  • 默认ttl时间:-1,即用不过期

以上默认行为导致的结果与我们实际需求不同,所有我们可以自定义这些配置:

自定义:

  • 指定生成的key : 通过key属性指定,接收一个spEL表达式
  • 指定缓存数据的过期时间:配置文件中配置
  • 将保存的value数据转为json格式

原理:CacheAutoConfiguration -> RedisCacheConfiguration -> 自动配置了RedisCacheManager -> 初始化所有的缓存 -> 每个缓存决定使用什么配置 -> 如果redisCacheConfiguration有就用已有的,没有就使用默认配置

所以,想要改缓存的配置,只需要给容器中注入一个 RedisCacheConfiguration即可

就会应用到当前RedisCacheManager管理的所有缓存分区中;

可以参考源码中默认配置来自己写一个RedisCacheConfiguration:

源码中的默认配置:

 * <dd>{@link org.springframework.data.redis.serializer.StringRedisSerializer}</dd>
 * <dt>value serializer</dt>
 * <dd>{@link org.springframework.data.redis.serializer.JdkSerializationRedisSerializer}</dd>

从这注释中可以看出 k采用的是字符串序列化,v采用的是jdk序列化

自定义配置:

@EnableCaching
@Configuration
@EnableConfigurationProperties(CacheProperties.class)
public class MyRedisCacheConfig {
    @Bean
    RedisCacheConfiguration configuration(CacheProperties cacheProperties) {
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
        config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
        config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer()));
        CacheProperties.Redis redisProperties = cacheProperties.getRedis();
        if (redisProperties.getTimeToLive() != null) {
            config = config.entryTtl(redisProperties.getTimeToLive());
        }
        if (redisProperties.getKeyPrefix() != null) {
            config = config.prefixCacheNameWith(redisProperties.getKeyPrefix());
        }
        if (!redisProperties.isCacheNullValues()) {
            config = config.disableCachingNullValues();
        }
        if (!redisProperties.isUseKeyPrefix()) {
            config = config.disableKeyPrefix();
        }
        return config;
    }
}

redis缓存的其他配置

spring.cache.type=redis
# 单位为毫秒
spring.cache.redis.time-to-live=3600000
# 缓存的key 加前缀, 可以用来区分redis中的值哪些是缓存用的数据
# 如果这里配置了前缀 就代替了默认前缀 之前的默认前缀 缓存名字::
spring.cache.redis.key-prefix=CHCHE_
# 是否使用配置的前缀
spring.cache.redis.use-key-prefix=true
# 是否缓存空值,开启 防止缓存穿透
spring.cache.redis.cache-null-values=true

缓存数据测试:

@Cacheable(value = {"category"}, key = "#root.method.name")
@Override
public List<CategoryEntity> getLevelOne() { ... }

删除缓存测试:

// 分类数据更新的时候 触发删除缓存 指定缓存分区 再指定key  注意这里的key 接收的是一个spEL表达式,如果是普通字符串 需要里面加单引号
@CacheEvict(value = "category", key = "'getLevelOne'")

如果一个要删除多个缓存,就可以使用@Caching,它可以组合其他注解

@Caching(evict = {
  @CacheEvict(value = "category", key = "'getLevelOne'"),
  @CacheEvict(value = "category", key = "'getCatalogJson'")
})

或者可以指定删除某个缓存分区下的所有缓存,这也是使用缓存分区的好处

@CacheEvict(value = "category", allEntries = true)

所以我们约定,存储同一类型的数据 使用同一个缓存分区

且为了方便管理 配置文件中,不知道自定义前缀,就使用默认的 分区名为前缀

Spring-Cache 的不足:

使用@Cacheable时可以指定sync = true解决缓存击穿问题,但是不是分布式锁。