LRU工程实现源码(一):Redis 内存淘汰策略
源码版本 Redis 6.0.0
内存淘汰是什么?什么时候内存淘汰
我们知道,当某个key被设置了过期时间之后,客户端每次对该key的访问(读写)都会事先检测该key是否过期,如果过期就直接删除;但有一些键只访问一次,因此需要主动删除,默认情况下redis每秒检测10次,检测的对象是所有设置了过期时间的键集合,每次从这个集合中随机检测20个键查看他们是否过期,如果过期就直接删除。
内存淘汰虽然也是删除key,但是与我们设置的过期key删除不同。
内存淘汰是说,如果redis已使用内存达到预设的limit值(你可以当做内存满了),如果想要继续存储新的key,那么必然要淘汰一些key来释放内存空间。具体淘汰哪些key由不同的淘汰策来决定。
由于需要淘汰掉key,所以此时redis不适合用作key-value数据库了,开启了内存淘汰的redis一般用于缓存中,因此很多人也把他叫缓存淘汰和缓存淘汰策略。
又因为内存淘汰的触发时机是内存使用达到阈值,为了存储新的key然后淘汰旧的key,因此也有人把内存淘汰策略叫做缓存替换策略
内存淘汰策略
Redis 4.0 之前一共实现了 6 种内存淘汰策略,在 4.0 之后,又增加了 2 种策略。
我们可以按照是否会进行数据淘汰把它们分成两类:
- 不进行数据淘汰的策略,只有 noeviction 这一种。
- 会进行淘汰的 7 种其他策略。
会进行淘汰的 7 种策略,我们可以再进一步根据淘汰候选数据集的范围把它们分成两类:
- 在设置了过期时间的数据中进行淘汰,包括 volatile-random、volatile-ttl、volatile-lru、volatile-lfu(Redis 4.0 后新增)四种。
- 在所有数据范围内进行淘汰,包括 allkeys-lru、allkeys-random、allkeys-lfu(Redis 4.0 后新增)三种。我把这 8 种策略的分类,画到了一张图里:
这8种策略的简单描述都可以在redis.conf
文件中找到
# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
# is reached. You can select one from the following behaviors:
#
# volatile-lru -> Evict using approximated LRU, only keys with an expire set.
# allkeys-lru -> Evict any key using approximated LRU.
# volatile-lfu -> Evict using approximated LFU, only keys with an expire set.
# allkeys-lfu -> Evict any key using approximated LFU.
# volatile-random -> Remove a random key having an expire set.
# allkeys-random -> Remove a random key, any key.
# volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
# noeviction -> Don't evict anything, just return an error on write operations.
翻译下:
# MAXMEMORY POLICY:当达到最大内存时,Redis 将如何选择要删除的内容
# 到达了。 您可以从以下行为中选择一种:
#
# volatile-lru -> Evict 使用近似 LRU,只有设置了过期时间的键。
# allkeys-lru -> 使用近似 LRU 驱逐所有键。
# volatile-lfu -> 使用近似 LFU 驱逐,只有设置了过期时间的键。
# allkeys-lfu -> 使用近似 LFU 驱逐所有键。
# volatile-random -> 随机删除设置了过期时间的key。
# allkeys-random -> 随机删除一个键,任何键。
# volatile-ttl -> 删除过期时间最近的key(次TTL)
# noeviction -> 不要驱逐任何东西,只是在写操作时返回一个错误。
通过redis.conf
文件中的内容我们可以知道默认策略是noeviction
# The default is:
#
# maxmemory-policy noeviction
Redis中的LRU淘汰算法
限于篇幅,本文主要以LRU淘汰算法展开讲解,其他算法流程大同小异,可在我的redis源码中文注释项目中查看
不了解LRU算法基础的可以先去百度了解下,很简单
这里就不讲LRU算法的基本知识了,我们来看Redis的LRU算法的原理是怎么样的。
其实,LRU 算法背后的想法非常朴素:它认为刚刚被访问的数据,肯定还会被再次访问,所以就把它放在 MRU 端;长久不访问的数据,肯定就不会再被访问了,所以就让它逐渐后移到 LRU 端,在缓存满时,就优先删除它。
不过,LRU 算法在实际实现时,需要用链表管理所有的缓存数据,这会带来额外的空间开销。而且,当有数据被访问时,需要在链表上把该数据移动到 MRU 端,如果有大量数据被访问,就会带来很多链表移动操作,会很耗时,进而会降低 Redis 缓存性能。
所以,在 Redis 中,LRU 算法被做了简化,以减轻数据淘汰对缓存性能的影响。具体来说,Redis 默认会记录每个数据的最近一次访问的时间戳(由键值对数据结构 RedisObject 中的 lru 字段记录)。然后,Redis 在决定淘汰的数据时,第一次会随机选出 N 个数据,把它们作为一个候选集合。接下来,Redis 会比较这 N 个数据的 lru 字段,把 lru 字段值最小的数据从缓存中淘汰出去。
Redis 提供了一个配置参数 maxmemory-samples,这个参数就是 Redis 选出的数据个数 N。例如,我们执行如下命令,可以让 Redis 选出 100 个数据作为候选数据集:
CONFIG SET maxmemory-samples 100
redis.conf
文件中默认配置是5
# The default of 5 produces good enough results. 10 Approximates very closely
# true LRU but costs more CPU. 3 is faster but not very accurate.
#
# maxmemory-samples 5
当需要再次淘汰数据时,Redis 需要挑选数据进入第一次淘汰时创建的候选集合。这儿的挑选标准是:能进入候选集合的数据的 lru 字段值必须小于候选集合中最小的 lru 值。当有新数据进入候选数据集后,如果候选数据集中的数据个数达到了 maxmemory-samples,Redis 就把候选数据集中 lru 字段值最小的数据淘汰出去。
这样一来,Redis 缓存不用为所有的数据维护一个大链表,也不用在每次数据访问时都移动链表项,提升了缓存的性能。
源码剖析
源码中文注释可以在我的 github 上 的 Redis 6.0 中文注释项目里看到,欢迎访问
第一步:什么时候开始淘汰key
配置读取
struct redisServer {
// 可使用的最大内存字节数
unsigned long long maxmemory; /* Max number of memory bytes to use */
// key 淘汰策略
int maxmemory_policy; /* Policy for key eviction */
// 随机抽样精度
int maxmemory_samples; /* Precision of random sampling */
}
检查时机
执行内存淘汰的方法是freeMemoryIfNeeded
,实现在evict.c
中.
注意redis 3.0版本中该方法在
redis.c
中
这个方法主要的作用是检查内存状态,如果内存达到阈值就进行内存淘汰,释放内存空间直至到阈值以下。
打开redis源码可以很轻易搜索到该方法是被freeMemoryIfNeededAndSafe
调用的
/*
* 这是 freeMemoryIfNeeded() 的包装器,只有在现在有条件安全地执行时才真正调用该函数:
* - 超时条件下不能有脚本。
* - 现在没有加载数据。
*/
int freeMemoryIfNeededAndSafe(void) {
if (server.lua_timedout || server.loading) return C_OK;
return freeMemoryIfNeeded();
}
那么这个方法又是被谁调用的呢,有两个地方
- updateMaxmemory
- processCommand
第一个看名字是在更新Maxmemory时调用的,本文不讲,我们主要来看第二个方法
在处理命令的时候,会调用server.c
中的processCommand
方法
/* If this function gets called we already read a whole
* command, arguments are in the client argv/argc fields.
*
* 如果这个函数被调用,我们已经读取了整个命令,参数在客户端 argv/argc 字段中。
*
* processCommand() execute the command or prepare the
* server for a bulk read from the client.
*
* processCommand() 执行命令或准备服务器以从客户端进行批量读取。
*/
int processCommand(client *c) {
// 省略...
// 如果配置了 server.maxmemory 并且 lua脚本没有在超时条件下运行
if (server.maxmemory && !server.lua_timedout) {
int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
// 省略...
}
// 省略...
}
getMaxmemoryState
紧接着会在freeMemoryIfNeeded
方法中检查内存状态来判断是否需要内存淘汰
/*
* 根据当前的“maxmemory”设置,定期调用此函数以查看是否有可用内存。 如果我们超过内存限制,该函数将尝试释放一些内存以返回低于限制。
*
* 如果我们低于内存限制或超过限制但尝试释放内存成功,该函数将返回 C_OK。
* 否则,如果我们超过了内存限制,但没有足够的内存被释放以返回低于限制,则该函数返回 C_ERR。
*/
int freeMemoryIfNeeded(void) {
// 默认情况下,从节点应该忽略maxmemory指令,仅仅做从节点该做的事情就好
if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;
// 当客户端暂停时,数据集应该是静态的,不仅来自客户端的 POV 无法写入,还有来自POV过期和驱逐key也无法执行。
if (clientsArePaused()) return C_OK;
// 检查内存状态,有没有超出限制,如果有,会计算需要释放的内存和已使用内存
if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
return C_OK;
}
getMaxmemoryState
方法也在evict.c
中:
/* 从maxmemory指令的角度获取内存状态:
* 如果使用的内存低于 maxmemory 设置,则返回 C_OK。
* 否则,如果超过内存限制,函数返回
* C_ERR。
*
* 该函数可能会通过引用返回附加信息,仅当
* 指向相应参数的指针不为 NULL。某些字段是
* 仅在返回 C_ERR 时填充:
*
* 'total' 使用的总字节数。
*(为 C_ERR 和 C_OK 填充)
*
* 'logical' 使用的内存量减去从/AOF缓冲区。
*(返回 C_ERR 时填充)
*
* 'tofree' 为了回到内存限制应该释放的内存量
*
*(返回 C_ERR 时填充)
*
* 'level' 这通常范围从 0 到 1,并报告数量
* 当前使用的内存。如果我们超过了内存限制可能会 > 1。
*(为 C_ERR 和 C_OK 填充)
*/
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level)
通过以上源码,我们能够知道什么时候会内存淘汰。
下一步我们来看看会淘汰哪些key
第二步:淘汰哪些key
freeMemoryIfNeeded
文章正文中我们提到过内存淘汰策略,但是第一步并没有涉及到具体的策略。那么不同的策略是在哪里实现的呢?这是在freeMemoryIfNeeded
方法中判断的,顺便这里先带你了解下freeMemoryIfNeeded
方法的骨架
int freeMemoryIfNeeded(void) {
/*
*
* mem_reported 已使用内存
* mem_tofree 需要释放的内存
* mem_freed 已释放内存
*/
size_t mem_reported, mem_tofree, mem_freed;
// 检查内存状态,有没有超出限制,如果有,会计算需要释放的内存和已使用内存
// 这个方法不但会计算内存,还会赋值 mem_reported mem_freed
if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
return C_OK;
// 初始化已释放内存的字节数为 0
mem_freed = 0;
// 不进行内存淘汰
if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
goto cant_free; /* We need to free memory, but policy forbids. */
// 根据 maxmemory 策略,
// 遍历字典,释放内存并记录被释放内存的字节数
while (mem_freed < mem_tofree) {
// 最佳淘汰key
sds bestkey = NULL;
// LRU策略或者LFU策略或者VOLATILE_TTL策略
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
{
// 不同的策略找bestKey
}
/* volatile-random and allkeys-random policy */
else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
{
// 不同的策略找bestKey
}
// 最后选定的要删除的key
if (bestkey) {
// 在这里删除key
}
}
}
上面的源码省略了很多东西,但我相信你已经知道这个方法在做什么了,也应该知道不同的策略其实就是在找不同的key。我们重点分析LRU策略下的bestKey
选择。
redis索引
这个时候要去redis数据库拿东西了,查找的话还是需要索引的,因此我先向你简单罗列下Redis索引相关的数据结构,很简单,不做文字表述了
server.h
// Redis 数据库。 有多个数据库由从 0(默认数据库)到最大配置数据库的整数标识。 数据库编号是结构中的“id”字段。
typedef struct redisDb {
/* 数据库键空间,保存着数据库中的所有键值对 */
dict *dict; /* The keyspace for this DB */
// 设置超时时间的key集合
dict *expires; /* Timeout of keys with a timeout set */
// 客户端等待数据的key (BLPOP)
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
// 收到 PUSH 被阻塞的key
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
// 数据库的键的平均 TTL ,统计信息
long long avg_ttl; /* Average TTL, just for stats */
//
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
// 尝试逐一进行碎片整理的键名列表。
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;
注意 redisDb 有两个重要的dict,*dict是主字典,存储所有的键集合, *expires存储设置了超时时间的集合,后面根据不同的淘汰策略就可以从不同的数据集拿数据
dict.h
/*
* 字典
*/
typedef struct dict {
// 类型特定函数
dictType *type;
// 私有数据
void *privdata;
// 哈希表
dictht ht[2];
// rehash 索引
// 当 rehash 不在进行时,值为 -1
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
// 当前运行的迭代器数量
unsigned long iterators; /* number of iterators currently running */
} dict;
每个dict又存储了2个dictht,为什么是两个呢?用来实现渐进式 rehash
/* 哈希表
* 每个字典都使用两个哈希表,从而实现渐进式 rehash 。
*/
typedef struct dictht {
// 哈希表数组
dictEntry **table;
// 哈希表大小
unsigned long size;
// 哈希表大小掩码,用于计算索引值
// 总是等于 size - 1
unsigned long sizemask;
// 该哈希表已有节点的数量
unsigned long used;
} dictht;
到了哈希表了,这个是真正存储元素的地方了,再来看哈希表元素长什么样
/*
* 哈希表节点
*/
typedef struct dictEntry {
// 键
void *key;
// 值
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
// 指向下个哈希表节点,形成链表
struct dictEntry *next;
} dictEntry;
以上就是redis索引相关的数据结构,我们可以看到redis索引是通过哈希表来实现的。上面说的是索引的存储,那么我们放入redis的value存储在哪呢?
server.h
typedef struct redisObject {
// 类型
unsigned type:4;
// 编码
unsigned encoding:4;
// 对象最后一次被访问的时间
unsigned lru:LRU_BITS;
// 引用计数
int refcount;
// 指向实际值的指针
void *ptr;
} robj;
这是我们的值对象数据结构定义,type就是具体的值的数据类型了,lru是对象最后一次被访问的时间,由于只有24位,无法记录完整的时间,因此只记录了unix time的低24位,24 bits数据要溢出的话需要194天,而缓存的数据更新非常频繁,已经足够了。详细的关于lru精度的问题可以查看源码。到这里你应该对redis的结构有了基本了解。下面在来看如何挑选bestkey更简单一些。
淘汰池
// 最佳淘汰key
sds bestkey = NULL;
// key所属db
int bestdbid;
// Redis数据库
redisDb *db;
// 字典
dict *dict;
// 哈希表节点
dictEntry *de;
// LRU策略或者LFU策略或者VOLATILE_TTL策略
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
{
// 淘汰池
struct evictionPoolEntry *pool = EvictionPoolLRU;
while(bestkey == NULL) {
unsigned long total_keys = 0, keys;
// 从每个数据库抽样key填充淘汰池
for (i = 0; i < server.dbnum; i++) {
db = server.db+i;
// db->dict: 数据库所有key集合
// db->expires: 数据中设置过期时间的key集合
// 判断淘汰策略是否是针对所有键的
dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
db->dict : db->expires;
// 计算字典元素数量,不为0才可以挑选key
if ((keys = dictSize(dict)) != 0) {
// 填充淘汰池,四个参数分别为 dbid,候选集合,主字典集合,淘汰池
// 填充完的淘汰池内部是有序的,按空闲时间升序
evictionPoolPopulate(i, dict, db->dict, pool);
// 已遍历检测过的key数量
total_keys += keys;
}
}
// 如果 total_keys = 0,没有要淘汰的key(redis没有key或者没有设置过期时间的key),break
if (!total_keys) break; /* No keys to evict. */
// 遍历淘汰池,从淘汰池末尾(空闲时间最长)开始向前迭代
for (k = EVPOOL_SIZE-1; k >= 0; k--) {
if (pool[k].key == NULL) continue;
// 获取当前key所属的dbid
bestdbid = pool[k].dbid;
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
// 如果淘汰策略针对所有key,从 redisDb.dict 中取数据,redisDb.dict 指向所有的键值集合
de = dictFind(server.db[pool[k].dbid].dict,
pool[k].key);
} else { // 如果淘汰策略不是针对所有key,从 redisDb.expires 中取数据,redisDb.expires 指向已过期键值集合
de = dictFind(server.db[pool[k].dbid].expires,
pool[k].key);
}
if (pool[k].key != pool[k].cached)
sdsfree(pool[k].key);
// 从池中删除这个key,不管这个key还在不在
pool[k].key = NULL;
pool[k].idle = 0;
// 如果这个节点存在,就跳出这个循环,否则尝试下一个元素。
// 这个节点可能已经不存在了,比如到了过期时间被删除了
if (de) {
// de是key所在哈希表节点,bestkey是 key 名
bestkey = dictGetKey(de);
break;
} else {
/* Ghost... Iterate again. */
}
}
}
}
上面的源码注释已经很清晰了,大概有如下几个步骤:
- 初始化淘汰池
- 遍历数据库
- 根据淘汰策略是所有key还是过期key,从而选择不同的数据集(redisDb.expires or redisDb.dict)
- 调用
evictionPoolPopulate
方法,传入上一步选择的数据集,填充淘汰池数据并排好序 - 从淘汰池中选取要淘汰的key(空闲时间最长的key),并且删除淘汰池中该key的引用。如果该key已经没了,那么选取淘汰池中次优的key,直到找到一个还存在的key
核心步骤当然是evictionPoolPopulate
方法,我们来来看看淘汰池的元素结构。
evict.c
// 淘汰池大小
#define EVPOOL_SIZE 16
// 淘汰池缓存的最大sds大小
#define EVPOOL_CACHED_SDS_SIZE 255
struct evictionPoolEntry {
// 对象空闲时间
// 这被称为空闲只是因为代码最初处理 LRU,但实际上只是一个分数,其中更高的分数意味着更好的候选者。
unsigned long long idle; /* Object idle time (inverse frequency for LFU) */
sds key; /* Key name. */
// 用来存储一个sds对象留待复用,注意我们要复用的是sds的内存空间,只需关注cached的长度(决定是否可以复用),无需关注他的内容
sds cached; /* Cached SDS object for key name. */
int dbid; /* Key DB number. */
};
淘汰池不只可以给LRU使用,你可以在后面的源码中看到,LFU以及TTL也会使用淘汰池
知道了淘汰池长什么样子我相信下面的代码你就好理解了,无非是按idle
字段升序排序
evict.c
/*
* 这是 freeMemoryIfNeeded() 的辅助函数,用于在每次我们想要key过期时用一些条目填充 evictionPool。
*
* 添加空闲时间小于当前所有key空闲时间的key,如果池是空的则key会一直被添加
*
* 我们按升序将键依次插入,因此空闲时间较小的键在左侧,而空闲时间较长的键在右侧。
*/
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
int j, k, count;
// 初始化抽样集合,大小为 server.maxmemory_samples
dictEntry *samples[server.maxmemory_samples];
// 此函数对字典进行采样以从随机位置返回一些键
count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
for (j = 0; j < count; j++) {
// 这被称为空闲只是因为代码最初处理 LRU,但实际上只是一个分数,其中更高的分数意味着更好的候选者。
unsigned long long idle;
// key
sds key;
// 值对象
robj *o;
// 哈希表节点
dictEntry *de;
de = samples[j];
key = dictGetKey(de);
/* 如果我们采样的字典不是主字典(而是过期的字典),我们需要在键字典中再次查找键以获得值对象。*/
if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
if (sampledict != keydict) de = dictFind(keydict, key);
o = dictGetVal(de);
}
/* 根据策略计算空闲时间。 这被称为空闲只是因为代码最初处理 LRU,但实际上只是一个分数,其中更高的分数意味着更好的候选者。*/
if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
// 给定一个对象,使用近似的 LRU 算法返回未请求过该对象的最小毫秒数
idle = estimateObjectIdleTime(o);
} else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { // LFU 策略也是用这个池子
idle = 255-LFUDecrAndReturn(o);
} else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
// 在这种情况下,越早过期越好。
idle = ULLONG_MAX - (long)dictGetVal(de);
} else {
serverPanic("Unknown eviction policy in evictionPoolPopulate()");
}
/* 将元素插入池中*/
k = 0;
// 遍历淘汰池,从左边开始,找到第一个空桶或者第一个空闲时间大于等于待选元素的桶,k是该元素的坐标
while (k < EVPOOL_SIZE &&
pool[k].key &&
pool[k].idle < idle) k++;
if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
/* 如果元素小于我们拥有的最差元素并且没有空桶,则无法插入。
*
* key == 0 说明上面的while循环一次也没有进入
* 要么第一个元素就是空的,要么所有已有元素的空闲时间都大于等于待插入元素的空闲时间(待插入元素比已有所有元素都优质)
* 又因为数组最后一个key不为空,因为是从左边开始插入的,所以排除了第一个元素是空的
*/
continue;
} else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
/* 插入空桶,插入前无需设置 */
} else {
/* 插入中间,现在 k 指向比要插入的元素空闲时间大的第一个元素 */
if (pool[EVPOOL_SIZE-1].key == NULL) {
/* 数组末尾有空桶,将所有元素从 k 向右移动到末尾。*/
/* 覆盖前保存 SDS */
sds cached = pool[EVPOOL_SIZE-1].cached;
// 注意这里不设置 pool[k], 只是给 pool[k] 腾位置
memmove(pool+k+1,pool+k,
sizeof(pool[0])*(EVPOOL_SIZE-k-1));
// 转移 cached (sds对象)
pool[k].cached = cached;
} else {
/* 右边没有可用空间? 在 k-1 处插入 */
k--;
/*
* 将k(包含)左侧的所有元素向左移动,因此我们丢弃空闲时间较小的元素。
*/
sds cached = pool[0].cached;
if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
memmove(pool,pool+1,sizeof(pool[0])*k);
pool[k].cached = cached;
}
}
/*
* 尝试重用在池条目中分配的缓存 SDS 字符串,因为分配和释放此对象的成本很高
* 注意真正要复用的sds内存空间,避免重新申请内存,而不是他的值
*/
int klen = sdslen(key);
// 判断字符串长度来决定是否复用sds
if (klen > EVPOOL_CACHED_SDS_SIZE) {
// 复制一个新的 sds 字符串并赋值
pool[k].key = sdsdup(key);
} else {
/*
* 内存拷贝函数,从数据源拷贝num个字节的数据到目标数组
*
* destination:指向目标数组的指针
* source:指向数据源的指针
* num:要拷贝的字节数
*
*/
// 复用sds对象
memcpy(pool[k].cached,key,klen+1);
// 重新设置sds长度
sdssetlen(pool[k].cached,klen);
// 真正设置key
pool[k].key = pool[k].cached;
}
// 设置空闲时间
pool[k].idle = idle;
// 设置key所在db
pool[k].dbid = dbid;
}
}
至此,经过以上步骤,我们能够得到一个bestkey(LRU策略下即最长空闲时间的key),还有一个有了部分数据的淘汰池。
下一步,我们就要删除这个key来释放内存空间了。
第三步:如何删除key
// 不断循环删除key,直至释放足够的内存
while (mem_freed < mem_tofree) {
// 最佳淘汰key
sds bestkey = NULL;
// LRU策略或者LFU策略或者VOLATILE_TTL策略
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
{
// 不同的策略找bestKey
}
// 最后选定的要删除的key
if (bestkey) {
db = server.db+bestdbid;
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
// 处理过期key到从节点和 AOF 文件
// 当 master 中的 key 过期时,则将此 key 的 DEL 操作发送到所有 slaves 和 AOF 文件(如果启用)。
propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
// 获取已使用内存
delta = (long long) zmalloc_used_memory();
// 是否开启lazyfree机制
// lazyfree的原理就是在删除对象时只是进行逻辑删除,然后把对象丢给后台,让后台线程去执行真正的destruct,避免由于对象体积过大而造成阻塞。
if (server.lazyfree_lazy_eviction)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
// 计算删除key后的内存变化量
delta -= (long long) zmalloc_used_memory();
// 计算已释放内存
mem_freed += delta;
}
}
第四步:什么时候停止淘汰key
通过上面的源码我们能看到while循环条件是 while (mem_freed < mem_tofree)
,而在每一次删除key的时候,都会累加已释放key所占有的内存:
// 不断循环删除key,直至释放足够的内存
while (mem_freed < mem_tofree) {
if (bestkey) {
// 获取已使用内存
delta = (long long) zmalloc_used_memory();
// 是否开启lazyfree机制
// lazyfree的原理就是在删除对象时只是进行逻辑删除,然后把对象丢给后台,让后台线程去执行真正的destruct,避免由于对象体积过大而造成阻塞。
if (server.lazyfree_lazy_eviction)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
// 计算删除key后的内存变化量
delta -= (long long) zmalloc_used_memory();
// 计算已释放内存
mem_freed += delta;
}
}
除此之外,如果配置开启了server.lazyfree_lazy_eviction
(异步淘汰),那么每淘汰一些key后还会检查一下内存状态,如果内存已经达到期望了,那么就可以手动满足循环终止条件。
// 不断循环删除key,直至释放足够的内存
while (mem_freed < mem_tofree) {
if (bestkey) {
// 获取已使用内存
delta = (long long) zmalloc_used_memory();
// 是否开启lazyfree机制
// lazyfree的原理就是在删除对象时只是进行逻辑删除,然后把对象丢给后台,让后台线程去执行真正的destruct,避免由于对象体积过大而造成阻塞。
if (server.lazyfree_lazy_eviction)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
// 计算删除key后的内存变化量
delta -= (long long) zmalloc_used_memory();
// 计算已释放内存
mem_freed += delta;
/*
* 通常我们的停止条件是能够释放固定的、预先计算的内存量。
* 然而,当我们在另一个线程中删除对象时,最好不时检查我们是否已经到达我们的目标内存,因为“mem_freed”数量仅在 dbAsyncDelete() 调用中计算,而线程可以无时无刻释放内存
*/
if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
/* Let's satisfy our stop condition. */
// 手动满足停止条件
mem_freed = mem_tofree;
}
}
}
}
最后
限于篇幅,尽可能多的放了源码,没有连贯的文字描述,如果你觉得这样看起来比较累的话,可以去我的Github看看源码注释