LRU工程實現源碼(一):Redis 記憶體淘汰策略

源碼版本 Redis 6.0.0

記憶體淘汰是什麼?什麼時候記憶體淘汰

我們知道,當某個key被設置了過期時間之後,客戶端每次對該key的訪問(讀寫)都會事先檢測該key是否過期,如果過期就直接刪除;但有一些鍵只訪問一次,因此需要主動刪除,默認情況下redis每秒檢測10次,檢測的對象是所有設置了過期時間的鍵集合,每次從這個集合中隨機檢測20個鍵查看他們是否過期,如果過期就直接刪除。

記憶體淘汰雖然也是刪除key,但是與我們設置的過期key刪除不同。

記憶體淘汰是說,如果redis已使用記憶體達到預設的limit值(你可以當做記憶體滿了),如果想要繼續存儲新的key,那麼必然要淘汰一些key來釋放記憶體空間。具體淘汰哪些key由不同的淘汰策來決定。

由於需要淘汰掉key,所以此時redis不適合用作key-value資料庫了,開啟了記憶體淘汰的redis一般用於快取中,因此很多人也把他叫快取淘汰和快取淘汰策略。

又因為記憶體淘汰的觸發時機是記憶體使用達到閾值,為了存儲新的key然後淘汰舊的key,因此也有人把記憶體淘汰策略叫做快取替換策略

記憶體淘汰策略

Redis 4.0 之前一共實現了 6 種記憶體淘汰策略,在 4.0 之後,又增加了 2 種策略。

我們可以按照是否會進行數據淘汰把它們分成兩類:

  • 不進行數據淘汰的策略,只有 noeviction 這一種。
  • 會進行淘汰的 7 種其他策略。

會進行淘汰的 7 種策略,我們可以再進一步根據淘汰候選數據集的範圍把它們分成兩類:

  • 在設置了過期時間的數據中進行淘汰,包括 volatile-random、volatile-ttl、volatile-lru、volatile-lfu(Redis 4.0 後新增)四種。
  • 在所有數據範圍內進行淘汰,包括 allkeys-lru、allkeys-random、allkeys-lfu(Redis 4.0 後新增)三種。我把這 8 種策略的分類,畫到了一張圖裡:

這8種策略的簡單描述都可以在redis.conf文件中找到

# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
# is reached. You can select one from the following behaviors:
#
# volatile-lru -> Evict using approximated LRU, only keys with an expire set.
# allkeys-lru -> Evict any key using approximated LRU.
# volatile-lfu -> Evict using approximated LFU, only keys with an expire set.
# allkeys-lfu -> Evict any key using approximated LFU.
# volatile-random -> Remove a random key having an expire set.
# allkeys-random -> Remove a random key, any key.
# volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
# noeviction -> Don't evict anything, just return an error on write operations.

翻譯下:

# MAXMEMORY POLICY:當達到最大記憶體時,Redis 將如何選擇要刪除的內容
# 到達了。 您可以從以下行為中選擇一種:
#
# volatile-lru -> Evict 使用近似 LRU,只有設置了過期時間的鍵。
# allkeys-lru -> 使用近似 LRU 驅逐所有鍵。
# volatile-lfu -> 使用近似 LFU 驅逐,只有設置了過期時間的鍵。
# allkeys-lfu -> 使用近似 LFU 驅逐所有鍵。
# volatile-random -> 隨機刪除設置了過期時間的key。
# allkeys-random -> 隨機刪除一個鍵,任何鍵。
# volatile-ttl -> 刪除過期時間最近的key(次TTL)
# noeviction -> 不要驅逐任何東西,只是在寫操作時返回一個錯誤。

通過redis.conf文件中的內容我們可以知道默認策略是noeviction

# The default is:
#
# maxmemory-policy noeviction

Redis中的LRU淘汰演算法

限於篇幅,本文主要以LRU淘汰演算法展開講解,其他演算法流程大同小異,可在我的redis源碼中文注釋項目中查看

不了解LRU演算法基礎的可以先去百度了解下,很簡單

這裡就不講LRU演算法的基本知識了,我們來看Redis的LRU演算法的原理是怎麼樣的。

其實,LRU 演算法背後的想法非常樸素:它認為剛剛被訪問的數據,肯定還會被再次訪問,所以就把它放在 MRU 端;長久不訪問的數據,肯定就不會再被訪問了,所以就讓它逐漸後移到 LRU 端,在快取滿時,就優先刪除它。

不過,LRU 演算法在實際實現時,需要用鏈表管理所有的快取數據,這會帶來額外的空間開銷。而且,當有數據被訪問時,需要在鏈表上把該數據移動到 MRU 端,如果有大量數據被訪問,就會帶來很多鏈表移動操作,會很耗時,進而會降低 Redis 快取性能。

所以,在 Redis 中,LRU 演算法被做了簡化,以減輕數據淘汰對快取性能的影響。具體來說,Redis 默認會記錄每個數據的最近一次訪問的時間戳(由鍵值對數據結構 RedisObject 中的 lru 欄位記錄)。然後,Redis 在決定淘汰的數據時,第一次會隨機選出 N 個數據,把它們作為一個候選集合。接下來,Redis 會比較這 N 個數據的 lru 欄位,把 lru 欄位值最小的數據從快取中淘汰出去。

Redis 提供了一個配置參數 maxmemory-samples,這個參數就是 Redis 選出的數據個數 N。例如,我們執行如下命令,可以讓 Redis 選出 100 個數據作為候選數據集:

CONFIG SET maxmemory-samples 100

redis.conf文件中默認配置是5


# The default of 5 produces good enough results. 10 Approximates very closely
# true LRU but costs more CPU. 3 is faster but not very accurate.
#
# maxmemory-samples 5

當需要再次淘汰數據時,Redis 需要挑選數據進入第一次淘汰時創建的候選集合。這兒的挑選標準是:能進入候選集合的數據的 lru 欄位值必須小於候選集合中最小的 lru 值。當有新數據進入候選數據集後,如果候選數據集中的數據個數達到了 maxmemory-samples,Redis 就把候選數據集中 lru 欄位值最小的數據淘汰出去。

這樣一來,Redis 快取不用為所有的數據維護一個大鏈表,也不用在每次數據訪問時都移動鏈表項,提升了快取的性能。

源碼剖析

源碼中文注釋可以在我的 github 上 的 Redis 6.0 中文注釋項目里看到,歡迎訪問

第一步:什麼時候開始淘汰key

配置讀取
struct redisServer {
    // 可使用的最大記憶體位元組數
    unsigned long long maxmemory;   /* Max number of memory bytes to use */
    // key 淘汰策略
    int maxmemory_policy;           /* Policy for key eviction */
    // 隨機抽樣精度
    int maxmemory_samples;          /* Precision of random sampling */
}
檢查時機

執行記憶體淘汰的方法是freeMemoryIfNeeded,實現在evict.c中.

注意redis 3.0版本中該方法在redis.c

這個方法主要的作用是檢查記憶體狀態,如果記憶體達到閾值就進行記憶體淘汰,釋放記憶體空間直至到閾值以下。

打開redis源碼可以很輕易搜索到該方法是被freeMemoryIfNeededAndSafe調用的


/* 
 * 這是 freeMemoryIfNeeded() 的包裝器,只有在現在有條件安全地執行時才真正調用該函數:
 * - 超時條件下不能有腳本。
 * - 現在沒有載入數據。
 */
int freeMemoryIfNeededAndSafe(void) {
    if (server.lua_timedout || server.loading) return C_OK;
    return freeMemoryIfNeeded();
}

那麼這個方法又是被誰調用的呢,有兩個地方

  • updateMaxmemory
  • processCommand

第一個看名字是在更新Maxmemory時調用的,本文不講,我們主要來看第二個方法

在處理命令的時候,會調用server.c中的processCommand方法

/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * 
 * 如果這個函數被調用,我們已經讀取了整個命令,參數在客戶端 argv/argc 欄位中。
 * 
 * processCommand() execute the command or prepare the
 * server for a bulk read from the client.
 *  
 * processCommand() 執行命令或準備伺服器以從客戶端進行批量讀取。
 */
int processCommand(client *c) {
    // 省略...
    // 如果配置了 server.maxmemory 並且 lua腳本沒有在超時條件下運行
    if (server.maxmemory && !server.lua_timedout) {
        int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
        // 省略...
    }
    // 省略...
}
getMaxmemoryState

緊接著會在freeMemoryIfNeeded方法中檢查記憶體狀態來判斷是否需要記憶體淘汰

/* 
 * 根據當前的「maxmemory」設置,定期調用此函數以查看是否有可用記憶體。 如果我們超過記憶體限制,該函數將嘗試釋放一些記憶體以返回低於限制。
 *
 * 如果我們低於記憶體限制或超過限制但嘗試釋放記憶體成功,該函數將返回 C_OK。
 * 否則,如果我們超過了記憶體限制,但沒有足夠的記憶體被釋放以返回低於限制,則該函數返回 C_ERR。
 */
int freeMemoryIfNeeded(void) {
    // 默認情況下,從節點應該忽略maxmemory指令,僅僅做從節點該做的事情就好
    if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;
    // 當客戶端暫停時,數據集應該是靜態的,不僅來自客戶端的 POV 無法寫入,還有來自POV過期和驅逐key也無法執行。
    if (clientsArePaused()) return C_OK;
    // 檢查記憶體狀態,有沒有超出限制,如果有,會計算需要釋放的記憶體和已使用記憶體
    if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
        return C_OK;
}

getMaxmemoryState 方法也在evict.c中:

/* 從maxmemory指令的角度獲取記憶體狀態:
 * 如果使用的記憶體低於 maxmemory 設置,則返回 C_OK。
 * 否則,如果超過記憶體限制,函數返回
 * C_ERR。
 *
 * 該函數可能會通過引用返回附加資訊,僅當
 * 指向相應參數的指針不為 NULL。某些欄位是
 * 僅在返回 C_ERR 時填充:
 *
 * 'total' 使用的總位元組數。
 *(為 C_ERR 和 C_OK 填充)
 *
 * 'logical' 使用的記憶體量減去從/AOF緩衝區。
 *(返回 C_ERR 時填充)
 *
 * 'tofree' 為了回到記憶體限制應該釋放的記憶體量
 * 
 *(返回 C_ERR 時填充)
 *
 * 'level' 這通常範圍從 0 到 1,並報告數量
 * 當前使用的記憶體。如果我們超過了記憶體限制可能會 > 1。
 *(為 C_ERR 和 C_OK 填充)
 */
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level)

通過以上源碼,我們能夠知道什麼時候會記憶體淘汰。

下一步我們來看看會淘汰哪些key

第二步:淘汰哪些key

freeMemoryIfNeeded

文章正文中我們提到過記憶體淘汰策略,但是第一步並沒有涉及到具體的策略。那麼不同的策略是在哪裡實現的呢?這是在freeMemoryIfNeeded方法中判斷的,順便這裡先帶你了解下freeMemoryIfNeeded方法的骨架


int freeMemoryIfNeeded(void) {
    
    /*
     * 
     * mem_reported 已使用記憶體
     * mem_tofree 需要釋放的記憶體
     * mem_freed 已釋放記憶體
     */
    size_t mem_reported, mem_tofree, mem_freed;
    
    // 檢查記憶體狀態,有沒有超出限制,如果有,會計算需要釋放的記憶體和已使用記憶體
    // 這個方法不但會計算記憶體,還會賦值 mem_reported mem_freed
    if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
        return C_OK;
    // 初始化已釋放記憶體的位元組數為 0
    mem_freed = 0;
    
    // 不進行記憶體淘汰
    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        goto cant_free; /* We need to free memory, but policy forbids. */
        
    // 根據 maxmemory 策略,
    // 遍歷字典,釋放記憶體並記錄被釋放記憶體的位元組數
    while (mem_freed < mem_tofree) {
        // 最佳淘汰key
        sds bestkey = NULL;
        // LRU策略或者LFU策略或者VOLATILE_TTL策略
        if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
            server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
        {
            // 不同的策略找bestKey
        }
        /* volatile-random and allkeys-random policy */
        else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
        {
            // 不同的策略找bestKey
        }
        
        // 最後選定的要刪除的key
        if (bestkey) {
            // 在這裡刪除key
        }
    }
    
}

上面的源碼省略了很多東西,但我相信你已經知道這個方法在做什麼了,也應該知道不同的策略其實就是在找不同的key。我們重點分析LRU策略下的bestKey選擇。

redis索引

這個時候要去redis資料庫拿東西了,查找的話還是需要索引的,因此我先向你簡單羅列下Redis索引相關的數據結構,很簡單,不做文字表述了

server.h

// Redis 資料庫。 有多個資料庫由從 0(默認資料庫)到最大配置資料庫的整數標識。 資料庫編號是結構中的「id」欄位。
typedef struct redisDb {
    /* 資料庫鍵空間,保存著資料庫中的所有鍵值對 */
    dict *dict;               /* The keyspace for this DB */
    // 設置超時時間的key集合
    dict *expires;              /* Timeout of keys with a timeout set */
    // 客戶端等待數據的key (BLPOP)
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
    // 收到 PUSH 被阻塞的key
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    int id;                     /* Database ID */
    // 資料庫的鍵的平均 TTL ,統計資訊
    long long avg_ttl;          /* Average TTL, just for stats */
    // 
    unsigned long expires_cursor; /* Cursor of the active expire cycle. */
    // 嘗試逐一進行碎片整理的鍵名列表。
    list *defrag_later;         /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

注意 redisDb 有兩個重要的dict,*dict是主字典,存儲所有的鍵集合, *expires存儲設置了超時時間的集合,後面根據不同的淘汰策略就可以從不同的數據集拿數據

dict.h

/*
 * 字典
 */
typedef struct dict {
    // 類型特定函數
    dictType *type;
    // 私有數據
    void *privdata;
    // 哈希表
    dictht ht[2];
    // rehash 索引
    // 當 rehash 不在進行時,值為 -1
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    // 當前運行的迭代器數量
    unsigned long iterators; /* number of iterators currently running */
} dict;

每個dict又存儲了2個dictht,為什麼是兩個呢?用來實現漸進式 rehash

/* 哈希表
 * 每個字典都使用兩個哈希表,從而實現漸進式 rehash 。
 */
typedef struct dictht {
    // 哈希表數組
    dictEntry **table;
    // 哈希表大小
    unsigned long size;
    // 哈希表大小掩碼,用於計算索引值
    // 總是等於 size - 1
    unsigned long sizemask;
    // 該哈希表已有節點的數量
    unsigned long used;
} dictht;

到了哈希表了,這個是真正存儲元素的地方了,再來看哈希表元素長什麼樣

/*
 * 哈希表節點
 */
typedef struct dictEntry {
    // 鍵
    void *key;
    // 值
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    // 指向下個哈希表節點,形成鏈表
    struct dictEntry *next;
} dictEntry;

以上就是redis索引相關的數據結構,我們可以看到redis索引是通過哈希表來實現的。上面說的是索引的存儲,那麼我們放入redis的value存儲在哪呢?

server.h

typedef struct redisObject {
    // 類型
    unsigned type:4;

    // 編碼
    unsigned encoding:4;

    // 對象最後一次被訪問的時間
    unsigned lru:LRU_BITS;

    // 引用計數
    int refcount;

    // 指向實際值的指針
    void *ptr;
} robj;

這是我們的值對象數據結構定義,type就是具體的值的數據類型了,lru是對象最後一次被訪問的時間,由於只有24位,無法記錄完整的時間,因此只記錄了unix time的低24位,24 bits數據要溢出的話需要194天,而快取的數據更新非常頻繁,已經足夠了。詳細的關於lru精度的問題可以查看源碼。到這裡你應該對redis的結構有了基本了解。下面在來看如何挑選bestkey更簡單一些。

淘汰池
// 最佳淘汰key
sds bestkey = NULL;
// key所屬db
int bestdbid;
// Redis資料庫
redisDb *db;
// 字典
dict *dict;
// 哈希表節點
dictEntry *de;
// LRU策略或者LFU策略或者VOLATILE_TTL策略
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
    server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
{
    // 淘汰池
    struct evictionPoolEntry *pool = EvictionPoolLRU;

    while(bestkey == NULL) {
        unsigned long total_keys = 0, keys;

        // 從每個資料庫抽樣key填充淘汰池
        for (i = 0; i < server.dbnum; i++) {
            db = server.db+i;
            // db->dict: 資料庫所有key集合
            // db->expires: 數據中設置過期時間的key集合
            // 判斷淘汰策略是否是針對所有鍵的
            dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
                    db->dict : db->expires;
            // 計算字典元素數量,不為0才可以挑選key
            if ((keys = dictSize(dict)) != 0) {
                // 填充淘汰池,四個參數分別為 dbid,候選集合,主字典集合,淘汰池
                // 填充完的淘汰池內部是有序的,按空閑時間升序
                evictionPoolPopulate(i, dict, db->dict, pool);
                // 已遍歷檢測過的key數量
                total_keys += keys;
            }
        }
        // 如果 total_keys = 0,沒有要淘汰的key(redis沒有key或者沒有設置過期時間的key),break
        if (!total_keys) break; /* No keys to evict. */

        // 遍歷淘汰池,從淘汰池末尾(空閑時間最長)開始向前迭代
        for (k = EVPOOL_SIZE-1; k >= 0; k--) {
            if (pool[k].key == NULL) continue;
            // 獲取當前key所屬的dbid
            bestdbid = pool[k].dbid;

            if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                // 如果淘汰策略針對所有key,從 redisDb.dict 中取數據,redisDb.dict 指向所有的鍵值集合
                de = dictFind(server.db[pool[k].dbid].dict,
                    pool[k].key);
            } else { // 如果淘汰策略不是針對所有key,從 redisDb.expires 中取數據,redisDb.expires 指向已過期鍵值集合
                de = dictFind(server.db[pool[k].dbid].expires,
                    pool[k].key);
            }

           
            if (pool[k].key != pool[k].cached)
                sdsfree(pool[k].key);
            // 從池中刪除這個key,不管這個key還在不在
            pool[k].key = NULL;
            pool[k].idle = 0;

            // 如果這個節點存在,就跳出這個循環,否則嘗試下一個元素。
            // 這個節點可能已經不存在了,比如到了過期時間被刪除了
            if (de) {
                // de是key所在哈希表節點,bestkey是 key 名
                bestkey = dictGetKey(de);
                break;
            } else {
                /* Ghost... Iterate again. */
            }
        }
    }
}

上面的源碼注釋已經很清晰了,大概有如下幾個步驟:

  1. 初始化淘汰池
  2. 遍歷資料庫
  3. 根據淘汰策略是所有key還是過期key,從而選擇不同的數據集(redisDb.expires or redisDb.dict)
  4. 調用evictionPoolPopulate方法,傳入上一步選擇的數據集,填充淘汰池數據並排好序
  5. 從淘汰池中選取要淘汰的key(空閑時間最長的key),並且刪除淘汰池中該key的引用。如果該key已經沒了,那麼選取淘汰池中次優的key,直到找到一個還存在的key

核心步驟當然是evictionPoolPopulate方法,我們來來看看淘汰池的元素結構。

evict.c

// 淘汰池大小
#define EVPOOL_SIZE 16
// 淘汰池快取的最大sds大小
#define EVPOOL_CACHED_SDS_SIZE 255

struct evictionPoolEntry {
    // 對象空閑時間
    // 這被稱為空閑只是因為程式碼最初處理 LRU,但實際上只是一個分數,其中更高的分數意味著更好的候選者。
    unsigned long long idle;    /* Object idle time (inverse frequency for LFU) */
    sds key;                    /* Key name. */
    // 用來存儲一個sds對象留待復用,注意我們要復用的是sds的記憶體空間,只需關注cached的長度(決定是否可以復用),無需關注他的內容
    sds cached;                 /* Cached SDS object for key name. */
    int dbid;                   /* Key DB number. */
};

淘汰池不只可以給LRU使用,你可以在後面的源碼中看到,LFU以及TTL也會使用淘汰池

知道了淘汰池長什麼樣子我相信下面的程式碼你就好理解了,無非是按idle欄位升序排序

evict.c

/* 
 * 這是 freeMemoryIfNeeded() 的輔助函數,用於在每次我們想要key過期時用一些條目填充 evictionPool。
 * 
 * 添加空閑時間小於當前所有key空閑時間的key,如果池是空的則key會一直被添加
 * 
 * 我們按升序將鍵依次插入,因此空閑時間較小的鍵在左側,而空閑時間較長的鍵在右側。
 */
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
    int j, k, count;
    // 初始化抽樣集合,大小為 server.maxmemory_samples
    dictEntry *samples[server.maxmemory_samples];
    // 此函數對字典進行取樣以從隨機位置返回一些鍵
    count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
    for (j = 0; j < count; j++) {
        // 這被稱為空閑只是因為程式碼最初處理 LRU,但實際上只是一個分數,其中更高的分數意味著更好的候選者。
        unsigned long long idle;
        // key
        sds key;
        // 值對象
        robj *o;
        // 哈希表節點
        dictEntry *de;

        de = samples[j];
        key = dictGetKey(de);

        /* 如果我們取樣的字典不是主字典(而是過期的字典),我們需要在鍵字典中再次查找鍵以獲得值對象。*/
        if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
            if (sampledict != keydict) de = dictFind(keydict, key);
            o = dictGetVal(de);
        }

        /* 根據策略計算空閑時間。 這被稱為空閑只是因為程式碼最初處理 LRU,但實際上只是一個分數,其中更高的分數意味著更好的候選者。*/
        if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
            // 給定一個對象,使用近似的 LRU 演算法返回未請求過該對象的最小毫秒數
            idle = estimateObjectIdleTime(o);
        } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { // LFU 策略也是用這個池子
            idle = 255-LFUDecrAndReturn(o);
        } else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
            // 在這種情況下,越早過期越好。
            idle = ULLONG_MAX - (long)dictGetVal(de);
        } else {
            serverPanic("Unknown eviction policy in evictionPoolPopulate()");
        }

        /* 將元素插入池中*/
        k = 0;
        // 遍歷淘汰池,從左邊開始,找到第一個空桶或者第一個空閑時間大於等於待選元素的桶,k是該元素的坐標
        while (k < EVPOOL_SIZE &&
               pool[k].key &&
               pool[k].idle < idle) k++;
        if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
            /* 如果元素小於我們擁有的最差元素並且沒有空桶,則無法插入。
             * 
             * key == 0 說明上面的while循環一次也沒有進入
             * 要麼第一個元素就是空的,要麼所有已有元素的空閑時間都大於等於待插入元素的空閑時間(待插入元素比已有所有元素都優質)
             * 又因為數組最後一個key不為空,因為是從左邊開始插入的,所以排除了第一個元素是空的
             */
            continue;
        } else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
            /* 插入空桶,插入前無需設置 */
        } else {
            /* 插入中間,現在 k 指向比要插入的元素空閑時間大的第一個元素 */
            if (pool[EVPOOL_SIZE-1].key == NULL) {
                /* 數組末尾有空桶,將所有元素從 k 向右移動到末尾。*/
                /* 覆蓋前保存 SDS */
                sds cached = pool[EVPOOL_SIZE-1].cached;
                // 注意這裡不設置 pool[k], 只是給 pool[k] 騰位置
                memmove(pool+k+1,pool+k,
                    sizeof(pool[0])*(EVPOOL_SIZE-k-1));
                // 轉移 cached (sds對象)
                pool[k].cached = cached;
            } else {
                
                /* 右邊沒有可用空間? 在 k-1 處插入 */
                k--;
                /*
                 * 將k(包含)左側的所有元素向左移動,因此我們丟棄空閑時間較小的元素。
                 */
                sds cached = pool[0].cached;
                if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
                memmove(pool,pool+1,sizeof(pool[0])*k);
                pool[k].cached = cached;
            }
        }
        /*
         * 嘗試重用在池條目中分配的快取 SDS 字元串,因為分配和釋放此對象的成本很高
         * 注意真正要復用的sds記憶體空間,避免重新申請記憶體,而不是他的值
         */
        int klen = sdslen(key);
        // 判斷字元串長度來決定是否復用sds
        if (klen > EVPOOL_CACHED_SDS_SIZE) {
            // 複製一個新的 sds 字元串並賦值
            pool[k].key = sdsdup(key);
        } else {
            /*
             * 記憶體拷貝函數,從數據源拷貝num個位元組的數據到目標數組
             * 
             * destination:指向目標數組的指針 
             * source:指向數據源的指針 
             * num:要拷貝的位元組數
             *
             */
            // 復用sds對象
            memcpy(pool[k].cached,key,klen+1);
            // 重新設置sds長度
            sdssetlen(pool[k].cached,klen);
            // 真正設置key
            pool[k].key = pool[k].cached;
        }
        // 設置空閑時間
        pool[k].idle = idle;
        // 設置key所在db
        pool[k].dbid = dbid;
    }
}

至此,經過以上步驟,我們能夠得到一個bestkey(LRU策略下即最長空閑時間的key),還有一個有了部分數據的淘汰池。

下一步,我們就要刪除這個key來釋放記憶體空間了。

第三步:如何刪除key

// 不斷循環刪除key,直至釋放足夠的記憶體
while (mem_freed < mem_tofree) {
    // 最佳淘汰key
    sds bestkey = NULL;
    // LRU策略或者LFU策略或者VOLATILE_TTL策略
    if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
        server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
    {
        // 不同的策略找bestKey
    }
    // 最後選定的要刪除的key
    if (bestkey) {
        db = server.db+bestdbid;
        robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
        // 處理過期key到從節點和 AOF 文件
        // 當 master 中的 key 過期時,則將此 key 的 DEL 操作發送到所有 slaves 和 AOF 文件(如果啟用)。
        propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
       
        // 獲取已使用記憶體
        delta = (long long) zmalloc_used_memory();
        
        // 是否開啟lazyfree機制
        // lazyfree的原理就是在刪除對象時只是進行邏輯刪除,然後把對象丟給後台,讓後台執行緒去執行真正的destruct,避免由於對象體積過大而造成阻塞。
        if (server.lazyfree_lazy_eviction)
            dbAsyncDelete(db,keyobj);
        else
            dbSyncDelete(db,keyobj);
        
        // 計算刪除key後的記憶體變化量
        delta -= (long long) zmalloc_used_memory();
        
        // 計算已釋放記憶體
        mem_freed += delta;
        
    }
}

第四步:什麼時候停止淘汰key

通過上面的源碼我們能看到while循環條件是 while (mem_freed < mem_tofree),而在每一次刪除key的時候,都會累加已釋放key所佔有的記憶體:

// 不斷循環刪除key,直至釋放足夠的記憶體
while (mem_freed < mem_tofree) {
    if (bestkey) {
        // 獲取已使用記憶體
        delta = (long long) zmalloc_used_memory();
        
        // 是否開啟lazyfree機制
        // lazyfree的原理就是在刪除對象時只是進行邏輯刪除,然後把對象丟給後台,讓後台執行緒去執行真正的destruct,避免由於對象體積過大而造成阻塞。
        if (server.lazyfree_lazy_eviction)
            dbAsyncDelete(db,keyobj);
        else
            dbSyncDelete(db,keyobj);
        
        // 計算刪除key後的記憶體變化量
        delta -= (long long) zmalloc_used_memory();
        
        // 計算已釋放記憶體
        mem_freed += delta;
        
    }
}

除此之外,如果配置開啟了server.lazyfree_lazy_eviction(非同步淘汰),那麼每淘汰一些key後還會檢查一下記憶體狀態,如果記憶體已經達到期望了,那麼就可以手動滿足循環終止條件。

// 不斷循環刪除key,直至釋放足夠的記憶體
while (mem_freed < mem_tofree) {
    if (bestkey) {
        // 獲取已使用記憶體
        delta = (long long) zmalloc_used_memory();
        
        // 是否開啟lazyfree機制
        // lazyfree的原理就是在刪除對象時只是進行邏輯刪除,然後把對象丟給後台,讓後台執行緒去執行真正的destruct,避免由於對象體積過大而造成阻塞。
        if (server.lazyfree_lazy_eviction)
            dbAsyncDelete(db,keyobj);
        else
            dbSyncDelete(db,keyobj);
        
        // 計算刪除key後的記憶體變化量
        delta -= (long long) zmalloc_used_memory();
        
        // 計算已釋放記憶體
        mem_freed += delta;
        
        /*
         * 通常我們的停止條件是能夠釋放固定的、預先計算的記憶體量。 
         * 然而,當我們在另一個執行緒中刪除對象時,最好不時檢查我們是否已經到達我們的目標記憶體,因為「mem_freed」數量僅在 dbAsyncDelete() 調用中計算,而執行緒可以無時無刻釋放記憶體
         */
        if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
            if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                /* Let's satisfy our stop condition. */
                // 手動滿足停止條件
                mem_freed = mem_tofree;
            }
        }
        
    }
    
}

最後

限於篇幅,儘可能多的放了源碼,沒有連貫的文字描述,如果你覺得這樣看起來比較累的話,可以去我的Github看看源碼注釋

Tags: