走近源碼:壓縮列表是怎樣煉成的
- 2020 年 3 月 11 日
- 筆記
經過前面對Redis源碼的了解,令人印象深刻的也許就是Redis各種節約記憶體手段。而Redis對於記憶體的節約可以說是費盡心思,今天我就再來介紹一種Redis為了節約記憶體而創造的存儲結構——壓縮列表(ziplist)。
存儲結構
ziplist是zset和hash在元素數量較少時使用的一種存儲結構。它的特點存儲於一塊連續的記憶體,元素與元素之間沒有空隙。我們可以用DEBUG OBJECT命令來查看一個zset的編碼格式:
127.0.0.1:6379> ZADD db 1.0 mysql 2.0 mongo 3.0 redis (integer) 3 127.0.0.1:6379> DEBUG OBJECT db Value at:0x7f5bf1908070 refcount:1 encoding:ziplist serializedlength:39 lru:9589668 lru_seconds_idle:12
那麼ziplist究竟是一種怎樣的結構的,話不多說,直接看圖。
ZIPLIST OVERALL LAYOUT

ziplist結構
接下來我們挨個解釋一下每一部分存儲的內容:
- zlbytes:32位無符號整數,存儲的是包括它自己在內的整個ziplist所佔用的位元組數
- zltail:32位無符號整數,存儲的是最後一個entry的偏移量,用來快速定位最後一個元素
- zllen:16位無符號整數,用於存儲entry的數量,當元素數量大於216-2時,這個值就被設置為216-1。我們想知道元素的數量就需要遍歷整個列表
- entry:表示存儲的元素
- zlend:8位無符號整數,用於標識整個ziplist的結尾。它的值是255。
ZIPLIST ENTRIES
了解了ziplist的大概結構以後,我們剖析更深一層的entry的結構。
對於每個entry都有兩個前綴
- prevlen:表示前一個元素的長度,它與zltail欄位結合使用可以實現快速的從後向前定位元素
- encoding:表示元素的編碼格式,它用來表示元素是整數還是字元串,如果是字元串,也表示字元串的長度
- entry-data:元素的數據,它並不是一定存在,對於某些編碼而言,編碼本身也是數據,因此這一部分可以省略
這裡要解釋一點,prevlen是一個變長的整數,當前一個元素的長度小於254時,它僅需要一個位元組(8位無符號整數)表示,如果元素的長度大於(或等於)254位元組,prevlen就用5個位元組來表示,其中第一個位元組是254,後4個位元組表示前一個元素的長度。
encoding欄位決定了元素的內容。如果entry存儲的是字元串,那麼就通過encoding的前兩位來區分不同長度的字元串,如果entry存儲的內容是整數,那麼前兩位都會被設置為1,再後面兩位用來區分整數的類型。
- |00pppppp|:字元串長度小於63位元組,pppppp是6位無符號整數,用來表示字元串長度
- |01pppppp|qqqqqqqq|:字元串長度小於等於16383位元組,後面14位表示字元串長度
- |10000000|qqqqqqqq|rrrrrrrr|ssssssss|tttttttt|:字元串長度大於等於16384位元組,後4個位元組表示字元串長度
- |11000000|:16位整數,後面跟2個位元組存儲整數
- |11010000|:32位整數,後面跟4個位元組存儲整數
- |11100000|:64位整數,後面跟8個位元組存儲整數
- |11110000|:24位整數,後面跟3個位元組存儲整數
- |11111110|:8位整數,後面跟1個位元組存儲整數
- |1111xxxx|:(xxxx 取值從0000到1101)表示0到12的整數,讀到的xxxx減1為實際表示的整數。這就是前面提到的省略entry-data的情況
- |11111111|:ziplist的結束值,也就是zlend的值
說了這麼多,也許你還是不太清楚ziplist存儲的內容究竟要表示什麼,我們還是來舉一個栗子
[0f 00 00 00] [0c 00 00 00] [02 00] [00 f3] [02 f6] [ff]
這是一個實際的ziplist存儲的內容,我們就一起來解讀一下。
首先是4個位元組的zlbytes,ziplist一共是15個位元組,因此zlbytes的值是0x0f;接下來是4個位元組的zltail,偏移量是12,因此zltail的值是0x0c;後兩個位元組是zllen,也就是一共兩個元素;第一個元素的prevlen為00,0xf3表示元素值是2:1111 0011符合上述第9條,讀到xxxx為3,需要減1,因此實際值是2;第二個元素同理,0xf6表示的值是5,最後0xff表示這個ziplist結束。
這時,我向這個ziplist中又加了一個元素,是一個字元串,請大家自行解讀下面的entry(注意,只是entry)。友情提示:需要查詢ASCII碼錶來解讀
[02] [0b] [48 65 6c 6c 6f 20 57 6f 72 6c 64]
增加元素
了解了ziplist的存儲之後,我們再來看一下ziplist是如何增加元素的。前面提到過,ziplist存儲結構用於元素數量少的zset和hash。那麼我們就以zset為例,一起追蹤源碼,了解ziplist增加元素的過程。
我們從ZADD命令執行的函數zaddCommand()開始。
void zaddCommand(client *c) { zaddGenericCommand(c,ZADD_NONE); }
它只是簡單調用了zaddGenericCommand()函數,傳入了客戶端對象c和一個標誌位,表示要執行ZADD命令,因為這個函數同樣也是ZINCRBY要執行的函數(傳入的標誌是ZADD_INCR)。
而在zaddGenericCommand()函數中,首先對參數進行了處理,並且做了一些校驗。
/* Lookup the key and create the sorted set if does not exist. */ zobj = lookupKeyWrite(c->db,key); if (zobj == NULL) { if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */ if (server.zset_max_ziplist_entries == 0 || server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr)) { zobj = createZsetObject(); } else { zobj = createZsetZiplistObject(); } dbAdd(c->db,key,zobj); } else { if (zobj->type != OBJ_ZSET) { addReply(c,shared.wrongtypeerr); goto cleanup; } }
然後判斷key是否存在,如果存在,驗證數據類型;否則創建一個新的zset對象。這裡可以看到,當
zset_max_ziplist_entries為0或者第一個元素的長度大於zset_max_ziplist_value時,創建zset對象,否則創建ziplist對象。創建好對象之後,就開始遍曆元素,執行zsetAdd函數了:
for (j = 0; j < elements; j++) { double newscore; score = scores[j]; int retflags = flags; ele = c->argv[scoreidx+1+j*2]->ptr; int retval = zsetAdd(zobj, score, ele, &retflags, &newscore); if (retval == 0) { addReplyError(c,nanerr); goto cleanup; } if (retflags & ZADD_ADDED) added++; if (retflags & ZADD_UPDATED) updated++; if (!(retflags & ZADD_NOP)) processed++; score = newscore; }
這個函數用來增加新元素或者更新元素的score。這個函數中判斷了zset對象的編碼方式,對壓縮列表ziplist和跳躍列表skiplist分開處理,跳躍列表是zset的另一種編碼方式,這個我們以後再介紹,本文我們只關注ziplist。
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *eptr; if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) { /* NX? Return, same element already exists. */ if (nx) { *flags |= ZADD_NOP; return 1; } /* Prepare the score for the increment if needed. */ if (incr) { score += curscore; if (isnan(score)) { *flags |= ZADD_NAN; return 0; } if (newscore) *newscore = score; } /* Remove and re-insert when score changed. */ if (score != curscore) { zobj->ptr = zzlDelete(zobj->ptr,eptr); zobj->ptr = zzlInsert(zobj->ptr,ele,score); *flags |= ZADD_UPDATED; } return 1; } else if (!xx) { /* Optimize: check if the element is too large or the list * becomes too long *before* executing zzlInsert. */ zobj->ptr = zzlInsert(zobj->ptr,ele,score); if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries) zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); if (sdslen(ele) > server.zset_max_ziplist_value) zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); if (newscore) *newscore = score; *flags |= ZADD_ADDED; return 1; } else { *flags |= ZADD_NOP; return 1; } }
可以看到,這裡首先調用zzlFind()函數查找對應的元素,如果元素存在,那麼就判斷是否包含參數NX或者是否是INCR操作。如果修改了元素的分數,則先刪除原有的元素,再重新增加;如果元素不存在,就直接執行zzlInsert()函數,再insert之後,會判斷是否需要改為跳躍列表存儲。這裡有兩個條件:
- zset元素數量大於zset_max_ziplist_entries(默認128)
- 添加的元素長度大於zset_max_ziplist_value(默認64)
滿足任意一個條件,zset都會使用跳躍列表來存儲。
我們繼續追蹤zzlInsert()函數。
unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) { unsigned char *eptr = ziplistIndex(zl,0), *sptr; double s; while (eptr != NULL) { sptr = ziplistNext(zl,eptr); serverAssert(sptr != NULL); s = zzlGetScore(sptr); if (s > score) { /* First element with score larger than score for element to be * inserted. This means we should take its spot in the list to * maintain ordering. */ zl = zzlInsertAt(zl,eptr,ele,score); break; } else if (s == score) { /* Ensure lexicographical ordering for elements. */ if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) { zl = zzlInsertAt(zl,eptr,ele,score); break; } } /* Move to next element. */ eptr = ziplistNext(zl,sptr); } /* Push on tail of list when it was not yet inserted. */ if (eptr == NULL) zl = zzlInsertAt(zl,NULL,ele,score); return zl; }
它首先定位了zset的第一個元素,如果該元素不為空,就比較該元素的分數s與要插入的元素分數score,如果s>score,就插入到當前位置,如果分數相同,則比較元素(按字典序)。插入後,將後面的元素依次移到下一位。
unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, double score) { unsigned char *sptr; char scorebuf[128]; int scorelen; size_t offset; scorelen = d2string(scorebuf,sizeof(scorebuf),score); if (eptr == NULL) { zl = ziplistPush(zl,(unsigned char*)ele,sdslen(ele),ZIPLIST_TAIL); zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL); } else { /* Keep offset relative to zl, as it might be re-allocated. */ offset = eptr-zl; zl = ziplistInsert(zl,eptr,(unsigned char*)ele,sdslen(ele)); eptr = zl+offset; /* Insert score after the element. */ serverAssert((sptr = ziplistNext(zl,eptr)) != NULL); zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen); } return zl; }
在zzlInsertAt()函數中,主要是調用了ziplistPush()或者ziplistInsert()將元素和分數插入列表尾部或中間。插入順序是先插入元素,然後插入分數。
接下來就到了ziplist.c文件中,真正向壓縮列表中插入元素了。關鍵程式碼在__ziplistInsert()函數中。
首先需要計算插入位置前一個元素的長度,存儲到當前entry的prevlen。
if (p[0] != ZIP_END) { ZIP_DECODE_PREVLEN(p, prevlensize, prevlen); } else { unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl); if (ptail[0] != ZIP_END) { prevlen = zipRawEntryLength(ptail); } }
這裡區分了是否是在尾部插入元素的情況,如果是在尾部,就可以通過ziplist中的zltail欄位直接定位。接下來就是嘗試對插入的元素進行編碼,判斷是否可以存儲為整數,如果不能,就按照字元串的編碼格式來存儲。
if (zipTryEncoding(s,slen,&value,&encoding)) { /* 'encoding' is set to the appropriate integer encoding */ reqlen = zipIntSize(encoding); } else { /* 'encoding' is untouched, however zipStoreEntryEncoding will use the * string length to figure out how to encode it. */ reqlen = slen; }
這一步判斷是節省記憶體的關鍵,它會使用我們前面介紹的盡量小的編碼格式來進行編碼。編碼完成後就要計算當前entry的長度,包括prevlen、encoding和entry-data,並且需要保證後一個entry(如果有的話)的prevlen能夠保存當前entry的長度。這裡調用的是zipPrevLenByteDiff()函數,需要的prevlen的長度和現有的prevlen的長度的差值,也就是說如果返回為整數,表示需要更多空間。
在這之後就要調用zrealloc()來擴展空間了。這裡有可能會在原來的基礎上進行擴展,也有可能重新分配一塊記憶體,然後將原來的ziplist整體遷移。如果ziplist佔用較大記憶體時,整體遷移的代價是很高的。有了足夠的空間之後,就是把當前位置的entry向後移一位了,然後要修改這個entry的prevlen。更新zltail。
if (nextdiff != 0) { offset = p-zl; zl = __ziplistCascadeUpdate(zl,p+reqlen); p = zl+offset; }
nextdiff是前面zipPrevLenByteDiff()函數的返回值,它不為0表示需要更多空間(小於0時被置為0)。這時後面的元素需要級聯更新。所有的這些處理完畢之後,我們終於可以把要插入的entry寫入當前位置了,並且將ziplist的長度加1。
級聯更新
如果一個entry的長度小於254位元組,那麼後一個元素的prevlen就用一個位元組來存儲,否則就要用5個位元組存儲。當我們插入一個元素時,如果它的長度大於253位元組,那麼原來的entry就可能從1個位元組變成5個位元組,而如果由於這一變化導致這個entry的長度大於254位元組,那麼後面的元素也要更新。到後面甚至有可能導致重新分配記憶體的問題,所以級聯更新是一件很可怕的事情。
接下來就通過源碼,看一下級聯更新的具體步驟。(查看ziplist.c文件的__ziplistCascadeUpdate函數)
首先,判斷當前entry是否是最後一個,如果是,則跳出級聯更新。
if (p[rawlen] == ZIP_END) break;
接著判斷了下一個entry的prevlen長度是否發生變化,如果沒有變化,也不用繼續進行級聯更新。
if (next.prevrawlen == rawlen) break;
而如果下一個entry的prevlen長度需要擴展,那麼就先調用ziplistResize擴展記憶體,然後要更新zltail。要將後面的entry向後移動,再開始判斷下一個entry是否需要更新。
if (next.prevrawlensize < rawlensize) { /* The "prevlen" field of "next" needs more bytes to hold * the raw length of "cur". */ offset = p-zl; extra = rawlensize-next.prevrawlensize; zl = ziplistResize(zl,curlen+extra); p = zl+offset; /* Current pointer and offset for next element. */ np = p+rawlen; noffset = np-zl; /* Update tail offset when next element is not the tail element. */ if ((zl+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) { ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra); } /* Move the tail to the back. */ memmove(np+rawlensize, np+next.prevrawlensize, curlen-noffset-next.prevrawlensize-1); zipStorePrevEntryLength(np,rawlen); /* Advance the cursor */ p += rawlen; curlen += extra; }
如果後面的entry的prevlen大於需要的長度呢,此時應該收縮prevlen,如果要進行收縮,那麼可能會繼續級聯更新。這太麻煩了,所以這裡選擇了浪費一些空間,用5個位元組的空間來存儲1個位元組可以存儲的內容。如果prevlen的長度等於需要的長度,就直接更新內容。
if (next.prevrawlensize > rawlensize) { /* This would result in shrinking, which we want to avoid. * So, set "rawlen" in the available bytes. */ zipStorePrevEntryLengthLarge(p+rawlen,rawlen); } else { zipStorePrevEntryLength(p+rawlen,rawlen); } /* Stop here, as the raw length of "next" has not changed. */ break;
除了新增操作以外,刪除操作也有可能引起級聯更新。假設我們有3個entry是下面的情況

刪除級聯更新
我們可以知道,entry2的prevlen需要5個位元組,entry3的prevlen只需要1個位元組。而如果我們刪除了entry2,那麼entry3的prevlen就需要擴展到5個位元組,這一操作就有可能引起級聯更新,後面的情況和新增節點時一樣。
總結
最後做一個總結:
- 壓縮列表是zset和hash元素個數較少時的存儲結構
- ziplist由zlbytes、zltail、zllen、entry、zlend這五部分組成
- 每個entry由prevlen、encoding和entry-data三部分組成
- ziplist增加元素時,需要重新計算插入位置的entry的prevlen(prevlen的長度為1位元組或5位元組),這一操作有可能引起級聯更新。