Redis系列(十一):數據結構Set源碼解析和SADD、SINTER、SDIFF、SUNION、SPOP命令
1.介紹
Hash是以K->V形式存儲,而Set則是K存儲,空間節省了很多
Redis中Set是String類型的無序集合;集合成員是唯一的。
這就意味著集合中不能出現重複的數據。可根據應用場景需要選用該數據類型。(比如:好友/關注/粉絲/感興趣的人/黑白名單)
2.源碼解析
Redis使用Dict和IntSet保存Set數據
// 1. inset 數據結構,在set數據量小且都是整型數據時使用 typedef struct intset { // 編碼範圍,由具體存儲值決定 uint32_t encoding; // 數組長度 uint32_t length; // 具體存儲元素的容器 int8_t contents[]; } intset;
// 2. dict 相關數據結構,即是 hash 的實現相關的數據結構 /* This is our hash table structure. Every dictionary has two of this as we * implement incremental rehashing, for the old to the new table. */ typedef struct dictht { dictEntry **table; unsigned long size; unsigned long sizemask; unsigned long used; } dictht; typedef struct dict { dictType *type; void *privdata; dictht ht[2]; long rehashidx; /* rehashing not in progress if rehashidx == -1 */ unsigned long iterators; /* number of iterators currently running */ } dict; /* If safe is set to 1 this is a safe iterator, that means, you can call * dictAdd, dictFind, and other functions against the dictionary even while * iterating. Otherwise it is a non safe iterator, and only dictNext() * should be called while iterating. */ typedef struct dictIterator { dict *d; long index; int table, safe; dictEntry *entry, *nextEntry; /* unsafe iterator fingerprint for misuse detection. */ long long fingerprint; } dictIterator; typedef struct dictEntry { void *key; union { void *val; uint64_t u64; int64_t s64; double d; } v; struct dictEntry *next; } dictEntry; typedef struct dictType { unsigned int (*hashFunction)(const void *key); void *(*keyDup)(void *privdata, const void *key); void *(*valDup)(void *privdata, const void *obj); int (*keyCompare)(void *privdata, const void *key1, const void *key2); void (*keyDestructor)(void *privdata, void *key); void (*valDestructor)(void *privdata, void *obj); } dictType;
3.SADD
加一個或多個指定的member元素到集合的 key中.指定的一個或者多個元素member 如果已經在集合key中存在則忽略.
如果集合key 不存在,則新建集合key,並添加member元素到集合key中.
如果key 的類型不是集合則返回錯誤.
時間複雜度:O(N)
127.0.0.1:6379> sadd myset "Hello" (integer) 1 127.0.0.1:6379> sadd myset "Hello" (integer) 0 127.0.0.1:6379> smembers myset 1) "Hello" 127.0.0.1:6379>
// 用法: SADD key member1 [member2] // t_set.c, 添加member void saddCommand(client *c) { robj *set; int j, added = 0; // 先從當前db中查找set實例 set = lookupKeyWrite(c->db,c->argv[1]); if (set == NULL) { // 1. 新建set實例並添加到當前db中 set = setTypeCreate(c->argv[2]->ptr); dbAdd(c->db,c->argv[1],set); } else { if (set->type != OBJ_SET) { addReply(c,shared.wrongtypeerr); return; } } // 對於n個member,一個個地添加即可 for (j = 2; j < c->argc; j++) { // 2. 只有添加成功, added 才會加1 if (setTypeAdd(set,c->argv[j]->ptr)) added++; } // 命令傳播 if (added) { signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id); } server.dirty += added; // 響應添加成功的數量 addReplyLongLong(c,added); } // 1. 創建新的set集合實例(需根據首次的參數類型判定) // t_set.c, 創建set實例 /* Factory method to return a set that *can* hold "value". When the object has * an integer-encodable value, an intset will be returned. Otherwise a regular * hash table. */ robj *setTypeCreate(sds value) { // 如果傳入的value是整型,則創建 intset 類型的set // 否則使用dict類型的set // 一般地,第一個數據為整型,後續數據也應該為整型,所以這個數據結構相對穩定 // 而hash的容器創建時,只使用了一 ziplist 創建,這是不一樣的實現 if (isSdsRepresentableAsLongLong(value,NULL) == C_OK) return createIntsetObject(); return createSetObject(); } // 1.1. 創建 intset 型的set // object.c robj *createIntsetObject(void) { intset *is = intsetNew(); robj *o = createObject(OBJ_SET,is); o->encoding = OBJ_ENCODING_INTSET; return o; } // intset.c, new一個空的intset對象 /* Create an empty intset. */ intset *intsetNew(void) { intset *is = zmalloc(sizeof(intset)); is->encoding = intrev32ifbe(INTSET_ENC_INT16); is->length = 0; return is; } // 1.2. 創建dict 型的set robj *createSetObject(void) { dict *d = dictCreate(&setDictType,NULL); robj *o = createObject(OBJ_SET,d); o->encoding = OBJ_ENCODING_HT; return o; } // dict.c /* Create a new hash table */ dict *dictCreate(dictType *type, void *privDataPtr) { dict *d = zmalloc(sizeof(*d)); _dictInit(d,type,privDataPtr); return d; } /* Initialize the hash table */ int _dictInit(dict *d, dictType *type, void *privDataPtr) { _dictReset(&d->ht[0]); _dictReset(&d->ht[1]); d->type = type; d->privdata = privDataPtr; d->rehashidx = -1; d->iterators = 0; return DICT_OK; } // 2. 添加member到set集合中 // t_set.c, 添加元素 /* Add the specified value into a set. * * If the value was already member of the set, nothing is done and 0 is * returned, otherwise the new element is added and 1 is returned. */ int setTypeAdd(robj *subject, sds value) { long long llval; // 2.1. HT編碼和INTSET編碼分別處理就好 if (subject->encoding == OBJ_ENCODING_HT) { dict *ht = subject->ptr; // 以 value 為 key, 添加實例到ht中 // 實現過程也很簡單,大概就是如果存在則返回NULL(即無需添加),輔助rehash,分配記憶體創建dictEntry實例,稍後簡單看看 dictEntry *de = dictAddRaw(ht,value); if (de) { // 重新設置key為 sdsdup(value), value為NULL dictSetKey(ht,de,sdsdup(value)); dictSetVal(ht,de,NULL); return 1; } } // 2.2. intset 編碼的member添加 else if (subject->encoding == OBJ_ENCODING_INTSET) { // 嘗試解析value為 long 型,值寫入 llval 中 if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) { uint8_t success = 0; // 情況1. 可添加到intset中 subject->ptr = intsetAdd(subject->ptr,llval,&success); if (success) { /* Convert to regular set when the intset contains * too many entries. */ // 默認: 512, intset大於之後,則轉換為ht hash表模式存儲 if (intsetLen(subject->ptr) > server.set_max_intset_entries) // 2.3. 轉換intset編碼為 ht 編碼 setTypeConvert(subject,OBJ_ENCODING_HT); return 1; } } else { // 情況2. member 是字元串型,先將set容器轉換為 ht 編碼,再重新執行dict的添加模式 /* Failed to get integer from object, convert to regular set. */ setTypeConvert(subject,OBJ_ENCODING_HT); /* The set *was* an intset and this value is not integer * encodable, so dictAdd should always work. */ serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK); return 1; } } else { serverPanic("Unknown set encoding"); } return 0; } // 2.1. 添加member到dict中(略解, 在hash數據結構解析中已介紹) // dict.c, 添加某key到 d 字典中 /* Low level add. This function adds the entry but instead of setting * a value returns the dictEntry structure to the user, that will make * sure to fill the value field as he wishes. * * This function is also directly exposed to the user API to be called * mainly in order to store non-pointers inside the hash value, example: * * entry = dictAddRaw(dict,mykey); * if (entry != NULL) dictSetSignedIntegerVal(entry,1000); * * Return values: * * If key already exists NULL is returned. * If key was added, the hash entry is returned to be manipulated by the caller. */ dictEntry *dictAddRaw(dict *d, void *key) { int index; dictEntry *entry; dictht *ht; if (dictIsRehashing(d)) _dictRehashStep(d); /* Get the index of the new element, or -1 if * the element already exists. */ // 獲取需要添加的key的存放位置下標(slot), 如果該key已存在, 則返回-1(無可用slot) if ((index = _dictKeyIndex(d, key)) == -1) return NULL; /* Allocate the memory and store the new entry. * Insert the element in top, with the assumption that in a database * system it is more likely that recently added entries are accessed * more frequently. */ ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0]; entry = zmalloc(sizeof(*entry)); entry->next = ht->table[index]; ht->table[index] = entry; ht->used++; /* Set the hash entry fields. */ dictSetKey(d, entry, key); return entry; } // 2.2. 添加整型數據到 intset中 // intset.c, 添加value /* Insert an integer in the intset */ intset *intsetAdd(intset *is, int64_t value, uint8_t *success) { // 獲取value的所屬範圍 uint8_t valenc = _intsetValueEncoding(value); uint32_t pos; if (success) *success = 1; /* Upgrade encoding if necessary. If we need to upgrade, we know that * this value should be either appended (if > 0) or prepended (if < 0), * because it lies outside the range of existing values. */ // 默認 is->encoding 為 INTSET_ENC_INT16 (16位長) // 2.2.1. 即超過當前預設的位長,則需要增大預設,然後添加 // 此時的value可以確定: 要麼是最大,要麼是最小 (所以我們可以推斷,此intset應該是有序的) if (valenc > intrev32ifbe(is->encoding)) { /* This always succeeds, so we don't need to curry *success. */ return intsetUpgradeAndAdd(is,value); } else { /* Abort if the value is already present in the set. * This call will populate "pos" with the right position to insert * the value when it cannot be found. */ // 2.2.2. 在當前環境下添加value // 找到value則說明元素已存在,不可再添加 // pos 保存比value小的第1個元素的位置 if (intsetSearch(is,value,&pos)) { if (success) *success = 0; return is; } is = intsetResize(is,intrev32ifbe(is->length)+1); // 在pos不是末尾位置時,需要留出空位,依次移動後面的元素 if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1); } // 針對編碼位不變更的情況下設置pos位置的值 _intsetSet(is,pos,value); is->length = intrev32ifbe(intrev32ifbe(is->length)+1); return is; } // 判斷 value 的位長 // INTSET_ENC_INT16 < INTSET_ENC_INT32 < INTSET_ENC_INT64 // 2 < 4 < 8 /* Return the required encoding for the provided value. */ static uint8_t _intsetValueEncoding(int64_t v) { if (v < INT32_MIN || v > INT32_MAX) return INTSET_ENC_INT64; else if (v < INT16_MIN || v > INT16_MAX) return INTSET_ENC_INT32; else return INTSET_ENC_INT16; } // 2.2.1. 升級預設位長,並添加value // intset.c /* Upgrades the intset to a larger encoding and inserts the given integer. */ static intset *intsetUpgradeAndAdd(intset *is, int64_t value) { uint8_t curenc = intrev32ifbe(is->encoding); uint8_t newenc = _intsetValueEncoding(value); int length = intrev32ifbe(is->length); int prepend = value < 0 ? 1 : 0; /* First set new encoding and resize */ is->encoding = intrev32ifbe(newenc); // 每次必進行擴容 is = intsetResize(is,intrev32ifbe(is->length)+1); /* Upgrade back-to-front so we don't overwrite values. * Note that the "prepend" variable is used to make sure we have an empty * space at either the beginning or the end of the intset. */ // 因編碼發生變化,元素的位置已經不能一一對應,需要按照原來的編碼依次轉移過來 // 從後往前依次賦值,所以,記憶體位置上不存在覆蓋問題(後面記憶體位置一定是空的),直接依次賦值即可(高效複製) while(length--) _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc)); /* Set the value at the beginning or the end. */ // 對新增加的元素,負數添加到第0位,否則添加到最後一個元素後一位 if (prepend) _intsetSet(is,0,value); else _intsetSet(is,intrev32ifbe(is->length),value); is->length = intrev32ifbe(intrev32ifbe(is->length)+1); return is; } /* Resize the intset */ static intset *intsetResize(intset *is, uint32_t len) { uint32_t size = len*intrev32ifbe(is->encoding); // malloc is = zrealloc(is,sizeof(intset)+size); return is; } // intset.c, 獲取pos位置的值 /* Return the value at pos, given an encoding. */ static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) { int64_t v64; int32_t v32; int16_t v16; if (enc == INTSET_ENC_INT64) { memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64)); memrev64ifbe(&v64); return v64; } else if (enc == INTSET_ENC_INT32) { memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32)); memrev32ifbe(&v32); return v32; } else { memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16)); memrev16ifbe(&v16); return v16; } } // intset.c, 設置pos位置的值,和數組賦值的實際意義差不多 // 只是這裡數據類型是不確定的,所以使用指針進行賦值 /* Set the value at pos, using the configured encoding. */ static void _intsetSet(intset *is, int pos, int64_t value) { uint32_t encoding = intrev32ifbe(is->encoding); if (encoding == INTSET_ENC_INT64) { ((int64_t*)is->contents)[pos] = value; memrev64ifbe(((int64_t*)is->contents)+pos); } else if (encoding == INTSET_ENC_INT32) { ((int32_t*)is->contents)[pos] = value; memrev32ifbe(((int32_t*)is->contents)+pos); } else { ((int16_t*)is->contents)[pos] = value; memrev16ifbe(((int16_t*)is->contents)+pos); } } // 2.2.2. 在編碼類型未變更的情況,需要查找可以存放value的位置(為了確認該value是否已存在,以及小於value的第一個位置賦值) /* Search for the position of "value". Return 1 when the value was found and * sets "pos" to the position of the value within the intset. Return 0 when * the value is not present in the intset and sets "pos" to the position * where "value" can be inserted. */ static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) { int min = 0, max = intrev32ifbe(is->length)-1, mid = -1; int64_t cur = -1; /* The value can never be found when the set is empty */ if (intrev32ifbe(is->length) == 0) { if (pos) *pos = 0; return 0; } else { /* Check for the case where we know we cannot find the value, * but do know the insert position. */ // 因 intset 是有序數組,即可以判定是否超出範圍,如果超出則元素必定不存在 if (value > _intsetGet(is,intrev32ifbe(is->length)-1)) { if (pos) *pos = intrev32ifbe(is->length); return 0; } else if (value < _intsetGet(is,0)) { if (pos) *pos = 0; return 0; } } // 使用二分查找 while(max >= min) { mid = ((unsigned int)min + (unsigned int)max) >> 1; cur = _intsetGet(is,mid); if (value > cur) { min = mid+1; } else if (value < cur) { max = mid-1; } else { // 找到了 break; } } if (value == cur) { if (pos) *pos = mid; return 1; } else { // 在沒有找到的情況下,min就是第一個比 value 小的元素 if (pos) *pos = min; return 0; } } // intset移動(記憶體移動) static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) { void *src, *dst; uint32_t bytes = intrev32ifbe(is->length)-from; uint32_t encoding = intrev32ifbe(is->encoding); if (encoding == INTSET_ENC_INT64) { src = (int64_t*)is->contents+from; dst = (int64_t*)is->contents+to; bytes *= sizeof(int64_t); } else if (encoding == INTSET_ENC_INT32) { src = (int32_t*)is->contents+from; dst = (int32_t*)is->contents+to; bytes *= sizeof(int32_t); } else { src = (int16_t*)is->contents+from; dst = (int16_t*)is->contents+to; bytes *= sizeof(int16_t); } memmove(dst,src,bytes); } // 2.3. 轉換intset編碼為 ht 編碼 (如果遇到string型的value或者intset數量大於閥值(默認:512)時) // t_set.c, 類型轉換 /* Convert the set to specified encoding. The resulting dict (when converting * to a hash table) is presized to hold the number of elements in the original * set. */ void setTypeConvert(robj *setobj, int enc) { setTypeIterator *si; // 要求外部必須保證 set類型且 intset 編碼 serverAssertWithInfo(NULL,setobj,setobj->type == OBJ_SET && setobj->encoding == OBJ_ENCODING_INTSET); if (enc == OBJ_ENCODING_HT) { int64_t intele; // 直接創建一個 dict 來容納數據 dict *d = dictCreate(&setDictType,NULL); sds element; /* Presize the dict to avoid rehashing */ // 直接一次性擴容成需要的大小 dictExpand(d,intsetLen(setobj->ptr)); /* To add the elements we extract integers and create redis objects */ // setTypeIterator 迭代器是轉換的關鍵 si = setTypeInitIterator(setobj); while (setTypeNext(si,&element,&intele) != -1) { // element:ht編碼時的key, intele: intset編碼時的value element = sdsfromlonglong(intele); // 因set特性保證是無重複元素,所以添加dict時,必然應成功 // 此處應無 rehash, 而是直接計算 hashCode, 放置元素, 時間複雜度 O(1) serverAssert(dictAdd(d,element,NULL) == DICT_OK); } // 釋放迭代器 setTypeReleaseIterator(si); setobj->encoding = OBJ_ENCODING_HT; zfree(setobj->ptr); setobj->ptr = d; } else { serverPanic("Unsupported set conversion"); } } // t_set.c, 獲取set集合的迭代器 setTypeIterator *setTypeInitIterator(robj *subject) { setTypeIterator *si = zmalloc(sizeof(setTypeIterator)); // 設置迭代器公用資訊 si->subject = subject; si->encoding = subject->encoding; // hash表則需要再迭代 dict if (si->encoding == OBJ_ENCODING_HT) { si->di = dictGetIterator(subject->ptr); } // intset 比較簡單,直接設置下標即可 else if (si->encoding == OBJ_ENCODING_INTSET) { si->ii = 0; } else { serverPanic("Unknown set encoding"); } return si; } // dict.c, dict迭代器初始化 dictIterator *dictGetIterator(dict *d) { dictIterator *iter = zmalloc(sizeof(*iter)); iter->d = d; iter->table = 0; iter->index = -1; iter->safe = 0; iter->entry = NULL; iter->nextEntry = NULL; return iter; } // t_set.c, /* Move to the next entry in the set. Returns the object at the current * position. * * Since set elements can be internally be stored as SDS strings or * simple arrays of integers, setTypeNext returns the encoding of the * set object you are iterating, and will populate the appropriate pointer * (sdsele) or (llele) accordingly. * * Note that both the sdsele and llele pointers should be passed and cannot * be NULL since the function will try to defensively populate the non * used field with values which are easy to trap if misused. * * When there are no longer elements -1 is returned. */ int setTypeNext(setTypeIterator *si, sds *sdsele, int64_t *llele) { // hash表返回key if (si->encoding == OBJ_ENCODING_HT) { dictEntry *de = dictNext(si->di); if (de == NULL) return -1; *sdsele = dictGetKey(de); *llele = -123456789; /* Not needed. Defensive. */ } // intset 直接獲取下標對應的元素即可 else if (si->encoding == OBJ_ENCODING_INTSET) { if (!intsetGet(si->subject->ptr,si->ii++,llele)) return -1; *sdsele = NULL; /* Not needed. Defensive. */ } else { serverPanic("Wrong set encoding in setTypeNext"); } return si->encoding; } // case1: intset直接疊加下標即可 // intset.c /* Sets the value to the value at the given position. When this position is * out of range the function returns 0, when in range it returns 1. */ uint8_t intsetGet(intset *is, uint32_t pos, int64_t *value) { if (pos < intrev32ifbe(is->length)) { *value = _intsetGet(is,pos); return 1; } return 0; } /* Return the value at pos, using the configured encoding. */ static int64_t _intsetGet(intset *is, int pos) { return _intsetGetEncoded(is,pos,intrev32ifbe(is->encoding)); } /* Return the value at pos, given an encoding. */ static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) { int64_t v64; int32_t v32; int16_t v16; if (enc == INTSET_ENC_INT64) { memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64)); memrev64ifbe(&v64); return v64; } else if (enc == INTSET_ENC_INT32) { memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32)); memrev32ifbe(&v32); return v32; } else { memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16)); memrev16ifbe(&v16); return v16; } } // (附帶)case2: dict的迭代 // dict.c, dict的迭代,存疑問 dictEntry *dictNext(dictIterator *iter) { // 一直迭代查找 while (1) { // iter->entry 為NULL, 有兩種可能: 1. 初始化時; 2. 上一元素為迭代完成(hash衝突) if (iter->entry == NULL) { dictht *ht = &iter->d->ht[iter->table]; if (iter->index == -1 && iter->table == 0) { if (iter->safe) iter->d->iterators++; else iter->fingerprint = dictFingerprint(iter->d); } // 直接使用下標進行迭代,如果中間有空閑位置該如何處理?? // 看起來redis是使用了全量迭代元素的處理辦法,即有可能有許多空迭代過程 // 一般地,也是進行兩層迭代,jdk的hashmap迭代實現為直接找到下一次非空的元素為止 iter->index++; // 直到迭代完成所有元素,否則會直到找到一個元素為止 if (iter->index >= (long) ht->size) { if (dictIsRehashing(iter->d) && iter->table == 0) { iter->table++; iter->index = 0; ht = &iter->d->ht[1]; } else { break; } } iter->entry = ht->table[iter->index]; } else { // entry不為空,就一定有nextEntry?? iter->entry = iter->nextEntry; } // 如果當前entry為空,則繼續迭代下一個 index if (iter->entry) { /* We need to save the 'next' here, the iterator user * may delete the entry we are returning. */ iter->nextEntry = iter->entry->next; return iter->entry; } } return NULL; }
4.SISMEMBER
返回成員 member 是否是存儲的集合 key的成員.
如果member元素是集合key的成員,則返回1
如果member元素不是key的成員,或者集合key不存在,則返回0
時間複雜度:O(1)
127.0.0.1:6379> sismember myset "Hello" (integer) 1 127.0.0.1:6379> sismember myset "World" (integer) 0 127.0.0.1:6379>
// 用法: SISMEMBER key member // t_set.c, void sismemberCommand(client *c) { robj *set; if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,set,OBJ_SET)) return; // 主要方法 setTypeIsMember if (setTypeIsMember(set,c->argv[2]->ptr)) // 回復1 addReply(c,shared.cone); else // 回復0 addReply(c,shared.czero); }
// t_set.c int setTypeIsMember(robj *subject, sds value) { long long llval; if (subject->encoding == OBJ_ENCODING_HT) { // hash 表的查找方式,hashCode 計算,鏈表查找,就這麼簡單 return dictFind((dict*)subject->ptr,value) != NULL; } else if (subject->encoding == OBJ_ENCODING_INTSET) { // 如果當前的set集合是 intset 編碼的,則只有查找值也是整型的情況下才可能查找到元素 if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) { // intset 查找,而且 intset 是有序的,所以直接使用二分查找即可 return intsetFind((intset*)subject->ptr,llval); } } else { serverPanic("Unknown set encoding"); } return 0; }
/* Determine whether a value belongs to this set */ uint8_t intsetFind(intset *is, int64_t value) { uint8_t valenc = _intsetValueEncoding(value); // 最大範圍檢查,加二分查找 // intsetSearch 前面已介紹 return valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,NULL); }
5.SINTER、SDIFF、SUNION
sinter:
返回指定所有的集合的成員的交集,例如(共同好友)
sdiff:
返回一個集合與給定集合的差集的元素
sunion
返回給定的多個集合的並集中的所有成員
127.0.0.1:6379> sadd myset1 1 2 3 4 5 (integer) 5 127.0.0.1:6379> sadd myset2 3 4 5 6 7 (integer) 5 127.0.0.1:6379> sinter myset1 myset2 1) "3" 2) "4" 3) "5" 127.0.0.1:6379> sdiff myset1 myset2 1) "1" 2) "2" 127.0.0.1:6379> sunion myset1 myset2 1) "1" 2) "2" 3) "3" 4) "4" 5) "5" 6) "6" 7) "7" 127.0.0.1:6379>
sinter源碼解析
// 用法: SINTER key1 [key2] // t_set.c, sinter 實現 void sinterCommand(client *c) { // 第三個參數是用來存儲 交集結果的,兩段程式碼已做復用,說明存儲過程還是比較簡單的 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL); } // t_set.c, 求n個key的集合交集 void sinterGenericCommand(client *c, robj **setkeys, unsigned long setnum, robj *dstkey) { robj **sets = zmalloc(sizeof(robj*)*setnum); setTypeIterator *si; robj *dstset = NULL; sds elesds; int64_t intobj; void *replylen = NULL; unsigned long j, cardinality = 0; int encoding; for (j = 0; j < setnum; j++) { // 依次查找每個key的set實例 robj *setobj = dstkey ? lookupKeyWrite(c->db,setkeys[j]) : lookupKeyRead(c->db,setkeys[j]); // 只要有一個set為空,則交集必定為為,無需再找 if (!setobj) { zfree(sets); if (dstkey) { // 沒有交集,直接將dstKey 刪除,注意此邏輯?? if (dbDelete(c->db,dstkey)) { signalModifiedKey(c->db,dstkey); server.dirty++; } addReply(c,shared.czero); } else { addReply(c,shared.emptymultibulk); } return; } if (checkType(c,setobj,OBJ_SET)) { zfree(sets); return; } sets[j] = setobj; } /* Sort sets from the smallest to largest, this will improve our * algorithm's performance */ // 快速排序演算法,將 sets 按照元素長度做排序,使最少元素的set排在最前面 qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality); /* The first thing we should output is the total number of elements... * since this is a multi-bulk write, but at this stage we don't know * the intersection set size, so we use a trick, append an empty object * to the output list and save the pointer to later modify it with the * right length */ if (!dstkey) { replylen = addDeferredMultiBulkLength(c); } else { /* If we have a target key where to store the resulting set * create this key with an empty set inside */ dstset = createIntsetObject(); } /* Iterate all the elements of the first (smallest) set, and test * the element against all the other sets, if at least one set does * not include the element it is discarded */ // 看來redis也是直接通過迭代的方式來完成交集功能 // 迭代最少的set集合,依次查找後續的set集合,當遇到一個不存在的set時,上值被排除,否則是交集 si = setTypeInitIterator(sets[0]); while((encoding = setTypeNext(si,&elesds,&intobj)) != -1) { for (j = 1; j < setnum; j++) { if (sets[j] == sets[0]) continue; // 以下是查找過程 // 分 hash表查找 和 intset 編碼查找 if (encoding == OBJ_ENCODING_INTSET) { /* intset with intset is simple... and fast */ // 兩個集合都是 intset 編碼,直接二分查找即可 if (sets[j]->encoding == OBJ_ENCODING_INTSET && !intsetFind((intset*)sets[j]->ptr,intobj)) { break; /* in order to compare an integer with an object we * have to use the generic function, creating an object * for this */ } else if (sets[j]->encoding == OBJ_ENCODING_HT) { // 編碼不一致,但元素可能相同 // setTypeIsMember 復用前面的程式碼,直接查找即可 elesds = sdsfromlonglong(intobj); if (!setTypeIsMember(sets[j],elesds)) { sdsfree(elesds); break; } sdsfree(elesds); } } else if (encoding == OBJ_ENCODING_HT) { if (!setTypeIsMember(sets[j],elesds)) { break; } } } /* Only take action when all sets contain the member */ // 當迭代完所有集合,說明每個set中都存在該值,是交集(注意分析最後一個迭代) if (j == setnum) { // 不存儲交集的情況下,直接響應元素值即可 if (!dstkey) { if (encoding == OBJ_ENCODING_HT) addReplyBulkCBuffer(c,elesds,sdslen(elesds)); else addReplyBulkLongLong(c,intobj); cardinality++; } // 要存儲交集數據,將值存儲到 dstset 中 else { if (encoding == OBJ_ENCODING_INTSET) { elesds = sdsfromlonglong(intobj); setTypeAdd(dstset,elesds); sdsfree(elesds); } else { setTypeAdd(dstset,elesds); } } } } setTypeReleaseIterator(si); if (dstkey) { /* Store the resulting set into the target, if the intersection * is not an empty set. */ // 存儲集合之前會先把原來的數據刪除,如果進行多次交集運算,dstKey 就相當於臨時表咯 int deleted = dbDelete(c->db,dstkey); if (setTypeSize(dstset) > 0) { dbAdd(c->db,dstkey,dstset); addReplyLongLong(c,setTypeSize(dstset)); notifyKeyspaceEvent(NOTIFY_SET,"sinterstore", dstkey,c->db->id); } else { decrRefCount(dstset); addReply(c,shared.czero); if (deleted) notifyKeyspaceEvent(NOTIFY_GENERIC,"del", dstkey,c->db->id); } signalModifiedKey(c->db,dstkey); server.dirty++; } else { setDeferredMultiBulkLength(c,replylen,cardinality); } zfree(sets); } // compare 方法 int qsortCompareSetsByCardinality(const void *s1, const void *s2) { return setTypeSize(*(robj**)s1)-setTypeSize(*(robj**)s2); } // 快排樣例 sort.lua -- extracted from Programming Pearls, page 110 function qsort(x,l,u,f) if l<u then local m=math.random(u-(l-1))+l-1 -- choose a random pivot in range l..u x[l],x[m]=x[m],x[l] -- swap pivot to first position local t=x[l] -- pivot value m=l local i=l+1 while i<=u do -- invariant: x[l+1..m] < t <= x[m+1..i-1] if f(x[i],t) then m=m+1 x[m],x[i]=x[i],x[m] -- swap x[i] and x[m] end i=i+1 end x[l],x[m]=x[m],x[l] -- swap pivot to a valid place -- x[l+1..m-1] < x[m] <= x[m+1..u] qsort(x,l,m-1,f) qsort(x,m+1,u,f) end end
sdiff和sunion源碼解析
void sunionCommand(client *c) { sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,SET_OP_UNION); } void sunionstoreCommand(client *c) { sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],SET_OP_UNION); } void sdiffCommand(client *c) { sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,SET_OP_DIFF); } void sdiffstoreCommand(client *c) { sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],SET_OP_DIFF); }
// 用法: SDIFFSTORE destination key1 [key2] // t_set.c void sdiffstoreCommand(client *c) { // 看起來sdiff 與 sunion 共用了一段程式碼,為啥呢? // 想想 sql 中的 full join // c->argv[1] 是 dstKey sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],SET_OP_DIFF); } // t_set.c, 差集並集運算 void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum, robj *dstkey, int op) { robj **sets = zmalloc(sizeof(robj*)*setnum); setTypeIterator *si; robj *dstset = NULL; sds ele; int j, cardinality = 0; int diff_algo = 1; // 同樣的套路,先查找各key的實例 // 不同的是,這裡的key允許不存在,但不允許類型不一致 for (j = 0; j < setnum; j++) { robj *setobj = dstkey ? lookupKeyWrite(c->db,setkeys[j]) : lookupKeyRead(c->db,setkeys[j]); if (!setobj) { sets[j] = NULL; continue; } if (checkType(c,setobj,OBJ_SET)) { zfree(sets); return; } sets[j] = setobj; } /* Select what DIFF algorithm to use. * * Algorithm 1 is O(N*M) where N is the size of the element first set * and M the total number of sets. * * Algorithm 2 is O(N) where N is the total number of elements in all * the sets. * * We compute what is the best bet with the current input here. */ // 針對差集運算,做演算法優化 if (op == SET_OP_DIFF && sets[0]) { long long algo_one_work = 0, algo_two_work = 0; for (j = 0; j < setnum; j++) { if (sets[j] == NULL) continue; algo_one_work += setTypeSize(sets[0]); algo_two_work += setTypeSize(sets[j]); } /* Algorithm 1 has better constant times and performs less operations * if there are elements in common. Give it some advantage. */ algo_one_work /= 2; diff_algo = (algo_one_work <= algo_two_work) ? 1 : 2; if (diff_algo == 1 && setnum > 1) { /* With algorithm 1 it is better to order the sets to subtract * by decreasing size, so that we are more likely to find * duplicated elements ASAP. */ qsort(sets+1,setnum-1,sizeof(robj*), qsortCompareSetsByRevCardinality); } } /* We need a temp set object to store our union. If the dstkey * is not NULL (that is, we are inside an SUNIONSTORE operation) then * this set object will be the resulting object to set into the target key*/ dstset = createIntsetObject(); if (op == SET_OP_UNION) { /* Union is trivial, just add every element of every set to the * temporary set. */ for (j = 0; j < setnum; j++) { if (!sets[j]) continue; /* non existing keys are like empty sets */ // 依次添加即可,對於 sunion 來說,有序是無意義的 si = setTypeInitIterator(sets[j]); while((ele = setTypeNextObject(si)) != NULL) { if (setTypeAdd(dstset,ele)) cardinality++; sdsfree(ele); } setTypeReleaseIterator(si); } } // 使用演算法1, 依次迭代最大元素 else if (op == SET_OP_DIFF && sets[0] && diff_algo == 1) { /* DIFF Algorithm 1: * * We perform the diff by iterating all the elements of the first set, * and only adding it to the target set if the element does not exist * into all the other sets. * * This way we perform at max N*M operations, where N is the size of * the first set, and M the number of sets. */ si = setTypeInitIterator(sets[0]); while((ele = setTypeNextObject(si)) != NULL) { for (j = 1; j < setnum; j++) { if (!sets[j]) continue; /* no key is an empty set. */ if (sets[j] == sets[0]) break; /* same set! */ // 只要有一個相同,就不算是差集?? if (setTypeIsMember(sets[j],ele)) break; } // 這裡的差集是所有set的值都不相同或者為空??? 尷尬了 if (j == setnum) { /* There is no other set with this element. Add it. */ setTypeAdd(dstset,ele); cardinality++; } sdsfree(ele); } setTypeReleaseIterator(si); } // 使用演算法2,直接以第一個元素為基礎,後續set做remove,最後剩下的就是差集 else if (op == SET_OP_DIFF && sets[0] && diff_algo == 2) { /* DIFF Algorithm 2: * * Add all the elements of the first set to the auxiliary set. * Then remove all the elements of all the next sets from it. * * This is O(N) where N is the sum of all the elements in every * set. */ for (j = 0; j < setnum; j++) { if (!sets[j]) continue; /* non existing keys are like empty sets */ si = setTypeInitIterator(sets[j]); while((ele = setTypeNextObject(si)) != NULL) { if (j == 0) { if (setTypeAdd(dstset,ele)) cardinality++; } else { if (setTypeRemove(dstset,ele)) cardinality--; } sdsfree(ele); } setTypeReleaseIterator(si); /* Exit if result set is empty as any additional removal * of elements will have no effect. */ if (cardinality == 0) break; } } /* Output the content of the resulting set, if not in STORE mode */ if (!dstkey) { addReplyMultiBulkLen(c,cardinality); si = setTypeInitIterator(dstset); // 響應差集列表 while((ele = setTypeNextObject(si)) != NULL) { addReplyBulkCBuffer(c,ele,sdslen(ele)); sdsfree(ele); } setTypeReleaseIterator(si); decrRefCount(dstset); } else { /* If we have a target key where to store the resulting set * create this key with the result set inside */ int deleted = dbDelete(c->db,dstkey); if (setTypeSize(dstset) > 0) { // 存儲差集列表,響應差集個數 dbAdd(c->db,dstkey,dstset); addReplyLongLong(c,setTypeSize(dstset)); notifyKeyspaceEvent(NOTIFY_SET, op == SET_OP_UNION ? "sunionstore" : "sdiffstore", dstkey,c->db->id); } else { decrRefCount(dstset); addReply(c,shared.czero); if (deleted) notifyKeyspaceEvent(NOTIFY_GENERIC,"del", dstkey,c->db->id); } signalModifiedKey(c->db,dstkey); server.dirty++; } zfree(sets); } /* This is used by SDIFF and in this case we can receive NULL that should * be handled as empty sets. */ int qsortCompareSetsByRevCardinality(const void *s1, const void *s2) { robj *o1 = *(robj**)s1, *o2 = *(robj**)s2; return (o2 ? setTypeSize(o2) : 0) - (o1 ? setTypeSize(o1) : 0); }
6.SPOP
從存儲在key
的集合中移除並返回一個或多個隨機元素。
此操作與SRANDMEMBER
類似,它從一個集合中返回一個或多個隨機元素,但不刪除元素。
時間複雜度:O(1)
127.0.0.1:6379> spop myset1 "1" 127.0.0.1:6379> spop myset1 "3" 127.0.0.1:6379> spop myset1 "5" 127.0.0.1:6379> smembers myset1 1) "2" 2) "4" 127.0.0.1:6379>
源碼解析
// 用法: SPOP key [count] // t_set.c void spopCommand(client *c) { robj *set, *ele, *aux; sds sdsele; int64_t llele; int encoding; if (c->argc == 3) { // 彈出指定數量的元素,略 spopWithCountCommand(c); return; } else if (c->argc > 3) { addReply(c,shared.syntaxerr); return; } /* Make sure a key with the name inputted exists, and that it's type is * indeed a set */ if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,set,OBJ_SET)) return; /* Get a random element from the set */ // 1. 隨機獲取一個元素,這是 spop 的定義 encoding = setTypeRandomElement(set,&sdsele,&llele); /* Remove the element from the set */ // 2. 刪除元素 if (encoding == OBJ_ENCODING_INTSET) { ele = createStringObjectFromLongLong(llele); set->ptr = intsetRemove(set->ptr,llele,NULL); } else { ele = createStringObject(sdsele,sdslen(sdsele)); setTypeRemove(set,ele->ptr); } notifyKeyspaceEvent(NOTIFY_SET,"spop",c->argv[1],c->db->id); /* Replicate/AOF this command as an SREM operation */ aux = createStringObject("SREM",4); rewriteClientCommandVector(c,3,aux,c->argv[1],ele); decrRefCount(aux); /* Add the element to the reply */ addReplyBulk(c,ele); decrRefCount(ele); /* Delete the set if it's empty */ if (setTypeSize(set) == 0) { dbDelete(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id); } /* Set has been modified */ signalModifiedKey(c->db,c->argv[1]); server.dirty++; } // 沒啥好說的,就看下是如何隨機的就好了 // t_set.c, 隨機獲取一個元素,賦值給 sdsele|llele /* Return random element from a non empty set. * The returned element can be a int64_t value if the set is encoded * as an "intset" blob of integers, or an SDS string if the set * is a regular set. * * The caller provides both pointers to be populated with the right * object. The return value of the function is the object->encoding * field of the object and is used by the caller to check if the * int64_t pointer or the redis object pointer was populated. * * Note that both the sdsele and llele pointers should be passed and cannot * be NULL since the function will try to defensively populate the non * used field with values which are easy to trap if misused. */ int setTypeRandomElement(robj *setobj, sds *sdsele, int64_t *llele) { if (setobj->encoding == OBJ_ENCODING_HT) { // 1.1. dict 型的隨機 dictEntry *de = dictGetRandomKey(setobj->ptr); *sdsele = dictGetKey(de); *llele = -123456789; /* Not needed. Defensive. */ } else if (setobj->encoding == OBJ_ENCODING_INTSET) { // 1.2. intset 型的隨機 *llele = intsetRandom(setobj->ptr); *sdsele = NULL; /* Not needed. Defensive. */ } else { serverPanic("Unknown set encoding"); } return setobj->encoding; } // 1.1. dict 型的隨機 /* Return a random entry from the hash table. Useful to * implement randomized algorithms */ dictEntry *dictGetRandomKey(dict *d) { dictEntry *he, *orighe; unsigned int h; int listlen, listele; if (dictSize(d) == 0) return NULL; if (dictIsRehashing(d)) _dictRehashStep(d); // 基本原理就是一直接隨機獲取下標,直到有值 if (dictIsRehashing(d)) { do { /* We are sure there are no elements in indexes from 0 * to rehashidx-1 */ // 獲取隨機下標,須保證在 兩個hash表的範圍內 h = d->rehashidx + (random() % (d->ht[0].size + d->ht[1].size - d->rehashidx)); he = (h >= d->ht[0].size) ? d->ht[1].table[h - d->ht[0].size] : d->ht[0].table[h]; } while(he == NULL); } else { do { h = random() & d->ht[0].sizemask; he = d->ht[0].table[h]; } while(he == NULL); } /* Now we found a non empty bucket, but it is a linked * list and we need to get a random element from the list. * The only sane way to do so is counting the elements and * select a random index. */ listlen = 0; orighe = he; // 對於hash衝突情況,再隨機一次 while(he) { he = he->next; listlen++; } listele = random() % listlen; he = orighe; while(listele--) he = he->next; return he; } // 1.2. intset 型的隨機 // intset.c /* Return random member */ int64_t intsetRandom(intset *is) { // 這個隨機就簡單了,直接獲取隨機下標,因為intset可以保證自身元素的完整性 return _intsetGet(is,rand()%intrev32ifbe(is->length)); }