Redis入門到實戰

一、Redis基礎

Redis所有的命令都可以去官方網站查看

1.基本命令

keys *

查找所有符合給定模式pattern(正則表達式)的 key 。可以進行模糊匹配

del key1,key2,...

刪除指定的一批keys,如果刪除中的某些key不存在,則直接忽略。被刪除的keys的數量

exists key

返回key是否存在。0:key不存在 1:key存在

EXPIRE key seconds

設置key的過期時間,超過時間後,將會自動刪除該key。

TTL key

返回key剩餘的過期時間。
-1:永不過期
-2:過期或不存在

type key

查看鍵的類型

Redis中的數據都是以”字符串”形式存儲的

2.key的結構

Redis的key允許有多個單詞組成層級結構,多個單詞使用 『:』隔開
例如:項目名:業務名:類型:id

3.String類型

SET key

將鍵key設定為指定的「字符串」值。
如果 key 已經保存了一個值,那麼這個操作會直接覆蓋原來的值,並且忽略原始類型。
當set命令執行成功之後,之前設置的過期時間都將失效

Get key

根據key獲取值value

append key value

將給定的value 追加到原值的末尾

strlen key

獲得值的長度

mset k1 v1 k2 v2 k3 v3....

設置多個鍵值對

mget k1 k2 k3 ......

根據key獲取多個值

incr key

給整數型value加一

incrby key step

給整型的key子這自增並且指定步長,

decr key

給整數型value減一

setnx k1 v1

添加一個鍵值對,不存在則執行,否則不執行

msetnx k1 v1 k2 v2 k3 v3...

同時設置一個或多個 key-value 對,當且僅當所有給定 key 都不存在

setex key sceneds

設置鍵值的同時,設置過期時間,單位秒

getrange key startIndex endIndex

獲得值的範圍,類似java中的substring

setrange key startIndex value

用value 覆寫key所儲存的字符串值,從startIndex開始

getset key value

以新換舊,設置了新值同時獲得舊值

4.List類型

lpush/rpush k1 v1 k2 v2 ...

從左邊/右邊插入一個或多個值

lpop/rpop key

從左邊/右邊吐出一個值

rpoplpush k1 k2

從k1列表右邊吐出一個值,插到k2列表左邊

lrange key startIndex endIndex

按照索引下標獲得元素(從左到右)
查詢所有 0 -1

lindex key index

按照索引下標獲得元素(從左到右)

llen key

獲得列表長度

linsert key before|after value newvalue

在 value 的後面插入 newvalue 插入值

lrem key n value

從左邊刪除n個value(從左到右)

5.set類型

Set是string類型的無序集合。它底層其實是一個value為null的hash表,所以添加,刪除,查找的複雜度都是O(1)

sadd k1 v1 v2 v3....

將一個或多個 member 元素加入到集合 key 當中,已經存在於集合的 member 元素將被忽略

smembers key

取出該集合的所有值

sismember key value

判斷集合 key 是否為含有該 value 值,有返回1,沒有返回0

scard key

返回該集合的元素個數

srem key value1 value2....

刪除集合中的某個元素

spop key

隨機從該集合中吐出一個值

srandmember key n

隨機從該集合中取出n個值。
不會從集合中刪除

SINTER k1 k2

返回兩個集合的交集元素

SUNION k1 k2

返回兩個集合的並集元素

SDIFF k2 k1

返回兩個集合的差集元素

6.hash類型

hash是一個string類型的field和value的映射表,hash特別適合用於存儲對象,類似Java裏面的Map<String,String>

hset key filed value

按照hash形式存儲內容,添加或者修改hash類型的key的filed的值
HSET qbb:user:1 name qiuqiu
HSET qbb:user:1 age 18

hget key filed

獲取一個hash類型的key的filed的值
HGET qbb:user:1 name
HGET qbb:user:1 age

hmset key filed value filed value ...

批量添加多個hash類型的key的filed值
HMSET qbb:user:2 name qq age 18 sex woman

hmget key filed filed filed ...

批量獲取多個hash類型的key1的filed值
HMGET qbb:user:2 name age sex

hgetall key

獲取一個hashkey中的所有filed和value
HGETALL qbb:user:1

hkeys key

獲取hash類型的key中的所有field
hkeys qbb:user:1

hvals key

獲取hash類型的key的所有值value
HVALS qbb:user:1

hexists key field

查看哈希表 key 中,給定域 field 是否存在

hincrby key filed

讓hash類型的key的字段值自動增長指定步長(負數則自減)
HINCRBY qbb:user:1 age 1

hsetnx key filed value

添加一個hash類型的key的filed值,前提是這個filed不存在,否則不執行
HSETNX qbb:user:1 address wuhan

7.zset類型(sorted set)

Redis有序集合zset與普通集合set非常相似,是一個沒有重複元素的字符串集合。不同之處是有序集合的每個成員都關聯了一個評分(score) ,這個評分(score)被用來按照從最低分到最高分的方式排序集合中的成員。集合的成員是唯一的,但是評分可以是重複了 。
因為元素是有序的, 所以你也可以很快的根據評分(score)或者次序(position)來獲取一個範圍的元素。訪問有序集合的中間元素也是非常快的,因此你能夠使用有序集合作為一個沒有重複成員的智能列表

zadd key score1 value1 score2 value2

將一個或多個 member 元素及其 score 值加入到有序集 key 當中

zrange key start stop [WITHSCORES]

返回有序集 key 中,下標在 start stop 之間的元素
帶WITHSCORES,可以讓分數一起和值返回到結果集

zrangebyscore key min max [withscores]

返回有序集 key 中,所有 score 值介於 min 和 max 之間(包括等於 min 或 max )的成員。有序集成員按 score 值遞增(從小到大)次序排列

zrevrangebyscore key max min [withscores]

同上,改為從大到小排列

zrem key value

刪除該集合下,指定值的元素

zcount key min max

統計該集合,分數區間內的元素個數

zrank key value

返回該值在集合中的排名,從0開始

二、Redis實戰

1.基於Redis實現手機驗證碼

@Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public Result sendCode(String phone, HttpSession session) {
        // 1.校驗手機號
        if (RegexUtils.isPhoneInvalid(phone)) {
            // 2.如果不符合,返回錯誤信息
            return Result.fail("手機號格式錯誤!");
        }
        // 3.符合,生成驗證碼
        String code = RandomUtil.randomNumbers(6);

        // 4.保存驗證碼到 session
        stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, LOGIN_CODE_TTL, TimeUnit.MINUTES);

        // 5.發送驗證碼
        log.debug("發送短訊驗證碼成功,驗證碼:{}", code);
        // 返回ok
        return Result.ok();
    }

    @Override
    public Result login(LoginFormDTO loginForm, HttpSession session) {
        // 1.校驗手機號
        String phone = loginForm.getPhone();
        if (RegexUtils.isPhoneInvalid(phone)) {
            // 2.如果不符合,返回錯誤信息
            return Result.fail("手機號格式錯誤!");
        }
        // 3.從redis獲取驗證碼並校驗
        String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
        String code = loginForm.getCode();
        if (cacheCode == null || !cacheCode.equals(code)) {
            // 不一致,報錯
            return Result.fail("驗證碼錯誤");
        }

        // 4.一致,根據手機號查詢用戶 select * from tb_user where phone = ?
        User user = query().eq("phone", phone).one();

        // 5.判斷用戶是否存在
        if (user == null) {
            // 6.不存在,創建新用戶並保存
            user = createUserWithPhone(phone);
        }

        // 7.保存用戶信息到 redis中
        // 7.1.隨機生成token,作為登錄令牌
        String token = UUID.randomUUID().toString(true);
        // 7.2.將User對象轉為HashMap存儲
        UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
        Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
                CopyOptions.create()
                        .setIgnoreNullValue(true)
                        .setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));
        // 7.3.存儲
        String tokenKey = LOGIN_USER_KEY + token;
        stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
        // 7.4.設置token有效期
        stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);

        // 8.返回token
        return Result.ok(token);
    }

2.實現登錄狀態刷新

public class RefreshTokenInterceptor implements HandlerInterceptor {

    private StringRedisTemplate stringRedisTemplate;

    public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 1.獲取請求頭中的token
        String token = request.getHeader("authorization");
        if (StrUtil.isBlank(token)) {
            return true;
        }
        // 2.基於TOKEN獲取redis中的用戶
        String key  = LOGIN_USER_KEY + token;
        Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
        // 3.判斷用戶是否存在
        if (userMap.isEmpty()) {
            return true;
        }
        // 5.將查詢到的hash數據轉為UserDTO
        UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
        // 6.存在,保存用戶信息到 ThreadLocal
        UserHolder.saveUser(userDTO);
        // 7.刷新token有效期
        stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES);
        // 8.放行
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // 移除用戶
        UserHolder.removeUser();
    }
}

3.緩存和數據庫一致性

  • 使用雙寫模式,先寫數據庫,在刪除緩存

4.緩存穿透

  • 緩存null值
  • 布隆過濾器,注意布隆過濾器的誤判,適當時間重建布隆
/**
     * 根據skuId查詢商品詳情
     * <p>
     * 使用Redis實現分佈式鎖:
     * 解決大並發下,緩存擊穿|穿透問題
     *
     * @param skuId
     * @return
     */
    public SkuItemTo findSkuItemWithRedisDistLock(Long skuId) {
        // 緩存key
        String cacheKey = RedisConstants.SKU_CACHE_KEY_PREFIX + skuId;
        // 查詢緩存
        SkuItemTo data = cacheService.getData(RedisConstants.SKU_CACHE_KEY_PREFIX + skuId, new TypeReference<SkuItemTo>() {
        });
        // 判斷是否命中緩存
        if (data == null) {
            // 緩存沒有,回源查詢數據庫.但是這個操作之前先問一下bloom是否需要回源
            if (skuIdBloom.contains(skuId)) {
                // bloom返回true說明數據庫中有
                log.info("緩存沒有,bloom說有,回源");
                SkuItemTo skuItemTo = null;
                // 使用UUID作為鎖的值,防止修改別人的鎖
                String value = UUID.randomUUID().toString();
                // 摒棄setnx ,加鎖個設置過期時間不是原子的
                // 原子加鎖,防止被擊穿 分佈式鎖 設置過期時間
                Boolean ifAbsent = stringRedisTemplate.opsForValue()
                        .setIfAbsent(RedisConstants.LOCK + skuId, value, RedisConstants.LOCK_TIMEOUT, TimeUnit.SECONDS);
                if (ifAbsent) {
                    try {
                        // 設置自動過期時間,非原子的,加鎖和設置過期時間不是原子的操作,所以會出現問題
                        // stringRedisTemplate.expire(RedisConstants.LOCK, RedisConstants.LOCK_TIMEOUT, TimeUnit.SECONDS);

                        // 大量請求,只有一個搶到鎖
                        log.info(Thread.currentThread().getName() + "搶到鎖,查詢數據庫");
                        skuItemTo = findSkuItemDb(skuId); // 執行回源查詢數據庫
                        // 把數據庫中查詢的數據緩存里存一份
                        cacheService.saveData(cacheKey, skuItemTo);
                    } finally { // 解鎖前有可能出現各種問題導致解鎖失敗,從而出現死鎖
                        // 釋放鎖,非原子,不推薦使用
                        // String myLock = stringRedisTemplate.opsForValue().get(RedisConstants.LOCK);

                        //刪鎖: 【對比鎖值+刪除(合起來保證原子性)】
                        String deleteScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
                        Long executeResult = stringRedisTemplate.execute(new DefaultRedisScript<>(deleteScript, Long.class),
                                Arrays.asList(RedisConstants.LOCK + skuId), value);

                        // 判斷是否解鎖成功
                        if (executeResult.longValue() == 1) {
                            log.info("自己的鎖:{},解鎖成功", value);
                            stringRedisTemplate.delete(RedisConstants.LOCK);
                        } else {
                            log.info("別人的鎖,解不了");
                        }
                    }
                } else {
                    // 搶鎖失敗,自旋搶鎖. 但是實際業務為我們只需要讓讓程序緩一秒再去查緩存就好了
                    try {
                        log.info("搶鎖失敗,1秒後去查詢緩存");
                        Thread.sleep(1000);
                        data = cacheService.getData(RedisConstants.SKU_CACHE_KEY_PREFIX + skuId, new TypeReference<SkuItemTo>() {
                        });
                        return data;
                    } catch (InterruptedException e) {
                    }
                }
                return skuItemTo;
            } else {
                log.info("緩存沒有,bloom也說沒有,直接打回");
                return data;
            }
        }
        log.info("緩存中有數據,直接返回,不回源");
        // 價格不緩存,有些需要變的數據,可以"現用現拿"
        Result<BigDecimal> decimalResult = productFeignClient.findPriceBySkuId(skuId);
        if (decimalResult.isOk()) {
            BigDecimal price = decimalResult.getData();
            data.setPrice(price);
        }
        return data;
    }

5.緩存擊穿

  • 利用分佈式鎖,解決緩存擊穿問題
/**
     * 根據skuId查詢商品詳情
     * 使用Redisson框架
     *
     * @param skuId
     * @return
     */
    public SkuItemTo findSkuItemWithRedissonLock(Long skuId) {
        // 1.先查詢緩存
        String cacheKey = RedisConstants.SKU_CACHE_KEY_PREFIX + skuId;
        SkuItemTo data = cacheService.getData(cacheKey, new TypeReference<SkuItemTo>() {
        });
        // 2.判斷是否為null
        if (data == null) {
            // 2.1為null緩存沒有,需要回源
            // 2.2回源之前問一下bloom過濾器,是否有必要回源
            boolean contains = skuIdBloom.contains(skuId);
            if (contains) {
                log.info("bloom說有...準備回源");
                // 2.2.1創建一把鎖
                RLock lock = redissonClient.getLock(RedisConstants.SKU_LOCK + skuId);
                // 2.2.2數據庫中存在對應的ID數據,回源
                boolean tryLock = false;
                try {
                    // 2.2.3回源之前先上鎖
                    tryLock = lock.tryLock();
                    if (tryLock) {
                        log.info(Thread.currentThread().getName() + ":獲取到鎖了");
                        // 加鎖成功
                        // 回源,查詢數據庫是數據
                        SkuItemTo skuItemTo = findSkuItemDb(skuId);
                        // 緩存中存一份
                        cacheService.saveData(cacheKey, skuItemTo);
                        // 返回數據
                        return skuItemTo;
                    }
                } finally {
                    // 解鎖
                    try {
                        if (tryLock) lock.unlock();
                    } catch (Exception e) {
                        log.info("解到別人的鎖了");
                    }
                }
                // 加鎖失敗,睡一秒查緩存
                try {
                    Thread.sleep(1000);
                    data = cacheService.getData(cacheKey, new TypeReference<SkuItemTo>() {
                    });
                    return data;
                } catch (InterruptedException e) {
                }
            } else {
                log.info("bloom打回");
                // 不存在對應的ID數據,不回源
                return null;
            }
        }
        // 緩存不為空,直接返回數據
        return data;
    }

6.緩存雪崩

  • 緩存過期時間加上隨機值
/**
     * 添加數據到緩存
     *
     * @param key
     * @param data
     */
    @Override
    public void saveData(String key, Object data) {
        if (data == null) {
            // 緩存null值,防止緩存穿透.設置緩存過期時間
            stringRedisTemplate.opsForValue().set(key, cacheConfig.getNullValueKey(),
                    cacheConfig.getNullValueTimeout(), cacheConfig.getNullTimeUnit());
        } else {
            // 為了防止緩存同時過期,發生緩存雪崩.給每個緩存過期時間加上隨機值
            Double value = Math.random() * 10000000L;
            long mill = 1000 * 60 * 24 * 3 + value.intValue();
            stringRedisTemplate.opsForValue().set(key, JsonUtils.toStr(data),
                    mill, cacheConfig.getDataTimeUnit());
        }
    }

7.Redis實現全局唯一ID

@Component
public class RedisIdWorker {
    /**
     * 開始時間戳
     */
    private static final long BEGIN_TIMESTAMP = 1640995200L;
    /**
     * 序列號的位數
     */
    private static final int COUNT_BITS = 32;

    private StringRedisTemplate stringRedisTemplate;

    public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    public long nextId(String keyPrefix) {
        // 1.生成時間戳
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond - BEGIN_TIMESTAMP;

        // 2.生成序列號
        // 2.1.獲取當前日期,精確到天
        String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        // 2.2.自增長
        long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);

        // 3.拼接並返回
        return timestamp << COUNT_BITS | count;
    }
}

8.秒殺解決庫存超賣問題

  • 使用樂觀鎖解決 stock > 0
@Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        // 5.一人一單
        Long userId = voucherOrder.getUserId();

        // 5.1.查詢訂單
        int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
        // 5.2.判斷是否存在
        if (count > 0) {
            // 用戶已經購買過了
            log.error("用戶已經購買過一次!");
            return;
        }

        // 6.扣減庫存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1") // set stock = stock - 1
                .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0
                .update();
        if (!success) {
            // 扣減失敗
            log.error("庫存不足!");
            return;
        }

        // 7.創建訂單
        save(voucherOrder);
    }

9.基於BlockingQueue阻塞隊列異步秒殺下單

image

  • 把下單的功能放入阻塞隊列中,實現異步的下單。這樣可以更好的提高吞吐量
/**
     * 創建一個阻塞隊列
     */
    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

    /**
     * 創建一個線程池
     */
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    /**
     * 類初始化完成就執行任務,從隊列中消費消息,也就是創建訂單
     */
    @PostConstruct
    private void init() {
        SECKILL_ORDER_EXECUTOR.execute(new VoucherOrderHandler());
    }

    /**
     * 創建一個任務處理器
     */
    private class VoucherOrderHandler implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    // 1.獲取隊列中的訂單信息
                    VoucherOrder voucherOrder = orderTasks.take();
                    // 2.創建訂單
                    handlerVoucherOrder(voucherOrder);
                } catch (InterruptedException e) {
                    log.error("訂單異常:{}", e);
                }
            }
        }
    }

    private void handlerVoucherOrder(VoucherOrder voucherOrder) {
        // 獲取用戶ID
        Long userId = voucherOrder.getUserId();
        // 創建鎖對象
        RLock redisLock = redissonClient.getLock("order:" + userId);
        // 獲取鎖
        boolean isLock = redisLock.tryLock();
        // 判斷獲取鎖是否成功
        if (!isLock) {
            // 獲取鎖失敗
            log.error("不允許重複下單!!!");
            return;
        }
        try {
            proxy.createVoucherOrder(voucherOrder);
        } finally {
            // 釋放鎖
            redisLock.unlock();
        }
    }
  • 基於JVM阻塞隊列實現異步秒殺會有兩個大的問題
    • 內存限制
    • 數據安全問題,由於是內存操作,所以宕機訂單就丟失了

10.基於Redis的PubSub發佈訂閱模式異步秒殺下單

image
基於PubSub的消息隊列有哪些優缺點?

  • 優點:
    • 採用發佈訂閱模型,支持多生產、多消費
  • 缺點:
    • 不支持數據持久化
    • 無法避免消息丟失
    • 消息堆積有上限,超出時數據丟失

11.基於Redis的Stream數據類型異步秒殺下單(Redis5.0引入)

image

image

image

  • 上面的Stream方式會出現漏讀消息的情況,所以下面使用Stream的ConsumerGroup(消費者組的概念)實現
    image

image

image

  • 代碼實現流程
    image
/**
     * 類初始化完成就執行任務
     */
    @PostConstruct
    private void init() {
        SECKILL_ORDER_EXECUTOR.execute(new VoucherOrderHandler());
    }

    /**
     * 創建一個任務處理器,獲取消息隊列中的消息
     */
    private class VoucherOrderHandler implements Runnable {
        // 隊列名
        String queueName = "stream.orders";

        @Override
        public void run() {
            while (true) {
                try {
                    // 1.獲取消息隊列中的訂單信息
                    List<MapRecord<String, Object, Object>> list = redisTemplate.opsForStream()
                            .read(Consumer.from("g1", "c1"),
                                    StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                                    StreamOffset.create(queueName, ReadOffset.lastConsumed())
                            );
                    // 2.判斷一下消息是否獲取成功
                    if (list == null || list.isEmpty()) {
                        continue;
                    }
                    // 獲取訂單信息
                    MapRecord<String, Object, Object> entries = list.get(0);
                    Map<Object, Object> map = entries.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
                    // 3.創建訂單
                    handlerVoucherOrder(voucherOrder);
                    // 4.ACK確認
                    redisTemplate.opsForStream().acknowledge(queueName, "g1", entries.getId());
                } catch (Exception e) {
                    log.error("訂單異常:{}", e);
                    // 從pending-list中獲取消息
                    handlePendingList();
                }
            }
        }

        /**
         * 處理消費失敗的消息
         */
        private void handlePendingList() {
            while (true) {
                try {
                    // 1.獲取pending-list中的訂單信息
                    List<MapRecord<String, Object, Object>> list = redisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from(")"))
                    );
                    // 2.判斷一下消息是否獲取成功
                    if (list == null || list.isEmpty()) {
                        // pending-list中沒有消息
                        break;
                    }
                    // 獲取訂單信息
                    MapRecord<String, Object, Object> entries = list.get(0);
                    Map<Object, Object> map = entries.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
                    // 3.創建訂單
                    handlerVoucherOrder(voucherOrder);
                    // 4.ACK確認
                    redisTemplate.opsForStream().acknowledge(queueName, "g1", entries.getId());
                } catch (Exception e) {
                    log.error("處理pending-list的訂單異常:{}", e);
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
    }

12.基於Redis實現點贊功能,使用SET數據結構實現

  • 一個用戶只能點贊一次
 /**
     * 修改點贊數量
     *
     * @param id
     * @return
     */
    @Override
    public Result likeBlog(Long id) {
        // 獲取登錄用戶
        UserDTO user = UserHolder.getUser();
        String userId = user.getId().toString();
        // 判斷當前登錄用戶是否點贊過
        String key = "blog:liked:" + id;
        Boolean isLike = redisTemplate.opsForSet().isMember(key, userId);
        // 判斷是否點贊過
        if (BooleanUtil.isFalse(isLike)) {
            // 未點贊
            // 數據庫+1
            boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
            // 將用戶id保存到redis
            if (isSuccess) {
                redisTemplate.opsForSet().add(key, userId);
            }
        } else {
            // 已點贊
            // 數據庫-1
            boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
            // 將用戶id從redis中移除
            if (isSuccess) {
                redisTemplate.opsForSet().remove(key, userId);
            }
        }
        return Result.ok();
    }

13.基於Redis實現點贊排行榜功能,使用SortedSet實現

  • 參考朋友圈點贊
/**
     * 修改點贊數量  sorted set集合
     *
     * @param id
     * @return
     */
    @Override
    public Result likeBlog(Long id) {
        // 獲取登錄用戶
        UserDTO user = UserHolder.getUser();
        String userId = user.getId().toString();
        // 判斷當前登錄用戶是否點贊過
        String key = RedisConstants.BLOG_LIKED_KEY + id;
        Double score = redisTemplate.opsForZSet().score(key, userId);
        // 判斷是否點贊過
        if (score == null) {
            // 未點贊
            // 數據庫+1
            boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
            // 將用戶id保存到redis
            if (isSuccess) {
                redisTemplate.opsForZSet().add(key, userId, System.currentTimeMillis());
            }
        } else {
            // 已點贊
            // 數據庫-1
            boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
            // 將用戶id從redis中移除
            if (isSuccess) {
                redisTemplate.opsForZSet().remove(key, userId);
            }
        }
        return Result.ok();
    }

    /**
     * 查詢blog點贊的人
     *
     * @param id
     * @return
     */
    @Override
    public Result queryBlogLikes(Long id) {
        // 獲取緩存key
        String key = RedisConstants.BLOG_LIKED_KEY + id;
        // 從zset中查詢點贊前5名
        Set<String> top5 = redisTemplate.opsForZSet().range(key, 0, 4);
        if (top5 == null || top5.isEmpty()) {
            return Result.fail("沒有點贊用戶!!!");
        }
        // 解析用戶id
        List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
        // 查詢用戶
        List<User> users = userService.listByIds(ids);
        List<UserDTO> userDTOList = users.stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class))
                .collect(Collectors.toList());
        return Result.ok(userDTOList);
    }

14.基於Redis實現共同關注功能,使用Set實現(SINTER :交集 , SDIFF :差集 ,SUNION :並集)

  • 查看兩個人的共同好友
/**
     * 關注或取關
     *
     * @param followUserId
     * @param isFollow
     * @return
     */
    @Override
    public Result follow(Long followUserId, Boolean isFollow) {
        // 獲取登錄用戶
        Long userId = UserHolder.getUser().getId();
        // 判斷是關注還是取關
        if (isFollow) {
            // 關注
            Follow follow = new Follow();
            follow.setUserId(userId);
            follow.setFollowUserId(followUserId);
            // 保存關注信息到數據庫
            boolean isSuccess = save(follow);
            if (isSuccess) {
                // 把關注的用戶存入Redis
                redisTemplate.opsForSet().add(RedisConstants.FOLLOW_USER_PREFIX + userId, followUserId.toString());
            }
        } else {
            // 取關
            boolean isSuccess = remove(Wrappers.<Follow>lambdaQuery().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId));
            if (isSuccess) {
                // 從Redis中刪除關注的用戶
                redisTemplate.opsForSet().remove(RedisConstants.FOLLOW_USER_PREFIX + userId, followUserId.toString());
            }
        }
        return Result.ok();
    }

    /**
     * 查詢是否關注
     *
     * @param followUserId
     * @return
     */
    @Override
    public Result isFollow(Long followUserId) {
        // 獲取登錄用戶
        Long userId = UserHolder.getUser().getId();
        // 查詢是否關注
        LambdaQueryWrapper<Follow> wrapper = Wrappers.<Follow>lambdaQuery().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId);
        int count = count(wrapper);
        return Result.ok(count > 0);
    }

    /**
     * 查詢共同關注
     *
     * @param id
     * @return
     */
    @Override
    public Result followCommons(Long id) {
        // 獲取登錄用戶
        Long userId = UserHolder.getUser().getId();
        // 當前用戶的好友集合
        String key1 = RedisConstants.FOLLOW_USER_PREFIX + userId;
        // 點擊查看感興趣用戶的好友集合
        String key2 = RedisConstants.FOLLOW_USER_PREFIX + id;
        // 獲取共同好友
        Set<String> set = redisTemplate.opsForSet().intersect(key1, key2);
        if (set == null || set.isEmpty()) {
            return Result.ok(Collections.emptyList());
        }
        List<Long> ids = set.stream().map(Long::valueOf).collect(Collectors.toList());
        List<UserDTO> userDTOList = listByIds(ids)
                .stream()
                .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
                .collect(Collectors.toList());
        return Result.ok(userDTOList);
    }

15.基於Redis實現關注推送功能(Feed流),使用SortedSet實現

  • 在用戶發送文章是,推送給關注了此用戶的好友
  • Feed流有三種實現方式
    • 拉模式 (不推薦)
    • 推模式
    • 推拉結合模式
  • 基於推模式實現關注推送功能
/**
     * 保存blog,並推送給粉絲
     *
     * @param blog
     * @return
     */
    @Override
    public Result saveBlog(Blog blog) {
        // 獲取登錄用戶
        UserDTO user = UserHolder.getUser();
        blog.setUserId(user.getId());
        // 保存探店博文
        boolean isSuccess = save(blog);
        if (isSuccess) {
            return Result.fail("新增Blog失敗!!!");
        }
        // 查詢此用戶的粉絲
        List<Follow> followList = followService.list(Wrappers.<Follow>lambdaQuery().eq(Follow::getFollowUserId, user.getId()));
        // 把此用戶發佈的blog發送給粉絲
        followList.stream().peek(follow -> {
            // 獲取粉絲的ID
            Long userId = follow.getUserId();
            // 推送
            String key = RedisConstants.FOLLOW_FEEF_PREFIX + userId;
            redisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());
        });
        return Result.ok(blog.getId());
    }
  • 使用SortedSet的Score實現Feed流的滾動分頁
/**
     * 好友關注,推送消息
     *
     * @param max
     * @param offset
     * @return
     */
    @Override
    public Result queryBlogOfFollow(Long max, Integer offset) {
        // 獲取當前用戶
        Long userId = UserHolder.getUser().getId();
        // 封裝key
        String key = RedisConstants.FOLLOW_FEEF_PREFIX + userId;
        Set<ZSetOperations.TypedTuple<String>> typedTuples = redisTemplate.opsForZSet()
                .reverseRangeByScoreWithScores(key, 0, max, offset, 3);
        // 非空判斷一下
        if (typedTuples == null || typedTuples.isEmpty()) {
            return Result.ok();
        }
        // 解析數據
        List<Long> ids = new ArrayList<>(typedTuples.size());
        long minTime = 0;
        int os = 1;
        for (ZSetOperations.TypedTuple<String> typedTuple : typedTuples) {
            String idStr = typedTuple.getValue();
            ids.add(Long.valueOf(idStr));
            long time = typedTuple.getScore().longValue();
            if (time == minTime) {
                os++;
            } else {
                minTime = time;
                os = 1;
            }
        }
        // 根據id查詢blog
        String idStr = StrUtil.join(",", ids);
        List<Blog> blogs = query()
                .in("id", ids)
                .last("ORDER BY FIELD(id," + idStr + ")")
                .list();

        blogs.forEach(blog -> {
            // 查詢用戶
            queryBlogUser(blog);
            // 查詢是否點贊
            isBlogLiked(blog);
        });

        // 封裝返回的數據
        ScrollResult scrollResult = new ScrollResult();
        scrollResult.setList(blogs);
        scrollResult.setMinTime(minTime);
        scrollResult.setOffset(offset);
        return Result.ok(scrollResult);
    }

16.基於Redis實現附近、地理坐標功能,使用GEO實現

  • 導入坐標數據到Redis中
/**
     * 導入坐標數據
     */
    @Test
    public void geoTest() {
        // 查詢店鋪信息
        List<Shop> shopList = shopServiceImpl.list();
        // 分組
        Map<Long, List<Shop>> map = shopList.stream().collect(Collectors.groupingBy(Shop::getTypeId));
        // 寫入redis
        for (Map.Entry<Long, List<Shop>> entry : map.entrySet()) {
            Long typeId = entry.getKey();
            List<Shop> value = entry.getValue();
            // 組裝key
            String key = RedisConstants.SHOP_GEO_PREFIX + typeId;
            // 寫redis
           /* for (Shop shop : value) {
                redisTemplate.opsForGeo().add(key, new Point(shop.getX(), shop.getY()), shop.getId().toString());
            }*/

            // 改進寫法,銷量高一點
            List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size());
            for (Shop shop : value) {
                locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(), new Point(shop.getX(), shop.getY())));
            }
            redisTemplate.opsForGeo().add(key, locations);
        }
    }
  • 實現附近商家功能,注意一點Redis的版本≥6.2
<!--修改Redis版本-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
	<exclusions>
		<exclusion>
			<artifactId>lettuce-core</artifactId>
			<groupId>io.lettuce</groupId>
		</exclusion>
		<exclusion>
			<artifactId>spring-data-redis</artifactId>
			<groupId>org.springframework.data</groupId>
		</exclusion>
	</exclusions>
</dependency>

<dependency>
	<groupId>org.springframework.data</groupId>
	<artifactId>spring-data-redis</artifactId>
	<version>2.7.2</version>
</dependency>

<dependency>
	<groupId>io.lettuce</groupId>
	<artifactId>lettuce-core</artifactId>
	<version>6.2.0.RELEASE</version>
</dependency>
  • 核心代碼
/**
     * 根據商鋪類型分頁查詢商鋪信息
     *
     * @param typeId  商鋪類型
     * @param current 頁碼
     * @param x       經度
     * @param y       緯度
     * @return 商鋪列表
     */
    @Override
    public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
        // 判斷是否需要根據坐標查詢
        if (x == null || y == null) {
            Page<Shop> page = query()
                    .eq("type_id", typeId)
                    .page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
            // 返回數據
            return Result.ok(page.getRecords());
        }
        // 分頁參數
        int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;
        int end = current * SystemConstants.DEFAULT_PAGE_SIZE;
        // 查詢Redis,按照距離排序、分頁
        String key = RedisConstants.SHOP_GEO_PREFIX + typeId;
        GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo()
                // 搜索範圍new Distance(5000),5000m
                .search(key,
                        GeoReference.fromCoordinate(x, y),
                        new Distance(5000),
                        RedisGeoCommands
                                .GeoRadiusCommandArgs
                                .newGeoRadiusArgs()
                                .includeDistance()
                                .limit(end));
        if (results == null) {
            return Result.ok(Collections.emptyList());
        }
        // 解析ID
        List<GeoResult<RedisGeoCommands.GeoLocation<String>>> content = results.getContent();
        // 判斷是否還有下一頁
        if (content.size() <= from) {
            return Result.ok(Collections.emptyList());
        }
        // 截取from ~ end的部分
        List<Long> ids = new ArrayList<>(content.size());
        Map<String, Distance> distanceMap = new HashMap<>(content.size());
        content.stream().skip(from).forEach(item -> {
            // 商品ID
            String shopId = item.getContent().getName();
            ids.add(Long.valueOf(shopId));
            // 距離
            Distance distance = item.getDistance();
            distanceMap.put(shopId, distance);
        });
        // 根據ID批量查詢shop
        String idStr = StrUtil.join(",", ids);
        List<Shop> shopList = query()
                .in("id", ids)
                .last("ORDER BY FIELD(id," + idStr + ")")
                .list();
        for (Shop shop : shopList) {
            shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
        }
        return Result.ok(shopList);
    }

17.基於Redis實現簽到功能,使用BitMap實現

  • BitMap基本命令
    image
/**
     * 用戶簽到
     *
     * @return
     */
    @Override
    public Result sign() {
        // 獲取當前用戶
        Long userId = UserHolder.getUser().getId();
        // 獲取日期
        LocalDateTime now = LocalDateTime.now();
        // 拼接key
        String keyPrefix = now.format(DateTimeFormatter.ofPattern("yyyyMM"));
        String key = RedisConstants.USER_SIGN_PREFIX + userId + keyPrefix;
        // 獲取當前日期是本月的第幾天
        int dayOfMonth = now.getDayOfMonth();
        // 寫入redis,簽到
        stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);
        return Result.ok();
    }
  • 統計簽到次數,連續簽到次數
    image
/**
     * 統計連續簽到次數
     *
     * @return
     */
    @Override
    public Result signCount() {
        // 獲取當前用戶
        Long userId = UserHolder.getUser().getId();
        // 獲取日期
        LocalDateTime now = LocalDateTime.now();
        // 拼接key
        String keyPrefix = now.format(DateTimeFormatter.ofPattern("yyyyMM"));
        String key = RedisConstants.USER_SIGN_PREFIX + userId + keyPrefix;
        // 獲取當前日期是本月的第幾天
        int dayOfMonth = now.getDayOfMonth();
        // 獲取本月截止今天為止所有的簽到記錄
        List<Long> result = stringRedisTemplate.opsForValue()
                .bitField(key, BitFieldSubCommands.create()
                        .get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0));
        if (result == null || result.isEmpty()) {
            return Result.ok(0);
        }
        Long num = result.get(0);
        if (num == null || num == 0) {
            return Result.ok(0);
        }
        // 定義一個計數器
        int count = 0;
        // 循環遍歷
        while (true) {
            // 讓這個數字和1做與運算,未簽到,結束
            if ((num & 1) == 0) {
                // 如果為0,說明未簽到,結束
                break;
            } else {
                // 不為0,說明簽到了,計數器加一
                count++;
            }
            // 把數字右移一位,拋棄最後以為bit位,繼續下一位bit位
            num >>>= 1; // 無符號右移一位
        }
        // 返回,簽到計數器
        return Result.ok(count);
    }

18.基於Redis實現UV統計功能,使用HyperLogLog實現

  • UV:全稱Unique Visitor 頁腳獨立訪客量,是指通過互聯網訪問,瀏覽這個網頁的自然人。一天內同一個用戶多次訪問該網站,只記錄一次
  • PV:全稱Page View ,頁腳頁面訪問量或點擊量,用戶每訪問網站的一個頁面,記錄一次PV,用戶多次打開頁面,則記錄多次PV。往往用來衡量網站的流量。

HyperLogLog基本用法

  • 天生唯一性,內存佔用永遠小於16kb
    image
@Test
public void testHyperLogLog() {
	String[] values = new String[1000];
	int j = 0;
	for (int i = 0; i < 1000000; i++) {
		j = i % 1000;
		values[j] = "qiu_" + i;
		if (j == 999) {
			redisTemplate.opsForHyperLogLog().add("hl", values);
		}
	}
	// 統計數量
	Long count = redisTemplate.opsForHyperLogLog().size("hl");
	System.out.println("count = " + count);
}

到這,入門到實戰篇整理完畢,推薦大家參考黑馬程序員虎哥講的Redis,講的很好。後面的高級篇和原理篇後面再整理,每天只能靠下班時間多學習學習了,各位小夥伴加油呀~~~我喜歡的人在很遠的地方,我必須更加努力!!!

Tags: