LRU工程實現源碼(一):Redis 記憶體淘汰策略
源碼版本 Redis 6.0.0
記憶體淘汰是什麼?什麼時候記憶體淘汰
我們知道,當某個key被設置了過期時間之後,客戶端每次對該key的訪問(讀寫)都會事先檢測該key是否過期,如果過期就直接刪除;但有一些鍵只訪問一次,因此需要主動刪除,默認情況下redis每秒檢測10次,檢測的對象是所有設置了過期時間的鍵集合,每次從這個集合中隨機檢測20個鍵查看他們是否過期,如果過期就直接刪除。
記憶體淘汰雖然也是刪除key,但是與我們設置的過期key刪除不同。
記憶體淘汰是說,如果redis已使用記憶體達到預設的limit值(你可以當做記憶體滿了),如果想要繼續存儲新的key,那麼必然要淘汰一些key來釋放記憶體空間。具體淘汰哪些key由不同的淘汰策來決定。
由於需要淘汰掉key,所以此時redis不適合用作key-value資料庫了,開啟了記憶體淘汰的redis一般用於快取中,因此很多人也把他叫快取淘汰和快取淘汰策略。
又因為記憶體淘汰的觸發時機是記憶體使用達到閾值,為了存儲新的key然後淘汰舊的key,因此也有人把記憶體淘汰策略叫做快取替換策略
記憶體淘汰策略
Redis 4.0 之前一共實現了 6 種記憶體淘汰策略,在 4.0 之後,又增加了 2 種策略。
我們可以按照是否會進行數據淘汰把它們分成兩類:
- 不進行數據淘汰的策略,只有 noeviction 這一種。
- 會進行淘汰的 7 種其他策略。
會進行淘汰的 7 種策略,我們可以再進一步根據淘汰候選數據集的範圍把它們分成兩類:
- 在設置了過期時間的數據中進行淘汰,包括 volatile-random、volatile-ttl、volatile-lru、volatile-lfu(Redis 4.0 後新增)四種。
- 在所有數據範圍內進行淘汰,包括 allkeys-lru、allkeys-random、allkeys-lfu(Redis 4.0 後新增)三種。我把這 8 種策略的分類,畫到了一張圖裡:
這8種策略的簡單描述都可以在redis.conf
文件中找到
# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
# is reached. You can select one from the following behaviors:
#
# volatile-lru -> Evict using approximated LRU, only keys with an expire set.
# allkeys-lru -> Evict any key using approximated LRU.
# volatile-lfu -> Evict using approximated LFU, only keys with an expire set.
# allkeys-lfu -> Evict any key using approximated LFU.
# volatile-random -> Remove a random key having an expire set.
# allkeys-random -> Remove a random key, any key.
# volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
# noeviction -> Don't evict anything, just return an error on write operations.
翻譯下:
# MAXMEMORY POLICY:當達到最大記憶體時,Redis 將如何選擇要刪除的內容
# 到達了。 您可以從以下行為中選擇一種:
#
# volatile-lru -> Evict 使用近似 LRU,只有設置了過期時間的鍵。
# allkeys-lru -> 使用近似 LRU 驅逐所有鍵。
# volatile-lfu -> 使用近似 LFU 驅逐,只有設置了過期時間的鍵。
# allkeys-lfu -> 使用近似 LFU 驅逐所有鍵。
# volatile-random -> 隨機刪除設置了過期時間的key。
# allkeys-random -> 隨機刪除一個鍵,任何鍵。
# volatile-ttl -> 刪除過期時間最近的key(次TTL)
# noeviction -> 不要驅逐任何東西,只是在寫操作時返回一個錯誤。
通過redis.conf
文件中的內容我們可以知道默認策略是noeviction
# The default is:
#
# maxmemory-policy noeviction
Redis中的LRU淘汰演算法
限於篇幅,本文主要以LRU淘汰演算法展開講解,其他演算法流程大同小異,可在我的redis源碼中文注釋項目中查看
不了解LRU演算法基礎的可以先去百度了解下,很簡單
這裡就不講LRU演算法的基本知識了,我們來看Redis的LRU演算法的原理是怎麼樣的。
其實,LRU 演算法背後的想法非常樸素:它認為剛剛被訪問的數據,肯定還會被再次訪問,所以就把它放在 MRU 端;長久不訪問的數據,肯定就不會再被訪問了,所以就讓它逐漸後移到 LRU 端,在快取滿時,就優先刪除它。
不過,LRU 演算法在實際實現時,需要用鏈表管理所有的快取數據,這會帶來額外的空間開銷。而且,當有數據被訪問時,需要在鏈表上把該數據移動到 MRU 端,如果有大量數據被訪問,就會帶來很多鏈表移動操作,會很耗時,進而會降低 Redis 快取性能。
所以,在 Redis 中,LRU 演算法被做了簡化,以減輕數據淘汰對快取性能的影響。具體來說,Redis 默認會記錄每個數據的最近一次訪問的時間戳(由鍵值對數據結構 RedisObject 中的 lru 欄位記錄)。然後,Redis 在決定淘汰的數據時,第一次會隨機選出 N 個數據,把它們作為一個候選集合。接下來,Redis 會比較這 N 個數據的 lru 欄位,把 lru 欄位值最小的數據從快取中淘汰出去。
Redis 提供了一個配置參數 maxmemory-samples,這個參數就是 Redis 選出的數據個數 N。例如,我們執行如下命令,可以讓 Redis 選出 100 個數據作為候選數據集:
CONFIG SET maxmemory-samples 100
redis.conf
文件中默認配置是5
# The default of 5 produces good enough results. 10 Approximates very closely
# true LRU but costs more CPU. 3 is faster but not very accurate.
#
# maxmemory-samples 5
當需要再次淘汰數據時,Redis 需要挑選數據進入第一次淘汰時創建的候選集合。這兒的挑選標準是:能進入候選集合的數據的 lru 欄位值必須小於候選集合中最小的 lru 值。當有新數據進入候選數據集後,如果候選數據集中的數據個數達到了 maxmemory-samples,Redis 就把候選數據集中 lru 欄位值最小的數據淘汰出去。
這樣一來,Redis 快取不用為所有的數據維護一個大鏈表,也不用在每次數據訪問時都移動鏈表項,提升了快取的性能。
源碼剖析
源碼中文注釋可以在我的 github 上 的 Redis 6.0 中文注釋項目里看到,歡迎訪問
第一步:什麼時候開始淘汰key
配置讀取
struct redisServer {
// 可使用的最大記憶體位元組數
unsigned long long maxmemory; /* Max number of memory bytes to use */
// key 淘汰策略
int maxmemory_policy; /* Policy for key eviction */
// 隨機抽樣精度
int maxmemory_samples; /* Precision of random sampling */
}
檢查時機
執行記憶體淘汰的方法是freeMemoryIfNeeded
,實現在evict.c
中.
注意redis 3.0版本中該方法在
redis.c
中
這個方法主要的作用是檢查記憶體狀態,如果記憶體達到閾值就進行記憶體淘汰,釋放記憶體空間直至到閾值以下。
打開redis源碼可以很輕易搜索到該方法是被freeMemoryIfNeededAndSafe
調用的
/*
* 這是 freeMemoryIfNeeded() 的包裝器,只有在現在有條件安全地執行時才真正調用該函數:
* - 超時條件下不能有腳本。
* - 現在沒有載入數據。
*/
int freeMemoryIfNeededAndSafe(void) {
if (server.lua_timedout || server.loading) return C_OK;
return freeMemoryIfNeeded();
}
那麼這個方法又是被誰調用的呢,有兩個地方
- updateMaxmemory
- processCommand
第一個看名字是在更新Maxmemory時調用的,本文不講,我們主要來看第二個方法
在處理命令的時候,會調用server.c
中的processCommand
方法
/* If this function gets called we already read a whole
* command, arguments are in the client argv/argc fields.
*
* 如果這個函數被調用,我們已經讀取了整個命令,參數在客戶端 argv/argc 欄位中。
*
* processCommand() execute the command or prepare the
* server for a bulk read from the client.
*
* processCommand() 執行命令或準備伺服器以從客戶端進行批量讀取。
*/
int processCommand(client *c) {
// 省略...
// 如果配置了 server.maxmemory 並且 lua腳本沒有在超時條件下運行
if (server.maxmemory && !server.lua_timedout) {
int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
// 省略...
}
// 省略...
}
getMaxmemoryState
緊接著會在freeMemoryIfNeeded
方法中檢查記憶體狀態來判斷是否需要記憶體淘汰
/*
* 根據當前的「maxmemory」設置,定期調用此函數以查看是否有可用記憶體。 如果我們超過記憶體限制,該函數將嘗試釋放一些記憶體以返回低於限制。
*
* 如果我們低於記憶體限制或超過限制但嘗試釋放記憶體成功,該函數將返回 C_OK。
* 否則,如果我們超過了記憶體限制,但沒有足夠的記憶體被釋放以返回低於限制,則該函數返回 C_ERR。
*/
int freeMemoryIfNeeded(void) {
// 默認情況下,從節點應該忽略maxmemory指令,僅僅做從節點該做的事情就好
if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;
// 當客戶端暫停時,數據集應該是靜態的,不僅來自客戶端的 POV 無法寫入,還有來自POV過期和驅逐key也無法執行。
if (clientsArePaused()) return C_OK;
// 檢查記憶體狀態,有沒有超出限制,如果有,會計算需要釋放的記憶體和已使用記憶體
if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
return C_OK;
}
getMaxmemoryState
方法也在evict.c
中:
/* 從maxmemory指令的角度獲取記憶體狀態:
* 如果使用的記憶體低於 maxmemory 設置,則返回 C_OK。
* 否則,如果超過記憶體限制,函數返回
* C_ERR。
*
* 該函數可能會通過引用返回附加資訊,僅當
* 指向相應參數的指針不為 NULL。某些欄位是
* 僅在返回 C_ERR 時填充:
*
* 'total' 使用的總位元組數。
*(為 C_ERR 和 C_OK 填充)
*
* 'logical' 使用的記憶體量減去從/AOF緩衝區。
*(返回 C_ERR 時填充)
*
* 'tofree' 為了回到記憶體限制應該釋放的記憶體量
*
*(返回 C_ERR 時填充)
*
* 'level' 這通常範圍從 0 到 1,並報告數量
* 當前使用的記憶體。如果我們超過了記憶體限制可能會 > 1。
*(為 C_ERR 和 C_OK 填充)
*/
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level)
通過以上源碼,我們能夠知道什麼時候會記憶體淘汰。
下一步我們來看看會淘汰哪些key
第二步:淘汰哪些key
freeMemoryIfNeeded
文章正文中我們提到過記憶體淘汰策略,但是第一步並沒有涉及到具體的策略。那麼不同的策略是在哪裡實現的呢?這是在freeMemoryIfNeeded
方法中判斷的,順便這裡先帶你了解下freeMemoryIfNeeded
方法的骨架
int freeMemoryIfNeeded(void) {
/*
*
* mem_reported 已使用記憶體
* mem_tofree 需要釋放的記憶體
* mem_freed 已釋放記憶體
*/
size_t mem_reported, mem_tofree, mem_freed;
// 檢查記憶體狀態,有沒有超出限制,如果有,會計算需要釋放的記憶體和已使用記憶體
// 這個方法不但會計算記憶體,還會賦值 mem_reported mem_freed
if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
return C_OK;
// 初始化已釋放記憶體的位元組數為 0
mem_freed = 0;
// 不進行記憶體淘汰
if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
goto cant_free; /* We need to free memory, but policy forbids. */
// 根據 maxmemory 策略,
// 遍歷字典,釋放記憶體並記錄被釋放記憶體的位元組數
while (mem_freed < mem_tofree) {
// 最佳淘汰key
sds bestkey = NULL;
// LRU策略或者LFU策略或者VOLATILE_TTL策略
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
{
// 不同的策略找bestKey
}
/* volatile-random and allkeys-random policy */
else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
{
// 不同的策略找bestKey
}
// 最後選定的要刪除的key
if (bestkey) {
// 在這裡刪除key
}
}
}
上面的源碼省略了很多東西,但我相信你已經知道這個方法在做什麼了,也應該知道不同的策略其實就是在找不同的key。我們重點分析LRU策略下的bestKey
選擇。
redis索引
這個時候要去redis資料庫拿東西了,查找的話還是需要索引的,因此我先向你簡單羅列下Redis索引相關的數據結構,很簡單,不做文字表述了
server.h
// Redis 資料庫。 有多個資料庫由從 0(默認資料庫)到最大配置資料庫的整數標識。 資料庫編號是結構中的「id」欄位。
typedef struct redisDb {
/* 資料庫鍵空間,保存著資料庫中的所有鍵值對 */
dict *dict; /* The keyspace for this DB */
// 設置超時時間的key集合
dict *expires; /* Timeout of keys with a timeout set */
// 客戶端等待數據的key (BLPOP)
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
// 收到 PUSH 被阻塞的key
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
// 資料庫的鍵的平均 TTL ,統計資訊
long long avg_ttl; /* Average TTL, just for stats */
//
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
// 嘗試逐一進行碎片整理的鍵名列表。
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;
注意 redisDb 有兩個重要的dict,*dict是主字典,存儲所有的鍵集合, *expires存儲設置了超時時間的集合,後面根據不同的淘汰策略就可以從不同的數據集拿數據
dict.h
/*
* 字典
*/
typedef struct dict {
// 類型特定函數
dictType *type;
// 私有數據
void *privdata;
// 哈希表
dictht ht[2];
// rehash 索引
// 當 rehash 不在進行時,值為 -1
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
// 當前運行的迭代器數量
unsigned long iterators; /* number of iterators currently running */
} dict;
每個dict又存儲了2個dictht,為什麼是兩個呢?用來實現漸進式 rehash
/* 哈希表
* 每個字典都使用兩個哈希表,從而實現漸進式 rehash 。
*/
typedef struct dictht {
// 哈希表數組
dictEntry **table;
// 哈希表大小
unsigned long size;
// 哈希表大小掩碼,用於計算索引值
// 總是等於 size - 1
unsigned long sizemask;
// 該哈希表已有節點的數量
unsigned long used;
} dictht;
到了哈希表了,這個是真正存儲元素的地方了,再來看哈希表元素長什麼樣
/*
* 哈希表節點
*/
typedef struct dictEntry {
// 鍵
void *key;
// 值
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
// 指向下個哈希表節點,形成鏈表
struct dictEntry *next;
} dictEntry;
以上就是redis索引相關的數據結構,我們可以看到redis索引是通過哈希表來實現的。上面說的是索引的存儲,那麼我們放入redis的value存儲在哪呢?
server.h
typedef struct redisObject {
// 類型
unsigned type:4;
// 編碼
unsigned encoding:4;
// 對象最後一次被訪問的時間
unsigned lru:LRU_BITS;
// 引用計數
int refcount;
// 指向實際值的指針
void *ptr;
} robj;
這是我們的值對象數據結構定義,type就是具體的值的數據類型了,lru是對象最後一次被訪問的時間,由於只有24位,無法記錄完整的時間,因此只記錄了unix time的低24位,24 bits數據要溢出的話需要194天,而快取的數據更新非常頻繁,已經足夠了。詳細的關於lru精度的問題可以查看源碼。到這裡你應該對redis的結構有了基本了解。下面在來看如何挑選bestkey更簡單一些。
淘汰池
// 最佳淘汰key
sds bestkey = NULL;
// key所屬db
int bestdbid;
// Redis資料庫
redisDb *db;
// 字典
dict *dict;
// 哈希表節點
dictEntry *de;
// LRU策略或者LFU策略或者VOLATILE_TTL策略
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
{
// 淘汰池
struct evictionPoolEntry *pool = EvictionPoolLRU;
while(bestkey == NULL) {
unsigned long total_keys = 0, keys;
// 從每個資料庫抽樣key填充淘汰池
for (i = 0; i < server.dbnum; i++) {
db = server.db+i;
// db->dict: 資料庫所有key集合
// db->expires: 數據中設置過期時間的key集合
// 判斷淘汰策略是否是針對所有鍵的
dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
db->dict : db->expires;
// 計算字典元素數量,不為0才可以挑選key
if ((keys = dictSize(dict)) != 0) {
// 填充淘汰池,四個參數分別為 dbid,候選集合,主字典集合,淘汰池
// 填充完的淘汰池內部是有序的,按空閑時間升序
evictionPoolPopulate(i, dict, db->dict, pool);
// 已遍歷檢測過的key數量
total_keys += keys;
}
}
// 如果 total_keys = 0,沒有要淘汰的key(redis沒有key或者沒有設置過期時間的key),break
if (!total_keys) break; /* No keys to evict. */
// 遍歷淘汰池,從淘汰池末尾(空閑時間最長)開始向前迭代
for (k = EVPOOL_SIZE-1; k >= 0; k--) {
if (pool[k].key == NULL) continue;
// 獲取當前key所屬的dbid
bestdbid = pool[k].dbid;
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
// 如果淘汰策略針對所有key,從 redisDb.dict 中取數據,redisDb.dict 指向所有的鍵值集合
de = dictFind(server.db[pool[k].dbid].dict,
pool[k].key);
} else { // 如果淘汰策略不是針對所有key,從 redisDb.expires 中取數據,redisDb.expires 指向已過期鍵值集合
de = dictFind(server.db[pool[k].dbid].expires,
pool[k].key);
}
if (pool[k].key != pool[k].cached)
sdsfree(pool[k].key);
// 從池中刪除這個key,不管這個key還在不在
pool[k].key = NULL;
pool[k].idle = 0;
// 如果這個節點存在,就跳出這個循環,否則嘗試下一個元素。
// 這個節點可能已經不存在了,比如到了過期時間被刪除了
if (de) {
// de是key所在哈希表節點,bestkey是 key 名
bestkey = dictGetKey(de);
break;
} else {
/* Ghost... Iterate again. */
}
}
}
}
上面的源碼注釋已經很清晰了,大概有如下幾個步驟:
- 初始化淘汰池
- 遍歷資料庫
- 根據淘汰策略是所有key還是過期key,從而選擇不同的數據集(redisDb.expires or redisDb.dict)
- 調用
evictionPoolPopulate
方法,傳入上一步選擇的數據集,填充淘汰池數據並排好序 - 從淘汰池中選取要淘汰的key(空閑時間最長的key),並且刪除淘汰池中該key的引用。如果該key已經沒了,那麼選取淘汰池中次優的key,直到找到一個還存在的key
核心步驟當然是evictionPoolPopulate
方法,我們來來看看淘汰池的元素結構。
evict.c
// 淘汰池大小
#define EVPOOL_SIZE 16
// 淘汰池快取的最大sds大小
#define EVPOOL_CACHED_SDS_SIZE 255
struct evictionPoolEntry {
// 對象空閑時間
// 這被稱為空閑只是因為程式碼最初處理 LRU,但實際上只是一個分數,其中更高的分數意味著更好的候選者。
unsigned long long idle; /* Object idle time (inverse frequency for LFU) */
sds key; /* Key name. */
// 用來存儲一個sds對象留待復用,注意我們要復用的是sds的記憶體空間,只需關注cached的長度(決定是否可以復用),無需關注他的內容
sds cached; /* Cached SDS object for key name. */
int dbid; /* Key DB number. */
};
淘汰池不只可以給LRU使用,你可以在後面的源碼中看到,LFU以及TTL也會使用淘汰池
知道了淘汰池長什麼樣子我相信下面的程式碼你就好理解了,無非是按idle
欄位升序排序
evict.c
/*
* 這是 freeMemoryIfNeeded() 的輔助函數,用於在每次我們想要key過期時用一些條目填充 evictionPool。
*
* 添加空閑時間小於當前所有key空閑時間的key,如果池是空的則key會一直被添加
*
* 我們按升序將鍵依次插入,因此空閑時間較小的鍵在左側,而空閑時間較長的鍵在右側。
*/
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
int j, k, count;
// 初始化抽樣集合,大小為 server.maxmemory_samples
dictEntry *samples[server.maxmemory_samples];
// 此函數對字典進行取樣以從隨機位置返回一些鍵
count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
for (j = 0; j < count; j++) {
// 這被稱為空閑只是因為程式碼最初處理 LRU,但實際上只是一個分數,其中更高的分數意味著更好的候選者。
unsigned long long idle;
// key
sds key;
// 值對象
robj *o;
// 哈希表節點
dictEntry *de;
de = samples[j];
key = dictGetKey(de);
/* 如果我們取樣的字典不是主字典(而是過期的字典),我們需要在鍵字典中再次查找鍵以獲得值對象。*/
if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
if (sampledict != keydict) de = dictFind(keydict, key);
o = dictGetVal(de);
}
/* 根據策略計算空閑時間。 這被稱為空閑只是因為程式碼最初處理 LRU,但實際上只是一個分數,其中更高的分數意味著更好的候選者。*/
if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
// 給定一個對象,使用近似的 LRU 演算法返回未請求過該對象的最小毫秒數
idle = estimateObjectIdleTime(o);
} else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { // LFU 策略也是用這個池子
idle = 255-LFUDecrAndReturn(o);
} else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
// 在這種情況下,越早過期越好。
idle = ULLONG_MAX - (long)dictGetVal(de);
} else {
serverPanic("Unknown eviction policy in evictionPoolPopulate()");
}
/* 將元素插入池中*/
k = 0;
// 遍歷淘汰池,從左邊開始,找到第一個空桶或者第一個空閑時間大於等於待選元素的桶,k是該元素的坐標
while (k < EVPOOL_SIZE &&
pool[k].key &&
pool[k].idle < idle) k++;
if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
/* 如果元素小於我們擁有的最差元素並且沒有空桶,則無法插入。
*
* key == 0 說明上面的while循環一次也沒有進入
* 要麼第一個元素就是空的,要麼所有已有元素的空閑時間都大於等於待插入元素的空閑時間(待插入元素比已有所有元素都優質)
* 又因為數組最後一個key不為空,因為是從左邊開始插入的,所以排除了第一個元素是空的
*/
continue;
} else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
/* 插入空桶,插入前無需設置 */
} else {
/* 插入中間,現在 k 指向比要插入的元素空閑時間大的第一個元素 */
if (pool[EVPOOL_SIZE-1].key == NULL) {
/* 數組末尾有空桶,將所有元素從 k 向右移動到末尾。*/
/* 覆蓋前保存 SDS */
sds cached = pool[EVPOOL_SIZE-1].cached;
// 注意這裡不設置 pool[k], 只是給 pool[k] 騰位置
memmove(pool+k+1,pool+k,
sizeof(pool[0])*(EVPOOL_SIZE-k-1));
// 轉移 cached (sds對象)
pool[k].cached = cached;
} else {
/* 右邊沒有可用空間? 在 k-1 處插入 */
k--;
/*
* 將k(包含)左側的所有元素向左移動,因此我們丟棄空閑時間較小的元素。
*/
sds cached = pool[0].cached;
if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
memmove(pool,pool+1,sizeof(pool[0])*k);
pool[k].cached = cached;
}
}
/*
* 嘗試重用在池條目中分配的快取 SDS 字元串,因為分配和釋放此對象的成本很高
* 注意真正要復用的sds記憶體空間,避免重新申請記憶體,而不是他的值
*/
int klen = sdslen(key);
// 判斷字元串長度來決定是否復用sds
if (klen > EVPOOL_CACHED_SDS_SIZE) {
// 複製一個新的 sds 字元串並賦值
pool[k].key = sdsdup(key);
} else {
/*
* 記憶體拷貝函數,從數據源拷貝num個位元組的數據到目標數組
*
* destination:指向目標數組的指針
* source:指向數據源的指針
* num:要拷貝的位元組數
*
*/
// 復用sds對象
memcpy(pool[k].cached,key,klen+1);
// 重新設置sds長度
sdssetlen(pool[k].cached,klen);
// 真正設置key
pool[k].key = pool[k].cached;
}
// 設置空閑時間
pool[k].idle = idle;
// 設置key所在db
pool[k].dbid = dbid;
}
}
至此,經過以上步驟,我們能夠得到一個bestkey(LRU策略下即最長空閑時間的key),還有一個有了部分數據的淘汰池。
下一步,我們就要刪除這個key來釋放記憶體空間了。
第三步:如何刪除key
// 不斷循環刪除key,直至釋放足夠的記憶體
while (mem_freed < mem_tofree) {
// 最佳淘汰key
sds bestkey = NULL;
// LRU策略或者LFU策略或者VOLATILE_TTL策略
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
{
// 不同的策略找bestKey
}
// 最後選定的要刪除的key
if (bestkey) {
db = server.db+bestdbid;
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
// 處理過期key到從節點和 AOF 文件
// 當 master 中的 key 過期時,則將此 key 的 DEL 操作發送到所有 slaves 和 AOF 文件(如果啟用)。
propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
// 獲取已使用記憶體
delta = (long long) zmalloc_used_memory();
// 是否開啟lazyfree機制
// lazyfree的原理就是在刪除對象時只是進行邏輯刪除,然後把對象丟給後台,讓後台執行緒去執行真正的destruct,避免由於對象體積過大而造成阻塞。
if (server.lazyfree_lazy_eviction)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
// 計算刪除key後的記憶體變化量
delta -= (long long) zmalloc_used_memory();
// 計算已釋放記憶體
mem_freed += delta;
}
}
第四步:什麼時候停止淘汰key
通過上面的源碼我們能看到while循環條件是 while (mem_freed < mem_tofree)
,而在每一次刪除key的時候,都會累加已釋放key所佔有的記憶體:
// 不斷循環刪除key,直至釋放足夠的記憶體
while (mem_freed < mem_tofree) {
if (bestkey) {
// 獲取已使用記憶體
delta = (long long) zmalloc_used_memory();
// 是否開啟lazyfree機制
// lazyfree的原理就是在刪除對象時只是進行邏輯刪除,然後把對象丟給後台,讓後台執行緒去執行真正的destruct,避免由於對象體積過大而造成阻塞。
if (server.lazyfree_lazy_eviction)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
// 計算刪除key後的記憶體變化量
delta -= (long long) zmalloc_used_memory();
// 計算已釋放記憶體
mem_freed += delta;
}
}
除此之外,如果配置開啟了server.lazyfree_lazy_eviction
(非同步淘汰),那麼每淘汰一些key後還會檢查一下記憶體狀態,如果記憶體已經達到期望了,那麼就可以手動滿足循環終止條件。
// 不斷循環刪除key,直至釋放足夠的記憶體
while (mem_freed < mem_tofree) {
if (bestkey) {
// 獲取已使用記憶體
delta = (long long) zmalloc_used_memory();
// 是否開啟lazyfree機制
// lazyfree的原理就是在刪除對象時只是進行邏輯刪除,然後把對象丟給後台,讓後台執行緒去執行真正的destruct,避免由於對象體積過大而造成阻塞。
if (server.lazyfree_lazy_eviction)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
// 計算刪除key後的記憶體變化量
delta -= (long long) zmalloc_used_memory();
// 計算已釋放記憶體
mem_freed += delta;
/*
* 通常我們的停止條件是能夠釋放固定的、預先計算的記憶體量。
* 然而,當我們在另一個執行緒中刪除對象時,最好不時檢查我們是否已經到達我們的目標記憶體,因為「mem_freed」數量僅在 dbAsyncDelete() 調用中計算,而執行緒可以無時無刻釋放記憶體
*/
if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
/* Let's satisfy our stop condition. */
// 手動滿足停止條件
mem_freed = mem_tofree;
}
}
}
}
最後
限於篇幅,儘可能多的放了源碼,沒有連貫的文字描述,如果你覺得這樣看起來比較累的話,可以去我的Github看看源碼注釋