淺析Redis基礎數據結構

Redis是一種記憶體資料庫,所以可以很方便的直接基於記憶體中的數據結構,對外提供眾多的介面,而這些介面實際上就是對不同的數據結構進行操作的演算法,首先redis本身是一種key-value的資料庫,對於value常見的類型有:

字元串(string)、散列(hash)、列表(list)、集合(set)、排序集合(sorted set)、點陣圖(bitmaps)、地理空間索引(Geospatial indexes)、流(streams)

1.全局哈希表實現

key-value是redis中最基礎的結構,key-value是採用哈希表(hash table)這種基礎的數據結構來實現的,其中key是字元串類型,而value則會有上面說的各種數據類型。

哈希表是由基礎的哈希函數和數組來構成了,哈希函數採用的SipHash演算法,數組本身無法存儲多種類型的數據,所以數組元素本身是一個指針,指向具體的元素(entry),這個entry又存儲了key和value的地址,具體value也是也是一個比較複雜的數據結構,整個key-value我們可以稱為全局哈希表,如下圖:

image-20220317172544945

通常情況下哈希表查找的平均時間複雜度是O(1),所以在Redis中按照key來查找元素的複雜度也是O(1),所以Redis對於大量的key也能保持較高的性能,但是保持高性能的前提是哈希衝突的情況比較少,隨著數組不斷被填滿,哈希衝突的概率會不斷提高,所以需要和普通的哈希表一樣進行擴容,這個過程叫做rehash,rehash過程需要大量的數據搬遷工作,由於Redis是採用單執行緒的模型,假如要搬遷的元素過多會佔用很多的CPU時間,從而導致長時間阻塞其他請求的執行,所以普通哈希表存在的問題在Redis中都會遇到,有兩種情況會導致Redis性能的降低:

  1. 哈希衝突
  2. 擴容搬遷

Redis解決哈希衝突採用的辦法也是鏈表法,這時候數組元素指針指向的是鏈表的頭指針,當鏈表中元素個數過多時就會執行擴容,參考:

// 來源:
// //github.com/redis/redis/blob/5.0/src/dict.h
// //github.com/redis/redis/blob/5.0/src/dict.c

typedef struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;
} dictEntry;

// 字典類型定義
typedef struct dictType {
    uint64_t (*hashFunction)(const void *key);
    void *(*keyDup)(dict *d, const void *key);
    void *(*valDup)(dict *d, const void *obj);
    int (*keyCompare)(dict *d, const void *key1, const void *key2);
    void (*keyDestructor)(dict *d, void *key);
    void (*valDestructor)(dict *d, void *obj);
    int (*expandAllowed)(size_t moreMem, double usedRatio);
    /* Allow a dictEntry to carry extra caller-defined metadata.  The
     * extra memory is initialized to 0 when a dictEntry is allocated. */
    size_t (*dictEntryMetadataBytes)(dict *d);
} dictType;

/* This is our hash table structure. Every dictionary has two of this as we
 * implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
    dictEntry **table;
    unsigned long size;
    unsigned long sizemask;
    unsigned long used;
} dictht;

// hash類型定義
typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    // -1表示沒有運行rehash
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    unsigned long iterators; /* number of iterators currently running */
} dict;

int dictRehash(dict *d, int n) {
    // 空桶間隔
    int empty_visits = n*10; /* Max number of empty buckets to visit. */
    if (!dictIsRehashing(d)) return 0;

    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        de = d->ht[0].table[d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        // 搬當前嘈的整個鏈表
        while(de) {
            uint64_t h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }

    /* Check if we already rehashed the whole table... */
    if (d->ht[0].used == 0) {
        zfree(d->ht[0].table);
        d->ht[0] = d->ht[1];
        _dictReset(&d->ht[1]);
        d->rehashidx = -1;
        return 0;
    }

    /* More to rehash... */
    return 1;
}

static void _dictRehashStep(dict *d) {
    if (d->iterators == 0) dictRehash(d,1);
}

#define dictIsRehashing(d) ((d)->rehashidx != -1)

/* Add or Overwrite:
 * Add an element, discarding the old value if the key already exists.
 * Return 1 if the key was added from scratch, 0 if there was already an
 * element with such key and dictReplace() just performed a value update
 * operation. */
int dictReplace(dict *d, void *key, void *val)
{
    dictEntry *entry, *existing, auxentry;

    /* Try to add the element. If the key
     * does not exists dictAdd will succeed. */
    entry = dictAddRaw(d,key,&existing);
    if (entry) {
        dictSetVal(d, entry, val);
        return 1;
    }

    /* Set the new value and free the old one. Note that it is important
     * to do that in this order, as the value may just be exactly the same
     * as the previous one. In this context, think to reference counting,
     * you want to increment (set), and then decrement (free), and not the
     * reverse. */
    auxentry = *existing;
    dictSetVal(d, existing, val);
    dictFreeVal(d, &auxentry);
    return 0;
}

/* Add an element to the target hash table */
int dictAdd(dict *d, void *key, void *val)
{
    dictEntry *entry = dictAddRaw(d,key,NULL);

    if (!entry) return DICT_ERR;
    dictSetVal(d, entry, val);
    return DICT_OK;
}

dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
    long index;
    dictEntry *entry;
    dictht *ht;
    // 如果正在執行rehash 則執行漸進式擴容 
    if (dictIsRehashing(d)) _dictRehashStep(d);

    /* Get the index of the new element, or -1 if
     * the element already exists. */
    // 查詢下標索引
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
        return NULL;

    /* Allocate the memory and store the new entry.
     * Insert the element in top, with the assumption that in a database
     * system it is more likely that recently added entries are accessed
     * more frequently. */
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ht->table[index] = entry;
    ht->used++;

    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}

static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
    unsigned long idx, table;
    dictEntry *he;
    if (existing) *existing = NULL;

    /* Expand the hash table if needed */
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;
    // 同時查詢兩個哈希表
    for (table = 0; table <= 1; table++) {
        idx = hash & d->ht[table].sizemask;
        /* Search if this slot does not already contain the given key */
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key)) {
                if (existing) *existing = he;
                return -1;
            }
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;
    }
    return idx;
}

/* This is the initial size of every hash table */
#define DICT_HT_INITIAL_SIZE     4

static int dict_can_resize = 1;
static unsigned int dict_force_resize_ratio = 5;

/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d)
{
    /* Incremental rehashing already in progress. Return. */
    if (dictIsRehashing(d)) return DICT_OK;

    // 初始大小為4
    /* If the hash table is empty expand it to the initial size. */
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */
    // 判斷是否達到擴容條件 如果放置的元素和大小已經相等 且 dict_can_resize為1或者達到強制擴容閾值5
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}

dictEntry *dictFind(dict *d, const void *key)
{
    dictEntry *he;
    uint64_t h, idx, table;

    if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */
    if (dictIsRehashing(d)) _dictRehashStep(d);
    h = dictHashKey(d, key);
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key))
                return he;
            he = he->next;
        }
        if (!dictIsRehashing(d)) return NULL;
    }
    return NULL;
}

Redis dict的整體實現如下:

image-20220319160633779

其中擴容是在添加元素時進行,dict結構體的定義中重點是dictht類型的ht數組和rehashidx,ht數組有兩個元素,每個元素其實就是1個哈希表,默認先往ht[0]當中寫入,當ht[0]比較滿的時候,會觸發擴容操作,然後逐漸將元素搬遷是ht[1],當完全完成遷移時再將ht[0]釋放掉,並且將ht[1]賦值給ht[0],將ht[1]變為初始狀態,則擴容完畢,而rehashidx變數則是1個非常重要的變數,用來表示擴容的進度,當擴容完畢後,rehashidx的值會置為-1。

首先擴容操作發生在向Redis添加元素的過程中,這時會調用dictAdd函數,裡面又繼續調用了dictAddRaw函數,函數內首先判斷當前是否正在執行rehash操作,如果正在rehash會進行漸進式數據搬遷,然後繼續向下走會調用_dictKeyIndex函數查詢當前的key所對應的數組下標索引,判斷key是否已經存在,如果不存在則進行寫入,然後判斷當前是否在rehash,如果是則會使用新的哈希表ht[1],否則默認寫入ht[0],首先會創建當前entry所用的空間,然後將已有的鏈表掛到當前entry的尾部,最後將entry賦值給數組,所以明顯是採用頭插方式,因為Redis認為剛寫入的數據會更容易被訪問到,因此會放到頭部。

上面的過程是操作的主線,然後看下擴容的條件,在調用_dictKeyIndex查詢下標時,執行了_dictExpandIfNeeded函數,這個函數就是具體來執行擴容的,可以看到初始情況下如果ht[0]的size屬性為0,則會進行初始化,大小為DICT_HT_INITIAL_SIZE,這個值默認是4,如果已經初始化的話,會判斷是否達到擴容的條件,如果當前放置的元素數量和數組長度相等,說明數組已經比較滿了,並且後面兩個條件滿足1個那麼就可以進行擴容,首先是dict_can_resize這個值默認為0,表示能否進行擴容,在Redis中存在配置好的定時任務,其中包括rdb和aof持久化,在持久化執行的時候,會調用dictDisableResize將這個值設置為0,原因是此時正在執行持久化盡量不要擴容帶來額外的消耗,那麼另外還有1個強制擴容閾值dict_force_resize_ratio默認是5,也就是說當實際元素數量超過數組長度5倍時,無論如何都會進行擴容,否則redis的性能將會急劇下降,擴容的大小是之前數組大小的2倍。

然後就是漸進式擴容,當插入或查找元素時,都會判斷當前是否在rehash,如果正在rehash過程中則執行_dictRehashStep函數,其中如果當前沒有迭代查詢的操作則執行dictRehash,裡面會將步長乘以10,表示如果桶的元素為NULL每次最多檢查10個桶,同時會移動rehashidx指針,如果10個桶都沒有數據則直接退出,如果有一個桶有數據就只搬這1個桶然後就退出,下一次操作時再繼續執行,如果存在數據則會將當前桶的鏈表頭到ht[1]對應的鏈表尾部,同時將ht[0]置空,最後如果發現數據全部搬完則會用新的ht[1]替換ht[0],然後釋放ht[1],繼續恢復原有的狀態就完成了擴容。

最後可能會考慮到一個問題是,如果Redis長時間沒有讀寫操作,那麼rehash豈不是永遠不會完成,其實Redis也會有定時任務來執行rehash操作,在server.c中可以找到serverCron函數,這個函數按照特定的時鐘周期被觸發,默認的server.hz為10,從Redis 5.0開始可以根據客戶端的負載自動調整時鐘周期,在serverCron函數中會調用databasesCron函數,當不執行rdb和aof持久化的時候則會執行rehash,具體程式碼段參考:

    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
        /* We use global counters so if we stop the computation at a given
         * DB we'll be able to start from the successive in the next
         * cron loop iteration. */
        static unsigned int resize_db = 0;
        static unsigned int rehash_db = 0;
        int dbs_per_call = CRON_DBS_PER_CALL;
        int j;

        /* Don't test more DBs than we have. */
        if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;

        /* Resize */
        for (j = 0; j < dbs_per_call; j++) {
            tryResizeHashTables(resize_db % server.dbnum);
            resize_db++;
        }

        /* Rehash */
        if (server.activerehashing) {
            for (j = 0; j < dbs_per_call; j++) {
                int work_done = incrementallyRehash(rehash_db);
                if (work_done) {
                    /* If the function did some work, stop here, we'll do
                     * more at the next cron loop. */
                    break;
                } else {
                    /* If this db didn't need rehash, we'll try the next one. */
                    rehash_db++;
                    rehash_db %= server.dbnum;
                }
            }
        }
    }

可以看到會先判斷activerehashing配置是否開啟,如果開啟會挨個掃描每個db依次執行incrementallyRehash函數:

int incrementallyRehash(int dbid) {
    /* Keys dictionary */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1; /* already used our millisecond for this loop... */
    }
    /* Expires */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
        return 1; /* already used our millisecond for this loop... */
    }
    return 0;
}

然後會繼續調用dictRehashMilliseconds函數意思是花1ms的時間執行rehash,這個函數又回到了dict.c中:

/* Rehash for an amount of time between ms milliseconds and ms+1 milliseconds */
int dictRehashMilliseconds(dict *d, int ms) {
    long long start = timeInMilliseconds();
    int rehashes = 0;

    while(dictRehash(d,100)) {
        rehashes += 100;
        if (timeInMilliseconds()-start > ms) break;
    }
    return rehashes;
}

這時候步長變成100,也就是每次最多掃描1000個桶,如果運行時間大於1ms則退出去,這個就是在沒有用戶操作時後台rehash的過程。

所以可以看到,採用漸進式擴容的方式其實是電腦中的攤銷思想,可以將整個哈希表遷移的時間均攤到每次操作中,這樣每次操作的速度幾乎不受影響,仍然可以保持高性能的讀寫。

2.value類型的實現

image-20220318171536211

2.1 List類型

String的實現只有簡單動態字元串這一種,簡稱sds,這塊沒有什麼特殊的操作,然後是List,上圖是指向雙向鏈表和壓縮列表,雙向鏈表比較好理解,關鍵是壓縮列表(ziplist),壓縮列表的定義在ziplist.c中,注釋講的比較詳細,大致內容如下:

image-20220322143154725

其中zlbytes是壓縮列表的位元組數,包括本身的4個位元組,zltail是壓縮列表最後1個entry的偏移量,本身佔用4個位元組,設置這個的好處是可以直接從尾部彈出元素而無需遍歷壓縮列表,然後zllen表示壓縮列表中的entry數量,本身佔用2個位元組,也就是最多放2^16 – 2個條目,當超過這個條目時設置為2^16 – 1,這時候需要遍歷才能知道多少個entry,正常是不會超過的,entry的組成下面說,最後是zlend表示ziplist的結尾,長度1個位元組,固定為0xff

那麼對於每個entry則編碼方式為:image-20220322144433010

prevlen表示上一個entry的長度,這樣可以方便向前查找,prevlen的長度是不固定的,參考程式碼:

/* Encode the length of the previous entry and write it to "p". This only
 * uses the larger encoding (required in __ziplistCascadeUpdate). */
int zipStorePrevEntryLengthLarge(unsigned char *p, unsigned int len) {
    if (p != NULL) {
        p[0] = ZIP_BIG_PREVLEN;
        memcpy(p+1,&len,sizeof(len));
        memrev32ifbe(p+1);
    }
    return 1+sizeof(len);
}

/* Encode the length of the previous entry and write it to "p". Return the
 * number of bytes needed to encode this length if "p" is NULL. */
unsigned int zipStorePrevEntryLength(unsigned char *p, unsigned int len) {
    if (p == NULL) {
        return (len < ZIP_BIG_PREVLEN) ? 1 : sizeof(len)+1;
    } else {
        if (len < ZIP_BIG_PREVLEN) {
            p[0] = len;
            return 1;
        } else {
            return zipStorePrevEntryLengthLarge(p,len);
        }
    }
}

這兩個函數在對ziplist做操作時都會調用到用來獲取上一個entry的長度,其中p表示當前entry的指針,len是上一個entry的長度,如果p是NULL僅僅返回編碼這個長度的內容所需要的位元組數,其中ZIP_BIG_PREVLEN定義為254,也就是說當長度小於254時使用1個位元組編碼,反過來當大於254時將會調用zipStorePrevEntryLengthLarge,第一個位元組固定為254並且後面4個位元組表示長度,也就是說prevlen此時佔用5個位元組。

然後看下encoding,也就是編碼資訊,寫入的程式碼如下:

unsigned int zipStoreEntryEncoding(unsigned char *p, unsigned char encoding, unsigned int rawlen) {
    unsigned char len = 1, buf[5];

    if (ZIP_IS_STR(encoding)) {
        /* Although encoding is given it may not be set for strings,
         * so we determine it here using the raw length. */
        if (rawlen <= 0x3f) {
            if (!p) return len;
            buf[0] = ZIP_STR_06B | rawlen;
        } else if (rawlen <= 0x3fff) {
            len += 1;
            if (!p) return len;
            buf[0] = ZIP_STR_14B | ((rawlen >> 8) & 0x3f);
            buf[1] = rawlen & 0xff;
        } else {
            len += 4;
            if (!p) return len;
            buf[0] = ZIP_STR_32B;
            buf[1] = (rawlen >> 24) & 0xff;
            buf[2] = (rawlen >> 16) & 0xff;
            buf[3] = (rawlen >> 8) & 0xff;
            buf[4] = rawlen & 0xff;
        }
    } else {
        /* Implies integer encoding, so length is always 1. */
        if (!p) return len;
        buf[0] = encoding;
    }

    /* Store this length at p. */
    memcpy(p,buf,len);
    return len;
}

如果entry的值為整數,那麼encoding只佔用1個位元組,不同的值表示不同的整數類型,具體由下面的宏來定義:

/* Different encoding/length possibilities */
#define ZIP_STR_MASK 0xc0
#define ZIP_INT_MASK 0x30
#define ZIP_STR_06B (0 << 6)
#define ZIP_STR_14B (1 << 6)
#define ZIP_STR_32B (2 << 6)
// entry值為整數時的表示
#define ZIP_INT_16B (0xc0 | 0<<4)
#define ZIP_INT_32B (0xc0 | 1<<4)
#define ZIP_INT_64B (0xc0 | 2<<4)
#define ZIP_INT_24B (0xc0 | 3<<4)
#define ZIP_INT_8B 0xfe

否則如果entry數據的長度小於等於63,也是佔用1個位元組,大於63並且小於16383則佔用2個位元組,如果大於16383則佔用5個位元組,第1個位元組為128,後續4個位元組從程式碼可以看出來就是數據本身的長度。

那麼ziplist的最大大小是多少呢?是由如下的宏進行定義:

#define SIZE_SAFETY_LIMIT 8192

可以看到ziplist的最大長度是8k,具體的分析過程這裡先不再詳細說,先看下list的整體結構:

// list由quicklist實現 定義在quicklist.h中
typedef struct quicklist {
    quicklistNode *head;
    quicklistNode *tail;
    unsigned long count;        /* total count of all entries in all ziplists */
    unsigned long len;          /* number of quicklistNodes */
    int fill : 16;              /* fill factor for individual nodes */
    unsigned int compress : 16; /* depth of end nodes not to compress;0=off */
} quicklist;

typedef struct quicklistNode {
    struct quicklistNode *prev;
    struct quicklistNode *next;
    unsigned char *zl;
    unsigned int sz;             /* ziplist size in bytes */
    unsigned int count : 16;     /* count of items in ziplist */
    unsigned int encoding : 2;   /* RAW==1 or LZF==2 */
    unsigned int container : 2;  /* NONE==1 or ZIPLIST==2 */
    unsigned int recompress : 1; /* was this node previous compressed? */
    unsigned int attempted_compress : 1; /* node can't compress; too small */
    unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;

整個list結構定義是quicklist結構體,其中包括頭節點和尾節點的指針,以及node和entry總的計數,其中fill表示ziplist的填充因子,compress表示不壓縮的深度,0表示關閉壓縮。

然後每一個node的定義在下面,由於是雙向鏈表所以包含前驅節點和後繼節點的指針,然後zl就是一個壓縮列表,sz表示壓縮列表的大小,count表示ziplist中的entry個數,encoding默認是1,如果開啟LZF壓縮則是2,container的值默認是2,其餘的默認都是0。

所以根據結構體的定義,我們可以簡單得出下面的圖:

image-20220323111016057

然後我們分析下寫入的過程,首先寫入是在t_list.c中傳入執行:

#define COMPRESS_MAX (1 << 16)
void quicklistSetCompressDepth(quicklist *quicklist, int compress) {
    if (compress > COMPRESS_MAX) {
        compress = COMPRESS_MAX;
    } else if (compress < 0) {
        compress = 0;
    }
    quicklist->compress = compress;
}

#define FILL_MAX (1 << 15)
void quicklistSetFill(quicklist *quicklist, int fill) {
    if (fill > FILL_MAX) {
        fill = FILL_MAX;
    } else if (fill < -5) {
        fill = -5;
    }
    quicklist->fill = fill;
}

void quicklistSetOptions(quicklist *quicklist, int fill, int depth) {
    quicklistSetFill(quicklist, fill);
    quicklistSetCompressDepth(quicklist, depth);
}

void lpushCommand(client *c) {
    pushGenericCommand(c,LIST_HEAD);
}


// 命令入口
void rpushCommand(client *c) {
    pushGenericCommand(c,LIST_TAIL);
}

void pushGenericCommand(client *c, int where) {
    int j, pushed = 0;

    for (j = 2; j < c->argc; j++) {
        if (sdslen(c->argv[j]->ptr) > LIST_MAX_ITEM_SIZE) {
            addReplyError(c, "Element too large");
            return;
        }
    }

    robj *lobj = lookupKeyWrite(c->db,c->argv[1]);

    if (lobj && lobj->type != OBJ_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    for (j = 2; j < c->argc; j++) {
        if (!lobj) {
            // 調用object.c中的函數創建object
            lobj = createQuicklistObject();
            // 設置ziplist最大大小和非壓縮深度
            quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
                                server.list_compress_depth);
            dbAdd(c->db,c->argv[1],lobj);
        }
        listTypePush(lobj,c->argv[j],where);
        pushed++;
    }
    addReplyLongLong(c, (lobj ? listTypeLength(lobj) : 0));
    if (pushed) {
        char *event = (where == LIST_HEAD) ? "lpush" : "rpush";

        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
    }
    server.dirty += pushed;
}

// object.c createQuicklistObject
robj *createQuicklistObject(void) {
    quicklist *l = quicklistCreate();   // quicklist.c quicklistCreate
    robj *o = createObject(OBJ_LIST,l);
    o->encoding = OBJ_ENCODING_QUICKLIST;
    return o;
}
// quicklist.c quicklistCreate
quicklist *quicklistCreate(void) {
    struct quicklist *quicklist;

    quicklist = zmalloc(sizeof(*quicklist));
    quicklist->head = quicklist->tail = NULL;
    quicklist->len = 0;
    quicklist->count = 0;
    quicklist->compress = 0;
    quicklist->fill = -2;
    return quicklist;
}

void listTypePush(robj *subject, robj *value, int where) {
    if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
        int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
        value = getDecodedObject(value);
        size_t len = sdslen(value->ptr);
        quicklistPush(subject->ptr, value->ptr, len, pos);
        decrRefCount(value);
    } else {
        serverPanic("Unknown list encoding");
    }
}

插入可以從左端和右端,左端就是從鏈表頭部插入,右端則是從鏈表尾部插入,最終都會帶著條件調用pushGenericCommand函數,過程同樣是先lookupKey從全局哈希表中看看key是否存在,如果不存在則首先創建整個的quicklist結構,然後通過dbAdd寫入db的全局哈希表中,最終還是調用了前面的dictAdd函數,其中quick list創建比較簡單,只是設置了一些默認值,然後會調用quicklistSetOptions函數設置ziplist的最大大小和非壓縮深度,這兩個參數都是通過Redis的配置文件傳入,默認定義為:

/* List defaults */
#define OBJ_LIST_MAX_ZIPLIST_SIZE -2
#define OBJ_LIST_COMPRESS_DEPTH 0

這裡-2相當於一個檔次,範圍有5檔,是-5~-1,另外也可以使用真實的大小傳入,在quicklistSetFill中寫的很清楚,如果大小大於32768那麼就設置為32768,如果小於-5,則設置為-5,這個負數的意思等下會說。

然後就來到了listTypePush這個函數,將value轉成sds,然後調用quicklistPush函數插入到雙向鏈表中:

/* Wrapper to allow argument-based switching between HEAD/TAIL pop */
void quicklistPush(quicklist *quicklist, void *value, const size_t sz,
                   int where) {
    if (where == QUICKLIST_HEAD) {
        quicklistPushHead(quicklist, value, sz);
    } else if (where == QUICKLIST_TAIL) {
        quicklistPushTail(quicklist, value, sz);
    }
}

int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
    quicklistNode *orig_head = quicklist->head;
    assert(sz < UINT32_MAX); /* TODO: add support for quicklist nodes that are sds encoded (not zipped) */
    if (likely(
            _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
        quicklist->head->zl =
            ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
        quicklistNodeUpdateSz(quicklist->head);
    } else {
        quicklistNode *node = quicklistCreateNode();
        node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);

        quicklistNodeUpdateSz(node);
        _quicklistInsertNodeBefore(quicklist, quicklist->head, node);
    }
    quicklist->count++;
    quicklist->head->count++;
    return (orig_head != quicklist->head);
}

int quicklistPushTail(quicklist *quicklist, void *value, size_t sz) {
    quicklistNode *orig_tail = quicklist->tail;
    assert(sz < UINT32_MAX); /* TODO: add support for quicklist nodes that are sds encoded (not zipped) */
    if (likely(
            _quicklistNodeAllowInsert(quicklist->tail, quicklist->fill, sz))) {
        quicklist->tail->zl =
            ziplistPush(quicklist->tail->zl, value, sz, ZIPLIST_TAIL);
        quicklistNodeUpdateSz(quicklist->tail);
    } else {
        quicklistNode *node = quicklistCreateNode();
        node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_TAIL);

        quicklistNodeUpdateSz(node);
        _quicklistInsertNodeAfter(quicklist, quicklist->tail, node);
    }
    quicklist->count++;
    quicklist->tail->count++;
    return (orig_tail != quicklist->tail);
}

REDIS_STATIC quicklistNode *quicklistCreateNode(void) {
    quicklistNode *node;
    node = zmalloc(sizeof(*node));
    node->zl = NULL;
    node->count = 0;
    node->sz = 0;
    node->next = node->prev = NULL;
    node->encoding = QUICKLIST_NODE_ENCODING_RAW;
    node->container = QUICKLIST_NODE_CONTAINER_ZIPLIST;
    node->recompress = 0;
    return node;
}

同樣插入分為插入到頭部或者插入到尾部,先看下整體過程,如果是插入到頭部,那麼首先判斷頭部的ziplist是不是滿了,如果沒滿則允許插入,對ziplist進行修改放到頭部的位置,其實就是對ziplist的空間進行擴容,將所有的entry往後搬遷然後將value填充到前面,詳細的操作就在ziplist.c中,否則如果滿了就更簡單了,直接new一個新的quicklist node,然後用value填充形成ziplist,最後執行_quicklistInsertNodeBefore將當前新建的node插入到當前雙向鏈表的頭部,具體程式碼就是雙向鏈表的操作程式碼,比較簡單,另外在尾部插入也是類似的操作,只是和頭部寫入相比少了數據搬遷的過程,效率相對來說更高一些,所以rpush操作比lpush操作性能略高一些。

同理對於pop的操作非常類似,就不再詳細敘述了,其中有個判斷ziplist是否滿的函數_quicklistNodeAllowInsert可以來簡單說下:

#define sizeMeetsSafetyLimit(sz) ((sz) <= SIZE_SAFETY_LIMIT)

REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
                                           const int fill, const size_t sz) {
    if (unlikely(!node))
        return 0;

    int ziplist_overhead;
    /* size of previous offset */
    if (sz < 254)
        ziplist_overhead = 1;
    else
        ziplist_overhead = 5;

    /* size of forward offset */
    if (sz < 64)
        ziplist_overhead += 1;
    else if (likely(sz < 16384))
        ziplist_overhead += 2;
    else
        ziplist_overhead += 5;

    /* new_sz overestimates if 'sz' encodes to an integer type */
    unsigned int new_sz = node->sz + sz + ziplist_overhead;
    if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
        return 1;
    /* when we return 1 above we know that the limit is a size limit (which is
     * safe, see comments next to optimization_level and SIZE_SAFETY_LIMIT) */
    else if (!sizeMeetsSafetyLimit(new_sz))
        return 0;
    else if ((int)node->count < fill)
        return 1;
    else
        return 0;
}

// 表示ziplist級別的常量數組
static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};

REDIS_STATIC int
_quicklistNodeSizeMeetsOptimizationRequirement(const size_t sz,
                                               const int fill) {
    if (fill >= 0)
        return 0;

    size_t offset = (-fill) - 1;
    if (offset < (sizeof(optimization_level) / sizeof(*optimization_level))) {
        if (sz <= optimization_level[offset]) {
            return 1;
        } else {
            return 0;
        }
    } else {
        return 0;
    }
}

_quicklistNodeAllowInsert這個函數是判斷ziplist能否再容納下當前長度的新的內容,就是按照上面entry格式的定義預先算了一下長度,如果fill是真實的長度則會走下面的判斷,如果是-5-1則會調用_quicklistNodeSizeMeetsOptimizationRequirement這個函數判斷新的長度是否滿足條件,首先計算了offset範圍是04,這裡是用數組的大小數以第一個元素的大小,由於數組是5個元素所以大小為5,所以必然進入第一個分支,這個時候將offset作為下標取值,這裡數組值就是這個level下ziplist的最大限制,例如默認fill為-2,所以offset為1,因此大小限制為8192,滿足就返回1表示允許。再回到第一個函數如果fill是大於0的,會走第二個判斷先判斷是不是滿足小於SIZE_SAFETY_LIMIT,如果滿足再判斷是否小於實際配置的fill值這樣,所以默認情況都是在optimization_level這個數組這裡限制的。

另外還會注意到很有趣的一點就是,從quicklistPushHead再到_quicklistNodeAllowInsert調用了2次likely函數,還有1次unlikely函數,其實likely/unlikely都屬於系統調用,作用是用來優化CPU的分支預測,如果一個條件我們認為經常成立,那麼可以用likely告訴CPU按照期望來預測,unlikely是反過來的會告訴CPU不要走這個分支,因為CPU自動分支預測會先對運行結果進行學習然後再預測,如果使用likely/unlikely系統調用後相當於告訴CPU先驗知識,使用恰當可以提升程式的運行效率,但是反過來如果用的不好或者用反了,要比默認情況下性能低,因為默認情況下fill的值為-2所以會百分之百進入第一個分支,所以加上likely是會提升性能的,但是如果配置為1個大於0的值,性能會有所降低,這個配置項對應redis.conf中的list-max-ziplist-size -2,推薦配置為-2或者-1,不要自己指定長度。

上面就是Redis中list基本的源碼分析,經過分析可以知道list是由雙端列表實現,其中鏈表中的每個節點是由壓縮列表實現,優點就是ziplist非常緊湊,並且對CPU快取友好,通常只取頭部或者尾部,解析ziplist性能很高,可以降低鏈表節點過多造成記憶體碎片,缺點就是每次添加刪除元素都需要空間伸縮和數據搬遷,不過這部分記憶體是由Redis的zmalloc庫來管理,因此可以高效復用,省掉了系統調用的開銷。

list在使用時要注意下面幾個點:

  1. list的push和pop操作複雜度都是O(1),計算長度的llen複雜度也是O(1),這類操作是比較高效的。
  2. 像lrange、lset、lindex操作的複雜度都是O(n),要盡量避免使用,但是由於ziplist是包含一組元素,所以按照下標查找可以一次跳過整個ziplist,相比普通的雙向鏈表比還是比較高效的,lrange這種操作一般用在查看頭部或尾部少量元素時使用。
  3. list-max-ziplist-size正常建議配置為-2或-1,不要自己指定長度。

其他一些類型的源碼分析方法類似,不再詳細分析了,只簡要概括下。

2.2 Hash類型

hash類型它也是由ziplist或散列表實現,不過這兩個不是同時使用而是一種轉化的關係,具體在server.h中有對於其默認值的定義:

#define OBJ_HASH_MAX_ZIPLIST_ENTRIES 512
#define OBJ_HASH_MAX_ZIPLIST_VALUE 64

關於hash的編碼類型也只有OBJ_ENCODING_ZIPLIST和OBJ_ENCODING_HT這兩種,具體在t_hash.c中有相關的源碼,初始情況下如果全局哈希表中key不存在,則先進行初始化,value此時會設置為ziplist類型:

// t_hash.c hset/hmset前先查找key
robj *hashTypeLookupWriteOrCreate(client *c, robj *key) {
    robj *o = lookupKeyWrite(c->db,key);
    if (o == NULL) {
        // 不存在則先創建hash object
        o = createHashObject();
        dbAdd(c->db,key,o);
    } else {
        if (o->type != OBJ_HASH) {
            addReply(c,shared.wrongtypeerr);
            return NULL;
        }
    }
    return o;
}

// object.c 創建hash object
robj *createHashObject(void) {
    // 創建ziplist對象
    unsigned char *zl = ziplistNew();
    robj *o = createObject(OBJ_HASH, zl);
    o->encoding = OBJ_ENCODING_ZIPLIST;
    return o;
}

那麼什麼時候會轉換為散列表實現,在hashTypeSet函數中有這麼1行:

/* Check if the ziplist needs to be converted to a hash table */
if (hashTypeLength(o) > server.hash_max_ziplist_entries)
    hashTypeConvert(o, OBJ_ENCODING_HT);

在向ziplist插入新元素後,如果ziplist的entry個數大於hash_max_ziplist_entries的配置值,則會自動轉換為散列表來實現,默認情況下上面的宏定義為512,當元素超過512個時會轉換為散列表,需要注意的是一旦壓縮列錶轉換為散列表,就算元素被刪除,也不會再轉換回去了,程式碼中僅僅會執行dictResize操作。

hash類型如果使用ziplist實現的情況下寫入和查找的時間複雜度都是O(n),有全局的限制,ziplist一般不會太長,大多數情況下都會轉成散列表類型,這時HSET/HGET的複雜度都是O(1)。

對於hash-max-ziplist-entries的配置建議是如果機器記憶體充足,一般情況下保持默認即可,如果記憶體比較緊張可以時間換空間,可以把這個配置改大,因為壓縮列表本身比較節省空間,通過犧牲讀寫的效率來節約記憶體使用。

2.3 Set和ZSet

對於Set類型底層由數組或散列表構成,如果Set的元素都是整數且元素個數小於512個,會使用數組來保存,默認值也有定義如下:

#define OBJ_SET_MAX_INTSET_ENTRIES 512

當元素個數超過512時會轉化為散列表實現,這個和hash類型非常類似,只是此時散列表的值都為NULL。

最後看ZSet類型,ZSet是由壓縮列表或跳錶實現,和hash類型一樣由兩個參數來控制:

#define OBJ_ZSET_MAX_ZIPLIST_ENTRIES 128
#define OBJ_ZSET_MAX_ZIPLIST_VALUE 64

默認當元素個數小於128個並且value大小小於64時,採用ziplist方式來實現,否則會轉換為sikplist實現,具體的程式碼片段參考t_zset.c:

    /* Lookup the key and create the sorted set if does not exist. */
    zobj = lookupKeyWrite(c->db,key);
    if (zobj == NULL) {
        if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            zobj = createZsetObject();
        } else {
            zobj = createZsetZiplistObject();
        }
        dbAdd(c->db,key,zobj);
    } else {
        if (zobj->type != OBJ_ZSET) {
            addReply(c,shared.wrongtypeerr);
            goto cleanup;
        }
    }

如果在全局哈希表中沒有找到key的情況下,會判斷如果zset_max_ziplist_entries設置為0或者寫入value的長度大於zset_max_ziplist_value的情況下會創建ZsetObject,否則會創建ZiplistObject。那麼隨著元素不斷的插入一旦達到閾值則會從ziplist轉換為skiplist:

            /* check if the element is too large or the list
             * becomes too long *before* executing zzlInsert. */
            if (zzlLength(zobj->ptr)+1 > server.zset_max_ziplist_entries ||
                sdslen(ele) > server.zset_max_ziplist_value ||
                !ziplistSafeToAdd(zobj->ptr, sdslen(ele)))
            {
                zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
            } else {
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                if (newscore) *newscore = score;
                *flags |= ZADD_ADDED;
                return 1;
            }

寫入之前判斷如果ZSet中元素個數大於zset_max_ziplist_entries或者元素的長度大於zset_max_ziplist_value或者寫入之後ziplist的大小沒有超過8k限制,則會執行zsetConvert將對象轉換為OBJ_ENCODING_SKIPLIST類型,否則就正常執行插入。

至於zset的結構定義在server.h中:

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span;
    } level[];
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;

可以看到zset除了zskiplist之外還定義了dict,我們知道對於跳錶來說可以實現插入、查找、刪除的複雜度都是O(log N),除了這個之外zset還有一些比較簡單的操作例如直接根據元素獲取對應的分值或者判斷某元素是否在zset中等,對於這些操作可以進一步提升性能,所以作者通過空間換時間的方式增加了一個dict來維護元素值和分值的關係,像ZSCORE的複雜度就是O(1),從而對跳錶進行加速,對於計算zset長度的操作,因為zskiplist中也會維護計數,複雜度也是O(1)。

綜上總結下就是:單元素操作大部分複雜度都是O(1),例如:HGET、HSET、HDEL、SADD、SREM、ZSCORE、LPUSH、LPOP、RPUSH、RPOP、ZSCORE等,計算長度的複雜度也是O(1),例如:LLEN、HLEN、ZCARD等,對於zset的其他操作大部分複雜度都是O(log n),如:ZRANGEBYSCORE、ZADD、ZCOUNT、ZINCRBY、ZRANK、ZLEXCOUNT等,這些操作在大部分情況下都是比較高效的。

另外還有些比較危險的操作,例如:keys *、HGETALL、SMEMBERS、LRANGE、ZRANGE這些操作,複雜度都是O(n),需要對整個數據結構進行遍歷,可能會帶來非常大的開銷,直接阻塞其他請求的執行,使用時務必謹慎,除非你知道自己在做什麼,這些操作推薦使用Redis提供的游標進行操作,對應的有SCAN、HSCAN、SSCAN、ZSCAN等,這些操作都是每次返回一小批數據,然後基於游標再進行迭代,這樣不會一次性查詢造成其他的請求阻塞,CPU時間被輪換使用。

Redis數據結構是性能優化的基石,寫程式碼之前考慮使用合適的數據結構可以避免掉很多性能方面的問題。