Golang 實現 Redis(3): 實現記憶體資料庫

  • 2020 年 3 月 29 日
  • 筆記

本文是 golang 實現 redis 系列的第三篇, 主要介紹如何實現記憶體KV資料庫。本文完整源程式碼在作者Github: HDT3213/godis

db.go 是記憶體資料庫的主要源文件,db.Exec 方法會從協議解析器中獲得命令參數並調用相應的處理函數進行處理。

目錄:

Concurrent Hash Map

KV 記憶體資料庫的核心是並發安全的哈希表,常見的設計有幾種:

  • sync.map: golang 官方提供的並發哈希表, 性能優秀但結構複雜不便於擴展

  • juc.ConcurrentHashMap: java 的並發哈希表採用分段鎖實現。在進行擴容時訪問哈希表執行緒都將協助進行 rehash 操作,在 rehash 結束前所有的讀寫操作都會阻塞。因為快取資料庫中鍵值對數量巨大且對讀寫操作響應時間要求較高,使用juc的策略是不合適的。

  • memcached hashtable: 在後台執行緒進行 rehash 操作時,主執行緒會判斷要訪問的哈希槽是否已被 rehash 從而決定操作 old_hashtable 還是操作 primary_hashtable。
    這種策略使主執行緒和rehash執行緒之間的競爭限制在哈希槽內,最小化rehash操作對讀寫操作的影響,這是最理想的實現方式。但由於作者才疏學淺無法使用 golang 實現該策略故忍痛放棄(主要原因在於 golang 沒有 volatile 關鍵字, 保證執行緒可見性的操作非常複雜),歡迎各位讀者討論。

本文採用在 sync.map 發布前 golang 社區廣泛使用的分段鎖策略。我們將key分散到固定數量的 shard 中避免 rehash 操作。shard 是有鎖保護的 map, 當 shard 進行 rehash 時會阻塞shard內的讀寫,但不會對其他 shard 造成影響。

這種策略簡單可靠易於實現,但由於需要兩次 hash 性能略差。這個 dict 完整源碼在Github 可以獨立使用(雖然也沒有什麼用。。。)。

定義數據結構:

type ConcurrentDict struct {      table []*Shard      count int32  }    type Shard struct {      m     map[string]interface{}      mutex sync.RWMutex  }  

在構造時初始化 shard,這個操作相對比較耗時:

func computeCapacity(param int) (size int) {  	if param <= 16 {  		return 16  	}  	n := param - 1  	n |= n >> 1  	n |= n >> 2  	n |= n >> 4  	n |= n >> 8  	n |= n >> 16  	if n < 0 {  		return math.MaxInt32  	} else {  		return int(n + 1)  	}  }    func MakeConcurrent(shardCount int) *ConcurrentDict {      shardCount = computeCapacity(shardCount)      table := make([]*Shard, shardCount)      for i := 0; i < shardCount; i++ {          table[i] = &Shard{              m: make(map[string]interface{}),          }      }      d := &ConcurrentDict{          count: 0,          table: table,      }      return d  }  

哈希演算法選擇FNV演算法:

const prime32 = uint32(16777619)    func fnv32(key string) uint32 {      hash := uint32(2166136261)      for i := 0; i < len(key); i++ {          hash *= prime32          hash ^= uint32(key[i])      }      return hash  }  

定位shard, 當n為2的整數冪時 h % n == (n – 1) & h

func (dict *ConcurrentDict) spread(hashCode uint32) uint32 {  	if dict == nil {  		panic("dict is nil")  	}  	tableSize := uint32(len(dict.table))  	return (tableSize - 1) & uint32(hashCode)  }    func (dict *ConcurrentDict) getShard(index uint32) *Shard {  	if dict == nil {  		panic("dict is nil")  	}  	return dict.table[index]  }  

Get 和 Put 方法實現:

func (dict *ConcurrentDict) Get(key string) (val interface{}, exists bool) {  	if dict == nil {  		panic("dict is nil")  	}  	hashCode := fnv32(key)  	index := dict.spread(hashCode)  	shard := dict.getShard(index)  	shard.mutex.RLock()  	defer shard.mutex.RUnlock()  	val, exists = shard.m[key]  	return  }    func (dict *ConcurrentDict) Len() int {  	if dict == nil {  		panic("dict is nil")  	}  	return int(atomic.LoadInt32(&dict.count))  }    // return the number of new inserted key-value  func (dict *ConcurrentDict) Put(key string, val interface{}) (result int) {  	if dict == nil {  		panic("dict is nil")  	}  	hashCode := fnv32(key)  	index := dict.spread(hashCode)  	shard := dict.getShard(index)  	shard.mutex.Lock()  	defer shard.mutex.Unlock()    	if _, ok := shard.m[key]; ok {  		shard.m[key] = val  		return 0  	} else {  		shard.m[key] = val  		dict.addCount()  		return 1  	}  }  

LockMap

上一節實現的ConcurrentMap 可以保證對單個 key 操作的並發安全性,但是仍然無法滿足需求:

  1. MSETNX 命令當且僅當所有給定鍵都不存在時所有給定鍵設置值, 因此我們需要鎖定所有給定的鍵直到完成所有鍵的檢查和設置
  2. LPOP 命令移除列表中最後一個元素後需要移除該鍵值對,因此我們鎖定該鍵直到移除元素並移除空列表

因此我們需要實現 db.Locker 用於鎖定一個或一組 key 並在我們需要的時候釋放鎖。

實現 db.Locker 最直接的想法是使用一個 map[string]*sync.RWMutex, 加鎖過程分為兩步: 初始化對應的鎖 -> 加鎖, 解鎖過程也分為兩步: 解鎖 -> 釋放對應的鎖。那麼存在一個無法解決的並發問題:

時間 協程A 協程B
1 locker["a"].Unlock()
2 locker["a"] = &sync.RWMutex{}
3 delete(locker["a"])
4 locker["a"].Lock()

由於 t3 時協程B釋放了鎖,t4 時協程A試圖加鎖會失敗。

若我們在解鎖時不釋放鎖就可以避免該異常的發生,但是每個曾經使用過的鎖都無法釋放從而造成嚴重的記憶體泄露。

我們注意到哈希表的長度遠少於可能的鍵的數量,反過來說多個鍵可以共用一個哈希槽。若我們不為單個鍵加鎖而是為它所在的哈希槽加鎖,因為哈希槽的數量非常少即使不釋放鎖也不會佔用太多記憶體。

作者根據這種思想實現了 LockerMap 來解決並發控制問題。

type Locks struct {      table []*sync.RWMutex  }    func Make(tableSize int) *Locks {      table := make([]*sync.RWMutex, tableSize)      for i := 0; i < tableSize; i++ {          table[i] = &sync.RWMutex{}      }      return &Locks{          table: table,      }  }    func (locks *Locks)Lock(key string) {      index := locks.spread(fnv32(key))      mu := locks.table[index]      mu.Lock()  }    func (locks *Locks)UnLock(key string) {      index := locks.spread(fnv32(key))      mu := locks.table[index]      mu.Unlock()  }  

哈希演算法已經在Dict一節介紹過不再贅述。

在鎖定多個key時需要注意,若協程A持有鍵a的鎖試圖獲得鍵b的鎖,此時協程B持有鍵b的鎖試圖獲得鍵a的鎖則會形成死鎖。

解決方法是所有協程都按照相同順序加鎖,若兩個協程都想獲得鍵a和鍵b的鎖,那麼必須先獲取鍵a的鎖後獲取鍵b的鎖,這樣就可以避免循環等待。

func (locks *Locks)Locks(keys ...string) {      keySlice := make(sort.StringSlice, len(keys))      copy(keySlice, keys)      sort.Sort(keySlice)      for _, key := range keySlice {          locks.Lock(key)      }  }    func (locks *Locks)RLocks(keys ...string) {      keySlice := make(sort.StringSlice, len(keys))      copy(keySlice, keys)      sort.Sort(keySlice)      for _, key := range keySlice {          locks.RLock(key)      }  }  

TTL

Time To Live (TTL) 的實現方式非常簡單,其核心是 string -> time 哈希表。

當訪問某個 key 時會檢查是否過期,並刪除過期key:

func (db *DB) Get(key string) (*DataEntity, bool) {  	db.stopWorld.RLock()  	defer db.stopWorld.RUnlock()    	raw, ok := db.Data.Get(key)  	if !ok {  		return nil, false  	}  	if db.IsExpired(key) {  		return nil, false  	}  	entity, _ := raw.(*DataEntity)  	return entity, true  }    func (db *DB) IsExpired(key string) bool {  	rawExpireTime, ok := db.TTLMap.Get(key)  	if !ok {  		return false  	}  	expireTime, _ := rawExpireTime.(time.Time)  	expired := time.Now().After(expireTime)  	if expired {  		db.Remove(key)  	}  	return expired  }  

同時會定時的檢查過期key並刪除:

func (db *DB) CleanExpired() {  	now := time.Now()  	toRemove := &List.LinkedList{}  	db.TTLMap.ForEach(func(key string, val interface{}) bool {  		expireTime, _ := val.(time.Time)  		if now.After(expireTime) {  			// expired  			db.Data.Remove(key)  			toRemove.Add(key)  		}  		return true  	})  	toRemove.ForEach(func(i int, val interface{}) bool {  		key, _ := val.(string)  		db.TTLMap.Remove(key)  		return true  	})  }    func (db *DB) TimerTask() {  	ticker := time.NewTicker(db.interval)  	go func() {  		for range ticker.C {  			db.CleanExpired()  		}  	}()  }