走近源碼:神奇的HyperLogLog

  • 2020 年 3 月 11 日
  • 筆記

HyperLogLog是Redis的高級數據結構,是統計基數的利器。前文我們已經介紹過HyperLogLog的基本用法,如果只求會用,只需要掌握HyperLogLog的三個命令即可,如果想要更進一步了解HyperLogLog的原理以及源碼實現,相信這篇文章會給你帶來一些啟發。

基數

在數學上,基數或勢,即集合中包含的元素的「個數」(參見勢的比較),是日常交流中基數的概念在數學上的精確化(並使之不再受限於有限情形)。有限集合的基數,其意義與日常用語中的「基數」相同,例如{displaystyle {a,b,c}}的基數是3。無限集合的基數,其意義在於比較兩個集的大小,例如整數集和有理數集的基數相同;整數集的基數比實數集的小。

在介紹HyperLogLog的原理之前,請你先來思考一下,如果讓你來統計基數,你會用什麼方法。

Set

熟悉Redis數據結構的同學一定首先會想到Set這個結構,我們只需要把數據都存入Set,然後用scard命令就可以得到結果,這是一種思路,但是存在一定的問題。如果數據量非常大,那麼將會耗費很大的記憶體空間,如果這些數據僅僅是用來統計基數,那麼無疑是造成了巨大的浪費,因此,我們需要找到一種佔用記憶體較小的方法。

bitmap

bitmap同樣是一種可以統計基數的方法,可以理解為用bit數組存儲元素,例如01101001,表示的是[1,2,4,8],bitmap中1的個數就是基數。bitmap也可以輕鬆合併多個集合,只需要將多個數組進行異或操作就可以了。bitmap相比於Set也大大節省了記憶體,我們來粗略計算一下,統計1億個數據的基數,需要的記憶體是:100000000/8/1024/1024 ≈ 12M。

雖然bitmap在節省空間方面已經有了不錯的表現,但是如果需要統計1000個對象,就需要大約12G的記憶體,顯然這個結果仍然不能令我們滿意。在這種情況下,HyperLogLog將會出來拯救我們。

HyperLogLog原理

HyperLogLog實際上不會存儲每個元素的值,它使用的是概率演算法,通過存儲元素的hash值的第一個1的位置,來計算元素數量。這麼說不太容易理解,容我先搬出來一個栗子。

有一天Jack和丫丫玩拋硬幣的遊戲,規則是丫丫負責拋硬幣,每次拋到正面為一回合,丫丫可以自己決定進行幾個回合。最後需要告訴Jack最長的那個回合拋了多少次,再由Jack來猜丫丫一共進行了幾個回合。Jack心想:這可不好猜啊,我得算算概率了。於是在腦海中繪製這樣一張圖。

yb

k是每回合拋到1所用的次數,我們已知的是最大的k值,可以用kmax表示,由於每次拋硬幣的結果只有0和1兩種情況,因此,kmax在任意回合出現的概率即為(1/2)kmax,因此可以推測n=2kmax。概率學把這種問題叫做伯努利實驗。此時丫丫已經完成了n個回合,並且告訴Jack最長的一次拋了3次,Jack此時也胸有成竹,馬上說出他的答案8,最後的結果是:丫丫只拋了一回合,Jack輸了,要負責刷碗一個月。

終於,我們的Philippe Flajolet教授遇到了Jack一樣的問題,他決心吸取Jack的教訓,要讓這個演算法更加準確,於是引入了桶的概念,計算m個桶的加權平均值,這樣就能得到比較準確的答案了(實際上還要進行其他修正)。最終的公式如圖

HyperLogLog公式

其中m是桶的數量,const是修正常數,它的取值會根據m而變化。p=log2m

switch (p) {     case 4:         constant = 0.673 * m * m;     case 5:         constant = 0.697 * m * m;     case 6:         constant = 0.709 * m * m;     default:         constant = (0.7213 / (1 + 1.079 / m)) * m * m;  }  

我們回到Redis,對於一個輸入的字元串,首先得到64位的hash值,用前14位來定位桶的位置(共有214,即16384個桶)。後面50位即為伯努利過程,每個桶有6bit,記錄第一次出現1的位置count,如果count>oldcount,就用count替換oldcount。

了解原理之後,我們再來聊一下HyperLogLog的存儲。HyperLogLog的存儲結構分為密集存儲結構和稀疏存儲結構兩種,默認為稀疏存儲結構,而我們常說的佔用12K記憶體的則是密集存儲結構。

密集存儲結構

密集存儲比較簡單,就是連續16384個6bit的串成的點陣圖。由於每個桶是6bit,因此對桶的定位要麻煩一些。

#define HLL_BITS 6 /* Enough to count up to 63 leading zeroes. */  #define HLL_REGISTER_MAX ((1<<HLL_BITS)-1)  /* Store the value of the register at position 'regnum' into variable 'target'.   * 'p' is an array of unsigned bytes. */  #define HLL_DENSE_GET_REGISTER(target,p,regnum) do {       uint8_t *_p = (uint8_t*) p;       unsigned long _byte = regnum*HLL_BITS/8;       unsigned long _fb = regnum*HLL_BITS&7;       unsigned long _fb8 = 8 - _fb;       unsigned long b0 = _p[_byte];       unsigned long b1 = _p[_byte+1];       target = ((b0 >> _fb) | (b1 << _fb8)) & HLL_REGISTER_MAX;   } while(0)    /* Set the value of the register at position 'regnum' to 'val'.   * 'p' is an array of unsigned bytes. */  #define HLL_DENSE_SET_REGISTER(p,regnum,val) do {       uint8_t *_p = (uint8_t*) p;       unsigned long _byte = regnum*HLL_BITS/8;       unsigned long _fb = regnum*HLL_BITS&7;       unsigned long _fb8 = 8 - _fb;       unsigned long _v = val;       _p[_byte] &= ~(HLL_REGISTER_MAX << _fb);       _p[_byte] |= _v << _fb;       _p[_byte+1] &= ~(HLL_REGISTER_MAX >> _fb8);       _p[_byte+1] |= _v >> _fb8;   } while(0)  

如果我們要定位的桶編號為regnum,它的偏移位元組量為(regnum * 6) / 8,起始bit偏移為(regnum * 6) % 8,例如,我們要定位編號為5的桶,位元組偏移是3,位偏移也是6,也就是說,從第4個位元組的第7位開始是編號為3的桶。這裡需要注意,位元組序和我們平時的位元組序相反,因此需要進行倒置。我們用一張圖來說明Redis是如何定位桶並且得到存儲的值(即HLL_DENSE_GET_REGISTER函數的解釋)。

桶定位

對於編號為5的桶,我們已經得到了位元組偏移_byte和為偏移_fb,b0 >> _fb和b1 << _fb8操作是將位元組倒置,然後進行拼接,並且保留最後6位。

稀疏存儲結構

你以為Redis真的會用16384個6bit存儲每一個HLL對象嗎,那就too naive了,雖然它只佔用了12K記憶體,但是Redis對於記憶體的節約已經到了喪心病狂的地步了。因此,如果比較多的計數值都是0,那麼就會採用稀疏存儲的結構。

對於連續多個計數值為0的桶,Redis使用的存儲方式是:00xxxxxx,前綴兩個0,後面6位的值加1表示有連續多少個桶的計數值為0,由於6bit最大能表示64個桶,所以Redis又設計了另一種表示方法:01xxxxxx yyyyyyyy,這樣後面14bit就可以表示16384個桶了,而一個初始狀態的HyperLogLog對象只需要用2個位元組來存儲。

如果連續的桶數都不是0,那麼Redis的表示方式為1vvvvvxx,即為連續(xx+1)個桶的計數值都是(vvvvv+1)。例如,10011110表示連續3個8。這裡使用5bit,最大只能表示32。因此,當某個計數值大於32時,Redis會將這個HyperLogLog對象調整為密集存儲。

Redis用三條指令來表達稀疏存儲的方式:

  1. ZERO:len 單個位元組表示 00[len-1],連續最多64個零計數值
  2. VAL:value,len 單個位元組表示 1[value-1][len-1],連續 len 個值為 value 的計數值
  3. XZERO:len 雙位元組表示 01[len-1],連續最多16384個零計數值

Redis從稀疏存儲轉換到密集存儲的條件是:

  1. 任意一個計數值從 32 變成 33,因為VAL指令已經無法容納,它能表示的計數值最大為 32
  2. 稀疏存儲佔用的總位元組數超過 3000 位元組,這個閾值可以通過 hll_sparse_max_bytes 參數進行調整。

源碼解析

接下來通過源碼來看一下pfadd和pfcount兩個命令的具體流程。在這之前我們首先要了解的是HyperLogLog的頭結構體和創建一個HyperLogLog對象的步驟。

HyperLogLog頭結構體
struct hllhdr {      char magic[4];      /* "HYLL" */      uint8_t encoding;   /* HLL_DENSE or HLL_SPARSE. */      uint8_t notused[3]; /* Reserved for future use, must be zero. */      uint8_t card[8];    /* Cached cardinality, little endian. */      uint8_t registers[]; /* Data bytes. */  };  
創建HyperLogLog對象
#define HLL_P 14 /* The greater is P, the smaller the error. */  #define HLL_REGISTERS (1<<HLL_P) /* With P=14, 16384 registers. */  #define HLL_SPARSE_XZERO_MAX_LEN 16384      #define HLL_SPARSE_XZERO_SET(p,len) do {       int _l = (len)-1;       *(p) = (_l>>8) | HLL_SPARSE_XZERO_BIT;       *((p)+1) = (_l&0xff);   } while(0)    /* Create an HLL object. We always create the HLL using sparse encoding.   * This will be upgraded to the dense representation as needed. */  robj *createHLLObject(void) {      robj *o;      struct hllhdr *hdr;      sds s;      uint8_t *p;      int sparselen = HLL_HDR_SIZE +                      (((HLL_REGISTERS+(HLL_SPARSE_XZERO_MAX_LEN-1)) /                       HLL_SPARSE_XZERO_MAX_LEN)*2);      int aux;        /* Populate the sparse representation with as many XZERO opcodes as       * needed to represent all the registers. */      aux = HLL_REGISTERS;      s = sdsnewlen(NULL,sparselen);      p = (uint8_t*)s + HLL_HDR_SIZE;      while(aux) {          int xzero = HLL_SPARSE_XZERO_MAX_LEN;          if (xzero > aux) xzero = aux;          HLL_SPARSE_XZERO_SET(p,xzero);          p += 2;          aux -= xzero;      }      serverAssert((p-(uint8_t*)s) == sparselen);        /* Create the actual object. */      o = createObject(OBJ_STRING,s);      hdr = o->ptr;      memcpy(hdr->magic,"HYLL",4);      hdr->encoding = HLL_SPARSE;      return o;  }  

這裡sparselen=HLL_HDR_SIZE+2,因為初始化時默認所有桶的計數值都是0。其他過程不難理解,用的存儲方式是我們前面提到過的稀疏存儲,創建的對象實質上是一個字元串對象,這也是字元串命令可以操作HyperLogLog對象的原因。

PFADD命令
/* PFADD var ele ele ele ... ele => :0 or :1 */  void pfaddCommand(client *c) {      robj *o = lookupKeyWrite(c->db,c->argv[1]);      struct hllhdr *hdr;      int updated = 0, j;        if (o == NULL) {          /* Create the key with a string value of the exact length to           * hold our HLL data structure. sdsnewlen() when NULL is passed           * is guaranteed to return bytes initialized to zero. */          o = createHLLObject();          dbAdd(c->db,c->argv[1],o);          updated++;      } else {          if (isHLLObjectOrReply(c,o) != C_OK) return;          o = dbUnshareStringValue(c->db,c->argv[1],o);      }      /* Perform the low level ADD operation for every element. */      for (j = 2; j < c->argc; j++) {          int retval = hllAdd(o, (unsigned char*)c->argv[j]->ptr,                                 sdslen(c->argv[j]->ptr));          switch(retval) {          case 1:              updated++;              break;          case -1:              addReplySds(c,sdsnew(invalid_hll_err));              return;          }      }      hdr = o->ptr;      if (updated) {          signalModifiedKey(c->db,c->argv[1]);          notifyKeyspaceEvent(NOTIFY_STRING,"pfadd",c->argv[1],c->db->id);          server.dirty++;          HLL_INVALIDATE_CACHE(hdr);      }      addReply(c, updated ? shared.cone : shared.czero);  }  

PFADD命令會先判斷key是否存在,如果不存在,則創建一個新的HyperLogLog對象;如果存在,會調用isHLLObjectOrReply()函數檢查這個對象是不是HyperLogLog對象,檢查方法主要是檢查魔數是否正確,存儲結構是否正確以及頭結構體的長度是否正確等。

一切就緒後,才可以調用hllAdd()函數添加元素。hllAdd函數很簡單,只是根據存儲結構判斷需要調用hllDenseAdd()函數還是hllSparseAdd()函數。

密集存儲結構只是比較新舊計數值,如果新計數值大於就計數值,就將其替代。

而稀疏存儲結構要複雜一些:

  1. 判斷是否需要調整為密集存儲結構,如果不需要則繼續進行,否則就先調整為密集存儲結構,然後執行添加操作
  2. 我們需要先定位要修改的位元組段,通過循環計算每一段表示的桶的範圍是否包括要修改的桶
  3. 定位到桶後,如果這個桶已經是VAL,並且計數值大於當前要添加的計數值,則返回0,如果小於當前計數值,就進行更新
  4. 如果是ZERO,並且長度為1,那麼可以直接把它替換為VAL,並且設置計數值
  5. 如果不是上述兩種情況,則需要對現有的存儲進行拆分
PFCOUNT命令
/* PFCOUNT var -> approximated cardinality of set. */  void pfcountCommand(client *c) {      robj *o;      struct hllhdr *hdr;      uint64_t card;        /* Case 1: multi-key keys, cardinality of the union.       *       * When multiple keys are specified, PFCOUNT actually computes       * the cardinality of the merge of the N HLLs specified. */      if (c->argc > 2) {          uint8_t max[HLL_HDR_SIZE+HLL_REGISTERS], *registers;          int j;            /* Compute an HLL with M[i] = MAX(M[i]_j). */          memset(max,0,sizeof(max));          hdr = (struct hllhdr*) max;          hdr->encoding = HLL_RAW; /* Special internal-only encoding. */          registers = max + HLL_HDR_SIZE;          for (j = 1; j < c->argc; j++) {              /* Check type and size. */              robj *o = lookupKeyRead(c->db,c->argv[j]);              if (o == NULL) continue; /* Assume empty HLL for non existing var.*/              if (isHLLObjectOrReply(c,o) != C_OK) return;                /* Merge with this HLL with our 'max' HHL by setting max[i]               * to MAX(max[i],hll[i]). */              if (hllMerge(registers,o) == C_ERR) {                  addReplySds(c,sdsnew(invalid_hll_err));                  return;              }          }            /* Compute cardinality of the resulting set. */          addReplyLongLong(c,hllCount(hdr,NULL));          return;      }        /* Case 2: cardinality of the single HLL.       *       * The user specified a single key. Either return the cached value       * or compute one and update the cache. */      o = lookupKeyWrite(c->db,c->argv[1]);      if (o == NULL) {          /* No key? Cardinality is zero since no element was added, otherwise           * we would have a key as HLLADD creates it as a side effect. */          addReply(c,shared.czero);      } else {          if (isHLLObjectOrReply(c,o) != C_OK) return;          o = dbUnshareStringValue(c->db,c->argv[1],o);            /* Check if the cached cardinality is valid. */          hdr = o->ptr;          if (HLL_VALID_CACHE(hdr)) {              /* Just return the cached value. */              card = (uint64_t)hdr->card[0];              card |= (uint64_t)hdr->card[1] << 8;              card |= (uint64_t)hdr->card[2] << 16;              card |= (uint64_t)hdr->card[3] << 24;              card |= (uint64_t)hdr->card[4] << 32;              card |= (uint64_t)hdr->card[5] << 40;              card |= (uint64_t)hdr->card[6] << 48;              card |= (uint64_t)hdr->card[7] << 56;          } else {              int invalid = 0;              /* Recompute it and update the cached value. */              card = hllCount(hdr,&invalid);              if (invalid) {                  addReplySds(c,sdsnew(invalid_hll_err));                  return;              }              hdr->card[0] = card & 0xff;              hdr->card[1] = (card >> 8) & 0xff;              hdr->card[2] = (card >> 16) & 0xff;              hdr->card[3] = (card >> 24) & 0xff;              hdr->card[4] = (card >> 32) & 0xff;              hdr->card[5] = (card >> 40) & 0xff;              hdr->card[6] = (card >> 48) & 0xff;              hdr->card[7] = (card >> 56) & 0xff;              /* This is not considered a read-only command even if the               * data structure is not modified, since the cached value               * may be modified and given that the HLL is a Redis string               * we need to propagate the change. */              signalModifiedKey(c->db,c->argv[1]);              server.dirty++;          }          addReplyLongLong(c,card);      }  }  

如果要計算多個HyperLogLog的基數,則需要將多個HyperLogLog對象合併,這裡合併方法是將所有的HyperLogLog對象合併到一個名為max的對象中,max採用的是密集存儲結構,如果被合併的對象也是密集存儲結構,則循環比較每一個計數值,將大的那個存入max。如果被合併的是稀疏存儲,則只需要比較VAL即可。

如果計算單個HyperLogLog對象的基數,則先判斷對象頭結構體中的基數快取是否有效,如果有效,可直接返回。如果已經失效,則需要重新計算基數,並修改原有快取,這也是PFCOUNT命令不被當做只讀命令的原因。

結語

最後,給大家推薦一個幫助理解HyperLogLog原理的工具:http://content.research.neustar.biz/blog/hll.html,有興趣的話可以去學習一下。

英文比較好的同學可以直接點擊閱讀原文閱讀antirez的關於HyperLogLogd 部落格。