Redis 源碼簡潔剖析 06 – quicklist 和 listpack
quicklist
為什麼要設計 quicklist
ziplist 有兩個問題,參考 Redis 源碼簡潔剖析 05 – ziplist 壓縮列表:
- 不能保存過多的元素,否則訪問性能會下降
- 不能保存過大的元素,否則容易導致記憶體重新分配,甚至引起連鎖更新
特點
quicklist 的設計,其實是結合了鏈表和 ziplist 各自的優勢。簡單來說,一個 quicklist 就是一個鏈表,而鏈表中的每個元素又是一個 ziplist。
- 結構定義:
quicklist.h
- 實現:
quicklist.c
數據結構
quicklist 是一個鏈表,所以每個 quicklistNode 中,都包含了分別指向它前序和後序節點的指針* prev 和* next。同時,每個 quicklistNode 又是一個 ziplist,所以,在 quicklistNode 的結構體中,還有指向 ziplist 的指針* zl。
每個元素節點 quicklistNode 的定義如下:
typedef struct quicklistNode {
// 前一個 quicklistNode
struct quicklistNode *prev;
// 後一個 quicklistNode
struct quicklistNode *next;
// quicklistNode 指向的 ziplist
unsigned char *zl;
// ziplist 的位元組大小
unsigned int sz;
// ziplist 的元素個數
unsigned int count: 16;
// 編碼方式,『原生位元組數組』或「壓縮存儲」
unsigned int encoding: 2;
// 存儲方式,NONE==1 or ZIPLIST==2
unsigned int container: 2;
// 數據是否被壓縮
unsigned int recompress: 1;
// 數據能否被壓縮
unsigned int attempted_compress: 1;
// 預留的 bit 位
unsigned int extra: 10;
} quicklistNode;
quicklist 的結構體定義如下:
typedef struct quicklist {
// quicklist 的鏈表頭
quicklistNode *head;
// quicklist 的鏈表尾
quicklistNode *tail;
// 所有 ziplist 中的總元素個數
unsigned long count;
// quicklistNodes 的個數
unsigned long len;
……
} quicklist;
typedef struct quicklistEntry {
const quicklist *quicklist;
quicklistNode *node;
unsigned char *zi;
unsigned char *value;
long long longval;
unsigned int sz;
int offset;
} quicklistEntry;
quicklistCreate
quicklist *quicklistCreate(void) {
struct quicklist *quicklist;
quicklist = zmalloc(sizeof(*quicklist));
quicklist->head = quicklist->tail = NULL;
quicklist->len = 0;
quicklist->count = 0;
quicklist->compress = 0;
quicklist->fill = -2;
quicklist->bookmark_count = 0;
return quicklist;
}
quicklistDelIndex
REDIS_STATIC int quicklistDelIndex(quicklist *quicklist, quicklistNode *node,
unsigned char **p) {
int gone = 0;
// 在該節點下的 ziplist 中刪除
node->zl = ziplistDelete(node->zl, p);
// count-1
node->count--;
// ziplist 數量為空,直接刪除該節點
if (node->count == 0) {
gone = 1;
__quicklistDelNode(quicklist, node);
} else {
quicklistNodeUpdateSz(node);
}
// 更新所有 ziplist 中的總元素個數
quicklist->count--;
return gone ? 1 : 0;
}
quicklistDelEntry
核心還是調用了 quicklistDelIndex,但是 quicklistDelEntry 要維護 quicklistNode 的節點,包括迭代器。
void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry) {
quicklistNode *prev = entry->node->prev;
quicklistNode *next = entry->node->next;
int deleted_node = quicklistDelIndex((quicklist *)entry->quicklist,
entry->node, &entry->zi);
iter->zi = NULL;
// 如果當前節點被刪除,更新 iterator
if (deleted_node) {
if (iter->direction == AL_START_HEAD) {
iter->current = next;
iter->offset = 0;
} else if (iter->direction == AL_START_TAIL) {
iter->current = prev;
iter->offset = -1;
}
}
}
quicklistInsertBefore, quicklistInsertAfter
插入分為兩種:
void quicklistInsertAfter(quicklist *quicklist, quicklistEntry *node,
void *value, const size_t sz);
void quicklistInsertBefore(quicklist *quicklist, quicklistEntry *node,
void *value, const size_t sz);
其底層都調用了_quicklistInsert 函數:
void quicklistInsertBefore(quicklist *quicklist, quicklistEntry *entry,
void *value, const size_t sz) {
_quicklistInsert(quicklist, entry, value, sz, 0);
}
void quicklistInsertAfter(quicklist *quicklist, quicklistEntry *entry,
void *value, const size_t sz) {
_quicklistInsert(quicklist, entry, value, sz, 1);
}
_quicklistInsert 函數比較長,但是邏輯很簡單,就是判斷應該在哪裡插入新元素:
- 在後面插入
- 當前 entry 的 ziplist 未滿:直接插入
- 當前 entry 的 ziplist 已滿:
- entry->next 的 ziplist 未滿:在其頭部插入
- entry->next 的 ziplist 已滿:拆分當前 entry
- 在前面插入
- 當前 entry 的 ziplist 未滿:直接插入
- 當前 entry 的 ziplist 已滿:
- entry->prev 的 ziplist 未滿:在其尾部插入
- entry->prev 的 ziplist 已滿:拆分當前 entry
REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
void *value, const size_t sz, int after) {
int full = 0, at_tail = 0, at_head = 0, full_next = 0, full_prev = 0;
int fill = quicklist->fill;
quicklistNode *node = entry->node;
quicklistNode *new_node = NULL;
assert(sz < UINT32_MAX); /* TODO: add support for quicklist nodes that are sds encoded (not zipped) */
……
/* Populate accounting flags for easier boolean checks later */
if (!_quicklistNodeAllowInsert(node, fill, sz)) {
full = 1;
}
// 在後面插入 && 當前 entry 的 ziplist 已滿
if (after && (entry->offset == node->count)) {
at_tail = 1;
// 判斷 next 節點是否可插入
if (!_quicklistNodeAllowInsert(node->next, fill, sz)) {
full_next = 1;
}
}
// 在前面插入 && 在頭部
if (!after && (entry->offset == 0)) {
at_head = 1;
if (!_quicklistNodeAllowInsert(node->prev, fill, sz)) {
full_prev = 1;
}
}
/* Now determine where and how to insert the new element */
// 在尾部插入 && 當前節點能插入
if (!full && after) {
D("Not full, inserting after current position.");
quicklistDecompressNodeForUse(node);
unsigned char *next = ziplistNext(node->zl, entry->zi);
if (next == NULL) {
node->zl = ziplistPush(node->zl, value, sz, ZIPLIST_TAIL);
} else {
node->zl = ziplistInsert(node->zl, next, value, sz);
}
node->count++;
quicklistNodeUpdateSz(node);
quicklistRecompressOnly(quicklist, node);
} else if (!full && !after) {
// 在前面插入 && 當前節點能插入
// 直接在當前節點插入即可
D("Not full, inserting before current position.");
quicklistDecompressNodeForUse(node);
node->zl = ziplistInsert(node->zl, entry->zi, value, sz);
node->count++;
quicklistNodeUpdateSz(node);
quicklistRecompressOnly(quicklist, node);
} else if (full && at_tail && node->next && !full_next && after) {
// 在後面插入 && 在 ziplist 尾部插入 && 當前節點不能插入 && next 節點能插入
// 在 next 節點的頭部插入
D("Full and tail, but next isn't full; inserting next node head");
new_node = node->next;
quicklistDecompressNodeForUse(new_node);
new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_HEAD);
new_node->count++;
quicklistNodeUpdateSz(new_node);
quicklistRecompressOnly(quicklist, new_node);
} else if (full && at_head && node->prev && !full_prev && !after) {
// 在前面插入 && 在 ziplist 頭部插入 && 當前節點不能插入 && prev 節點能插入
// 在 prev 節點的尾部插入
D("Full and head, but prev isn't full, inserting prev node tail");
new_node = node->prev;
quicklistDecompressNodeForUse(new_node);
new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_TAIL);
new_node->count++;
quicklistNodeUpdateSz(new_node);
quicklistRecompressOnly(quicklist, new_node);
} else if (full && ((at_tail && node->next && full_next && after) ||
(at_head && node->prev && full_prev && !after))) {
// 當前節點不能插入 && 不能在前後節點插入
// 創建新節點
D("\tprovisioning new node...");
new_node = quicklistCreateNode();
new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
new_node->count++;
quicklistNodeUpdateSz(new_node);
__quicklistInsertNode(quicklist, node, new_node, after);
} else if (full) {
// 節點是滿的,需要將當前節點的 ziplist 拆分
D("\tsplitting node...");
quicklistDecompressNodeForUse(node);
new_node = _quicklistSplitNode(node, entry->offset, after);
new_node->zl = ziplistPush(new_node->zl, value, sz,
after ? ZIPLIST_HEAD : ZIPLIST_TAIL);
new_node->count++;
quicklistNodeUpdateSz(new_node);
__quicklistInsertNode(quicklist, node, new_node, after);
_quicklistMergeNodes(quicklist, node);
}
quicklist->count++;
}
其餘方法就不一一展開了,大同小異。
listpack
是什麼
緊湊列表,用一塊連續的記憶體空間來緊湊保存數據,同時使用多種編碼方式,表示不同長度的數據(字元串、整數)。
- 結構定義:
listpack.h
- 實現:
listpack.c
數據結構
編碼類型
在 listpack.c 文件中,有大量的 LP_ENCODING__XX_BIT_INT 和 LP_ENCODING__XX_BIT_STR 的宏定義:
#define LP_ENCODING_7BIT_UINT 0
#define LP_ENCODING_7BIT_UINT_MASK 0x80
#define LP_ENCODING_IS_7BIT_UINT(byte) (((byte)&LP_ENCODING_7BIT_UINT_MASK)==LP_ENCODING_7BIT_UINT)
#define LP_ENCODING_6BIT_STR 0x80
#define LP_ENCODING_6BIT_STR_MASK 0xC0
#define LP_ENCODING_IS_6BIT_STR(byte) (((byte)&LP_ENCODING_6BIT_STR_MASK)==LP_ENCODING_6BIT_STR)
……
listpack 元素會對不同長度的整數和字元串進行編碼。
整數編碼
以 LP_ENCODING_7BIT_UINT 為例,元素的實際數據是一個 7 bit 的無符號整數。
對於 LP_ENCODING_13BIT_INT,元素實際數據的表示位數是 13 位,最高 3 位是 110,表示當前的編碼類型
。
#define LP_ENCODING_13BIT_INT 0xC0
#define LP_ENCODING_13BIT_INT_MASK 0xE0
#define LP_ENCODING_IS_13BIT_INT(byte) (((byte)&LP_ENCODING_13BIT_INT_MASK)==LP_ENCODING_13BIT_INT)
整數其餘的編碼方式有:
- LP_ENCODING_16BIT_INT
- LP_ENCODING_24BIT_INT
- LP_ENCODING_32BIT_INT
- LP_ENCODING_64BIT_INT
字元串編碼
3 種類型:
- LP_ENCODING_6BIT_STR
- LP_ENCODING_12BIT_STR
- LP_ENCODING_32BIT_STR
以編碼類型 LP_ENCODING_6BIT_STR 為例,編碼類型占 1 個位元組。這 1 個位元組包括兩個部分:
- 宏定義 2 位,標識編碼類型
- 後 6 位保存字元串長度
如何避免連鎖更新?
每個列表項都只記錄自己的長度,不會像 ziplist 的列表項會記錄前一項的長度。所以在 listpack 中新增或修改元素,只會涉及到列表項自身的操作,不會影響後續列表項的長度變化,進而避免連鎖更新。
lpNew
unsigned char *lpNew(size_t capacity) {
unsigned char *lp = lp_malloc(capacity > LP_HDR_SIZE+1 ? capacity : LP_HDR_SIZE+1);
if (lp == NULL) return NULL;
// 設置 listpack 的大小
lpSetTotalBytes(lp,LP_HDR_SIZE+1);
// 設置 listpack 的元素個數,初始是 0
lpSetNumElements(lp,0);
// 設置 listpack 的結尾標識符 LP_EOF,值是 255
lp[LP_HDR_SIZE] = LP_EOF;
return lp;
}
#define lpSetTotalBytes(p,v) do { \
(p)[0] = (v)&0xff; \
(p)[1] = ((v)>>8)&0xff; \
(p)[2] = ((v)>>16)&0xff; \
(p)[3] = ((v)>>24)&0xff; \
} while(0)
#define lpSetNumElements(p,v) do { \
(p)[4] = (v)&0xff; \
(p)[5] = ((v)>>8)&0xff; \
} while(0)
lpFirst
獲取 listpack 的第一個元素。
/* Return a pointer to the first element of the listpack, or NULL if the
* listpack has no elements. */
unsigned char *lpFirst(unsigned char *lp) {
// 跳過 listpack 的頭部 6 位元組
unsigned char *p = lp + LP_HDR_SIZE; /* Skip the header. */
// 若是末尾結束位元組,返回 NULL
if (p[0] == LP_EOF) return NULL;
lpAssertValidEntry(lp, lpBytes(lp), p);
return p;
}
lpNext
/* If 'p' points to an element of the listpack, calling lpNext() will return
* the pointer to the next element (the one on the right), or NULL if 'p'
* already pointed to the last element of the listpack. */
unsigned char *lpNext(unsigned char *lp, unsigned char *p) {
……
// 偏移指針指向下一個列表項
p = lpSkip(p);
if (p[0] == LP_EOF) return NULL;
……
return p;
}
核心是調用了 lpSkip 函數:
unsigned char *lpSkip(unsigned char *p) {
// 計算當前 entry 編碼類型和實際數據的總長度
unsigned long entrylen = lpCurrentEncodedSizeUnsafe(p);
entrylen += lpEncodeBacklen(NULL,entrylen);
// 偏移指針
p += entrylen;
return p;
}
lpSkip 核心是調用了 lpCurrentEncodedSizeUnsafe 函數獲取當前 entry 的總長度:
uint32_t lpCurrentEncodedSizeUnsafe(unsigned char *p) {
// LP_ENCODING_IS_7BIT_UINT,編碼類型和整數數值都在同一個位元組,所以返回 1
if (LP_ENCODING_IS_7BIT_UINT(p[0])) return 1;
// LP_ENCODING_IS_6BIT_STR,1 位元組表示編碼類型和字元串長度,該位元組後 6 位表示字元串長度
if (LP_ENCODING_IS_6BIT_STR(p[0])) return 1+LP_ENCODING_6BIT_STR_LEN(p);
if (LP_ENCODING_IS_13BIT_INT(p[0])) return 2;
if (LP_ENCODING_IS_16BIT_INT(p[0])) return 3;
if (LP_ENCODING_IS_24BIT_INT(p[0])) return 4;
if (LP_ENCODING_IS_32BIT_INT(p[0])) return 5;
if (LP_ENCODING_IS_64BIT_INT(p[0])) return 9;
if (LP_ENCODING_IS_12BIT_STR(p[0])) return 2+LP_ENCODING_12BIT_STR_LEN(p);
if (LP_ENCODING_IS_32BIT_STR(p[0])) return 5+LP_ENCODING_32BIT_STR_LEN(p);
if (p[0] == LP_EOF) return 1;
return 0;
}
大同小異,這裡只介紹下 LP_ENCODING_IS_6BIT_STR,1 位元組表示編碼類型和字元串長度,該位元組後 6 位表示字元串長度。使用 LP_ENCODING_6BIT_STR_LEN 宏定義計算後 6 位的數值。
#define LP_ENCODING_6BIT_STR_LEN(p) ((p)[0] & 0x3F)
lpSkip 函數還調用了 lpEncodeBacklen 函數,計算 entry 最後一部分 len 的長度(即下圖中的 len)。
unsigned long lpEncodeBacklen(unsigned char *buf, uint64_t l) {
//編碼類型和實際數據的總長度小於等於 127,entry-len 長度為 1 位元組
if (l <= 127) {
...
return 1;
} else if (l < 16383) { //編碼類型和實際數據的總長度大於 127 但小於 16383,entry-len 長度為 2 位元組
...
return 2;
} else if (l < 2097151) {//編碼類型和實際數據的總長度大於 16383 但小於 2097151,entry-len 長度為 3 位元組
...
return 3;
} else if (l < 268435455) { //編碼類型和實際數據的總長度大於 2097151 但小於 268435455,entry-len 長度為 4 位元組
...
return 4;
} else { //否則,entry-len 長度為 5 位元組
...
return 5;
}
}
lpPrev
unsigned char *lpPrev(unsigned char *lp, unsigned char *p) {
assert(p);
// 如果是第一項,直接返回 NULL
if (p-lp == LP_HDR_SIZE) return NULL;
p--; /* Seek the first backlen byte of the last element. */
uint64_t prevlen = lpDecodeBacklen(p);
prevlen += lpEncodeBacklen(NULL,prevlen);
p -= prevlen-1; /* Seek the first byte of the previous entry. */
lpAssertValidEntry(lp, lpBytes(lp), p);
return p;
}
lpGet
就是按照編碼類型,然後解析出實際數據所佔位元組數,進而獲取對應數值。
unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf) {
int64_t val;
uint64_t uval, negstart, negmax;
assert(p); /* assertion for valgrind (avoid NPD) */
if (LP_ENCODING_IS_7BIT_UINT(p[0])) {
negstart = UINT64_MAX; /* 7 bit ints are always positive. */
negmax = 0;
uval = p[0] & 0x7f;
} else if (LP_ENCODING_IS_6BIT_STR(p[0])) {
*count = LP_ENCODING_6BIT_STR_LEN(p);
return p+1;
} else if (LP_ENCODING_IS_13BIT_INT(p[0])) {
……
} else if (LP_ENCODING_IS_16BIT_INT(p[0])) {
……
} else if (LP_ENCODING_IS_24BIT_INT(p[0])) {
……
} else if (LP_ENCODING_IS_32BIT_INT(p[0])) {
……
} else if (LP_ENCODING_IS_64BIT_INT(p[0])) {
……
} else if (LP_ENCODING_IS_12BIT_STR(p[0])) {
……
} else if (LP_ENCODING_IS_32BIT_STR(p[0])) {
……
} else {
……
}
/* We reach this code path only for integer encodings.
* Convert the unsigned value to the signed one using two's complement
* rule. */
if (uval >= negstart) {
/* This three steps conversion should avoid undefined behaviors
* in the unsigned -> signed conversion. */
uval = negmax-uval;
val = uval;
val = -val-1;
} else {
val = uval;
}
……
}
參考鏈接
Redis 源碼簡潔剖析系列
Java 編程思想-最全思維導圖-GitHub 下載鏈接,需要的小夥伴可以自取~
原創不易,希望大家轉載時請先聯繫我,並標註原文鏈接。