Redis的批量操作是什麼?怎麼實現的延時隊列?以及訂閱模式、LRU。

前言

這次的內容是我自己為了總結Redis知識而擴充的,上一篇其實已經總結了幾點知識了,但是Redis的強大,以及適用範圍之廣可不是單單一篇博文就能總結清的。所以這次準備繼續總結,因為第一個問題,Redis的批量操作,是我在面試過程中被真實問到的,當時沒答上來,也是因為確實沒了解過Redis的批量操作。

當時的問題,我還記得比較清晰:Redis執行批量操作的功能是什麼?使用場景就是搞促銷活動時,會做預緩存,會往緩存里放大批數據,如果直接放的話那麼會很慢,怎麼能提高效率呢?

Redis的批量操作-管道(pipeline)

首先Redis的管道(pipeline)並不是Redis服務端提供的功能,而是Redis客戶端為了減少網絡交互而提供的一種功能。

正常的一次Redis網絡交互如下:
在這裡插入圖片描述

pipeline主要就是將多個請求合併,進行一次提交給Redis服務器,Redis服務器將所有請求處理完成之後,再一次性返回給客戶端。
在這裡插入圖片描述
下面我們分析一下pipeline的原理
在這裡插入圖片描述
pipeline的一個交互過程是這樣的:

  1. 客戶端進程調用write命令將消息寫入到操作系統內核為套接字分配的發送緩衝區send buffer
  2. 客戶端操作系統通過網絡路由,將send buffer中的數據發送給服務器操作系統為套接字分配的接收緩衝區 receive buffer
  3. 服務端進程調用read命令從receive buffer中取出數據進行處理,然後調用write命令將相應信息寫入到服務端的send buffer中。
  4. 服務端操作系統通過網絡路由,將send buffer中的數據發送給客戶端操作系統的receive buffer
  5. 客戶端進程調用read命令將數據從receive buffer中取出進行業務處理。

在使用pipeline時需要注意:

  • pipeline執行的操作,和mget,mset,hmget這樣的操作不同,pipeline的操作是不具備原子性的。
  • 還有在集群模式下因為數據是被分散在不同的slot裏面的,因此在進行批量操作的時候,不能保證操作的數據都在同一台服務器的slot上,所以集群模式下是禁止執行像mget、mset、pipeline等批量操作的,如果非要使用批量操作,需要自己維護key與slot的關係。
  • pipeline也不能保證批量操作中有命令執行失敗了而中斷,也不能讓下一個指令依賴上一個指令,如果非要這樣的複雜邏輯,建議使用lua腳本來完成操作。

Redis實現消息隊列和延時隊列

消息隊列

Redis的實現消息隊列可以用list來實現,通過lpush與rpop或者rpush與lpop結合來實現消息隊列。
在這裡插入圖片描述
但是若是list為空後,無論是lpop還是rpop都會持續的獲取list中的數據,若list一直為空,持續的拉取數據,一是會增加客戶端的cpu利用率,二是也增高了Redis的QPS,解決方案是使用blpopbrpop來代替lpop或rpop。
其實blpop和brpop的作用是bloking pop,就是阻塞拉取數據,當消息隊列中為空時就會停止拉取,有數據後立即恢復拉取。

但是當沒有數據的時候,阻塞拉取,就會一直阻塞在那裡,時間久了就成了空閑連接,那麼Redis服務器一般會將時間閑置過久的連接直接斷掉,以減少連接資源。所以還要檢測阻塞拉取拋出的異常然後進行重試。

另外一點,就是Redis實現的消息隊列,沒有ACK機制,所以想要實現消息的可靠性,還要自己實現當消息處理失敗後,能繼續拋回隊列。

延時隊列

用Redis實現延時隊列,其實就是使用zset來實現,將消息序列化成一個字符串(可以是json格式),作為為value,消息的到期處理時間做為score,然後用多線程去輪詢zset來獲取到期消息進行處理。

多線程輪詢處理,保證了可用性,但是要做冪等或鎖處理,保證不要重複處理消息。

主要的實現代碼如下。

/**
 * 放入延時隊列
 * @param queueMsg
 */
private void delay(QueueMsg queueMsg){

    String msg = JSON.toJSONString(queueMsg);

    jedis.zadd(queueKey,System.currentTimeMillis()+5000,msg);

}

/**
 * 處理隊列中從消息
 */
private void lpop(){
    while (!Thread.interrupted()){
        // 從隊列中取出,權重為0到當前時間的數據,並且數量只取一個
        Set<String> strings = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
		// 如果消息為空,就歇會兒再取。
        if(strings.isEmpty()){
            try {
            	//休息一會兒
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
                break;
            }
            continue;
        }
        String next = strings.iterator().next();
        // 如果搶到了消息
        if(jedis.zrem(queueKey,next)>0){
            // 反序列化後獲取到消息
            QueueMsg queueMsg = JSON.parseObject(next, QueueMsg.class);
            // 進行消息處理
            handleMsg(queueMsg);
        }
    }
}

訂閱模式

Redis的主題訂閱模式,其實並不想過多總結,因為由於它本身的一些缺點,導致它的應用場景比較窄。

前面總結的用Redis的list實現的消息隊列,雖然可以使用,但是並不支持消息多播的場景,即一個生產者,將消息放入到多個隊列中,然後多個消費者進行消費。
在這裡插入圖片描述
這種消息多播的場景常用來做分佈式系統中的解耦。用哦publish進行生產者發送消息,消費者使用subscribe進行獲取消息。

例如:我向jimoerChannel發送了一條消息 b-tree

127.0.0.1:6379> publish jimoerChannel b-tree
(integer) 1

訂閱這個渠道的消費者立馬收到了一條b-tree的消息。

127.0.0.1:6379> subscribe jimoerChannel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "jimoerChannel"
3) (integer) 1
1) "message"
2) "jimoerChannel"
3) "b-tree"

我前面也說到了,Redis的pub/sub訂閱模式,其實最大的缺點就是,消息不能持久化,這樣就導致,若是消費者掛了或是沒有消費者,那麼消息就會被直接丟棄。因為這個原因,所以導致他的使用場景比較少。

IO模型

Redis的過期策略

Redis的過期策略是適用於所有數據結構的。數據一到過期時間就自動刪除,Redis會將設置了過期時間的key 放置在一個字典表裡。

定期刪除

Redis會定期遍歷字典表裏面數據來刪除過期的Key。
Redis默認的定期刪除策略是每秒進行10次過期掃描,即每100ms掃描一次。並不是掃描全部設置了過期時間的key,而是隨機掃描20個key,刪除掉已經過期的key,如果過期的比率超過25%,那麼就繼續進行掃描。

惰性刪除

因為定期刪除是隨機抽取一些key來進行過期刪除,所以如果key並沒有被定期掃描到,那麼過期的key就不會被刪除。所以Redis還提供了惰性刪除的策略,就是當去查詢某些key的時候,若是key已經過期了,那麼就會刪除key,然後返回null。

另外一點當在集群條件下,主從同步情況中,主節點中的key過期後,會在aof中生成一條刪除指令,然後同步到從節點,這樣的從節點在接收到aof的刪除指令後,刪除掉從節點的key,因為主從同步的時候是異步的所以,短暫的會出現主節點已經沒有數據了,但是從節點還存在。

但是若是定期刪除也沒有掃描到key,而且好長時間也沒去去使用key,那麼這部分過期的key就會一直佔用的內存。
所以Redis又提供了內存淘汰機制。

內存淘汰機制

當Redis的內存出現不足時,就會持續的和磁盤進行交互,這樣就會導致Redis卡頓,效率降低等情況。這在線上是不允許發生的,所以Redis提供了配置參數 maxmemory 來限制內存超出期望大小。

當內存使用情況超過maxmemory的值時,Redis提供了以下幾種策略,來讓使用者通過配置決定該如何騰出內存空間來繼續提供服務。

  • noeviction 不會繼續提供寫請求(del請求可以),讀請求可以,寫請求會報錯,這樣保證的數據不會丟失,但是業務不可用,這是默認的策略。
  • volatile-lru 會將設置了過期時間的key中,淘汰掉最近最少使用的key。沒有設置過期時間的key不會被淘汰,保證了需要持久化的數據不丟。
  • volatile-ttl 嘗試將設置了過期時間的key中,剩餘生命周期越短,越容易被淘汰。
  • volatile-random 嘗試將從設置了過期時間的key中,隨機選擇一些key進行淘汰。
  • allkeys-lru 從所有key中,淘汰掉最近最少使用的key。
  • allkeys-random 從所有key中,隨機淘汰一部分key。

那麼具體設置成哪種淘汰策略呢?
這就是要看在使用Redis時的具體場景了,如果只是用Redis做緩存的話,那麼可以配置allkeys-lru或allkey-random,客戶端在寫緩存的時候並不用攜帶着過期時間。若是還想要用持久化的功能,那麼就應該使用volatile-開頭的策略,這樣可以保證每月設置過期時間的key不會被淘汰。

內存淘汰策略的配置如下:

# 最大使用內存
 maxmemory 5m
# 內存淘汰策略 The default is:noeviction
 maxmemory-policy allkeys-lru

LRU算法

LRU算法的實現,其實可以靠一個鏈表。鏈表按照使用情況來進行排序,當空間不足時,會剔除掉尾部的數據。當某個元素被訪問時它會被移動到鏈表頭。

在真實的面試中,若是讓寫出LRU算法,我認為可以使用Java中的LikedHashMap來實現,因為LikedHashMap已經實現了基本的LRU功能,我只需要封裝一下就改造成了自己的了。

/**
 * @author Jimoer
 * @description
 */
public class MyLRUCache<K,V> {
    // lru容量
    private int lruCapacity;
    // 數據容器(內存)
    private Map<K,V> dataMap;

    public MyLRUCache(int capacity){
        
        this.lruCapacity = capacity;
        // 設置LinkedHashMap的初始容量為LRU的最大容量,
        // 擴容因子為默認的0.75,第三個參數是否將數據按照訪問順序排序。
        dataMap = new LinkedHashMap<K, V>(capacity, 0.75f, true){
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // 當數據量大於lruCapacity時,移除掉最老使用的數據。
                return super.size()>lruCapacity;
            }
        };
    }

    public V get(K k){
        return dataMap.get(k);
    }

    public void put(K key, V value){
        dataMap.put(key,value);
    }

    public int getLruCapacity() {
        return lruCapacity;
    }

    public Map<K, V> getDataMap() {
        return dataMap;
    }

}

測試代碼:

@Test
public void lruTest(){
    // 內存容量為3,即存儲3條數據後,再放入數據,就會將最老使用的數據刪除
    MyLRUCache myLRUCache = new MyLRUCache(3);

    myLRUCache.put("1k","張三");
    myLRUCache.put("2k","李四");
    myLRUCache.put("3k","王五");
    // 容量已滿
    System.out.println("myLRUCache:"+JSON.toJSONString(myLRUCache.getDataMap()));
    // 繼續放入數據,該刪除第一條數據為第四條數據騰出空間了
    myLRUCache.put("4k","趙六");
    // 打印出結果
    System.out.println("myLRUCache:"+JSON.toJSONString(myLRUCache.getDataMap()));
}

運行結果:

myLRUCache:{"1k":"張三","2k":"李四","3k":"王五"}
myLRUCache:{"2k":"李四","3k":"王五","4k":"趙六"}

總結

好了,Redis的相關知識,就總結到這裡了,算上前面兩篇博文(Redis基礎數據結構總結你說一下Redis為什麼快吧,怎麼實現高可用,還有持久化怎麼做的),這是Redis的第三篇了,這一篇博文也是新年的第一篇,元旦假期在家花了兩天時間,自己學習自己總結。元旦假期結束後,我要繼續面試了,後面我會繼續將我面試中遇到的各種問題,總結出來,一是增加自己的知識面,二也將知識進行的傳播。
畢竟獨樂樂不眾樂樂😏。