死磕以太坊源碼分析之MPT樹-下

死磕以太坊源碼分析之MPT樹-下

文章以及資料請查看://github.com/blockchainGuide/

上篇主要介紹了以太坊中的MPT樹的原理,這篇主要會對MPT樹涉及的源碼進行拆解分析。trie模組主要有以下幾個文件:

|-encoding.go 主要講編碼之間的轉換
|-hasher.go 實現了從某個結點開始計運算元樹的哈希的功能
|-node.go 定義了一個Trie樹中所有結點的類型和解析的程式碼
|-sync.go 實現了SyncTrie對象的定義和所有方法
|-iterator.go 定義了所有枚舉相關介面和實現
|-secure_trie.go 實現了SecureTrie對象
|-proof.go 為key構造一個merkle證明
|-trie.go Trie樹的增刪改查
|-database.go 對記憶體中的trie樹節點進行引用計數

實現概覽

encoding.go

這個主要是講三種編碼(KEYBYTES encodingHEX encodingCOMPACT encoding)的實現與轉換,trie中全程都需要用到這些,該文件中主要實現了如下功能:

  1. hex編碼轉換為Compact編碼:hexToCompact()
  2. Compact編碼轉換為hex編碼:compactToHex()
  3. keybytes編碼轉換為Hex編碼:keybytesToHex()
  4. hex編碼轉換為keybytes編碼:hexToKeybytes()
  5. 獲取兩個位元組數組的公共前綴的長度:prefixLen()
func hexToCompact(hex []byte) []byte {
    terminator := byte(0)
    if hasTerm(hex) { //檢查是否有結尾為0x10 => 16
        terminator = 1 //有結束標記16說明是葉子節點
        hex = hex[:len(hex)-1] //去除尾部標記
    }
    buf := make([]byte, len(hex)/2+1) // 位元組數組
    
    buf[0] = terminator << 5 // 標誌byte為00000000或者00100000
    //如果長度為奇數,添加奇數位標誌1,並把第一個nibble位元組放入buf[0]的低四位
    if len(hex)&1 == 1 {
        buf[0] |= 1 << 4 // 奇數標誌 00110000
        buf[0] |= hex[0] // 第一個nibble包含在第一個位元組中 0011xxxx
        hex = hex[1:]
    }
    //將兩個nibble位元組合併成一個位元組
    decodeNibbles(hex, buf[1:])
    return buf
  
//compact編碼轉化為Hex編碼
func compactToHex(compact []byte) []byte {
    base := keybytesToHex(compact)
    base = base[:len(base)-1]
     // apply terminator flag
    // base[0]包括四種情況
    // 00000000 擴展節點偶數位
    // 00000001 擴展節點奇數位
    // 00000010 葉子節點偶數位
    // 00000011 葉子節點奇數位

    // apply terminator flag
    if base[0] >= 2 {
       //如果是葉子節點,末尾添加Hex標誌位16
        base = append(base, 16)
    }
    // apply odd flag
    //如果是偶數位,chop等於2,否則等於1
    chop := 2 - base[0]&1
    return base[chop:]
}
//compact編碼轉化為Hex編碼
func compactToHex(compact []byte) []byte {
    base := keybytesToHex(compact)
    base = base[:len(base)-1]
     // apply terminator flag
    // base[0]包括四種情況
    // 00000000 擴展節點偶數位
    // 00000001 擴展節點奇數位
    // 00000010 葉子節點偶數位
    // 00000011 葉子節點奇數位

    // apply terminator flag
    if base[0] >= 2 {
       //如果是葉子節點,末尾添加Hex標誌位16
        base = append(base, 16)
    }
    // apply odd flag
    //如果是偶數位,chop等於2,否則等於1
    chop := 2 - base[0]&1
    return base[chop:]
}
// 將十六進位的bibbles轉成key bytes,這隻能用於偶數長度的key
func hexToKeybytes(hex []byte) []byte {
    if hasTerm(hex) {
        hex = hex[:len(hex)-1]
    }
    if len(hex)&1 != 0 {
        panic("can't convert hex key of odd length")
    }
    key := make([]byte, (len(hex)+1)/2)
    decodeNibbles(hex, key)
    return key
}


// 返回a和b的公共前綴的長度
func prefixLen(a, b []byte) int {
    var i, length = 0, len(a)
    if len(b) < length {
        length = len(b)
    }
    for ; i < length; i++ {
        if a[i] != b[i] {
            break
        }
    }
    return i
}


node.go

四種節點

node 介面分四種實現: fullNode,shortNode,valueNode,hashNode,其中只有 fullNode 和 shortNode 可以帶有子節點。

type (
	fullNode struct {
		Children [17]node // 分支節點
		flags    nodeFlag
	}
	shortNode struct { //擴展節點
		Key   []byte
		Val   node //可能指向葉子節點,也可能指向分支節點。
		flags nodeFlag
	}
	hashNode  []byte
	valueNode []byte // 葉子節點值,但是該葉子節點最終還是會包裝在shortNode中
)

trie.go

Trie對象實現了MPT樹的所有功能,包括(key, value)對的增刪改查、計算默克爾哈希,以及將整個樹寫入資料庫中。

iterator.go

nodeIterator提供了遍歷樹內部所有結點的功能。其結構如下:此結構體是在trie.go定義的

type nodeIterator struct {
	trie.NodeIterator
	t   *odrTrie
	err error
}

裡面包含了一個介面NodeIterator,它的實現則是由iterator.go來提供的,其方法如下:

func (it *nodeIterator) Next(descend bool) bool 
func (it *nodeIterator) Hash() common.Hash 
func (it *nodeIterator) Parent() common.Hash 
func (it *nodeIterator) Leaf() bool 
func (it *nodeIterator) LeafKey() []byte 
func (it *nodeIterator) LeafBlob() []byte 
func (it *nodeIterator) LeafProof() [][]byte 
func (it *nodeIterator) Path() []byte {}
func (it *nodeIterator) seek(prefix []byte) error 
func (it *nodeIterator) peek(descend bool) (*nodeIteratorState, *int, []byte, error) 
func (it *nodeIterator) nextChild(parent *nodeIteratorState, ancestor common.Hash) (*nodeIteratorState, []byte, bool) 
func (it *nodeIterator) push(state *nodeIteratorState, parentIndex *int, path []byte) 
func (it *nodeIterator) pop() 

NodeIterator的核心是Next方法,每調用一次這個方法,NodeIterator對象代表的當前節點就會更新至下一個節點,當所有結點遍歷結束,Next方法返回false

生成NodeIterator結口的方法有以下3種:

①:Trie.NodeIterator(start []byte)

通過start參數指定從哪個路徑開始遍歷,如果為nil則從頭到尾按順序遍歷。

②:NewDifferenceIterator(a, b NodeIterator)

當調用NewDifferenceIterator(a, b NodeIterator)後,生成的NodeIterator只遍歷存在於 b 但不存在於 a 中的結點。

③:NewUnionIterator(iters []NodeIterator)

當調用NewUnionIterator(its []NodeIterator)後,生成的NodeIterator遍歷的結點是所有傳入的結點的合集。

database.go

Databasetrie模組對真正資料庫的快取層,其目的是對快取的節點進行引用計數,從而實現區塊的修剪功能。主要方法如下:

func NewDatabase(diskdb ethdb.KeyValueStore) *Database
func NewDatabaseWithCache(diskdb ethdb.KeyValueStore, cache int) *Database 
func (db *Database) DiskDB() ethdb.KeyValueReader
func (db *Database) InsertBlob(hash common.Hash, blob []byte)
func (db *Database) insert(hash common.Hash, blob []byte, node node)
func (db *Database) insertPreimage(hash common.Hash, preimage []byte)
func (db *Database) node(hash common.Hash) node
func (db *Database) Node(hash common.Hash) ([]byte, error)
func (db *Database) preimage(hash common.Hash) ([]byte, error)
func (db *Database) secureKey(key []byte) []byte
func (db *Database) Nodes() []common.Hash
func (db *Database) Reference(child common.Hash, parent common.Hash)
func (db *Database) Dereference(root common.Hash)
func (db *Database) dereference(child common.Hash, parent common.Hash)
func (db *Database) Cap(limit common.StorageSize) error
func (db *Database) Commit(node common.Hash, report bool) error

security_trie.go

可以理解為加密了的trie的實現,ecurity_trie包裝了一下trie樹, 所有的key都轉換成keccak256演算法計算的hash值。同時在資料庫裡面存儲hash值對應的原始的key
但是官方在程式碼里也注釋了,這個程式碼不穩定,除了測試用例,別的地方並沒有使用該程式碼。

proof.go

  • Prove():根據給定的key,在trie中,將滿足key中最大長度前綴的路徑上的節點都加入到proofDb(隊列中每個元素滿足:未編碼的hash以及對應rlp編碼後的節點)
  • VerifyProof():驗證proffDb中是否存在滿足輸入的hash,和對應key的節點,如果滿足,則返回rlp解碼後的該節點。

實現細節

Trie對象的增刪改查

①:Trie樹的初始化

如果root不為空,就通過resolveHash來載入整個Trie樹,如果為空,就新建一個Trie樹。

func New(root common.Hash, db *Database) (*Trie, error) {
	if db == nil {
		panic("trie.New called without a database")
	}
	trie := &Trie{
		db: db,
	}
	if root != (common.Hash{}) && root != emptyRoot {
		rootnode, err := trie.resolveHash(root[:], nil)
		if err != nil {
			return nil, err
		}
		trie.root = rootnode
	}
	return trie, nil
}

②:Trie樹的插入

首先Trie樹的插入是個遞歸調用的過程,它會從根開始找,一直找到合適的位置插入。

func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error)

參數說明:

  • n: 當前要插入的節點
  • prefix: 當前已經處理完的key(節點共有的前綴)
  • key: 未處理完的部分key,完整的key = prefix + key
  • value:需要插入的值

返回值說明:

  • bool : 操作是否改變了Trie樹(dirty)
  • Node :插入完成後的子樹的根節點

接下來就是分別對shortNodefullNodehashNodenil 幾種情況進行說明。

2.1:節點為nil

空樹直接返回shortNode, 此時整顆樹的根就含有了一個shortNode節點。

case nil:
		return true, &shortNode{key, value, t.newFlag()}, nil

2.2 :節點為shortNode

  • 首先計算公共前綴,如果公共前綴就等於key,那麼說明這兩個key是一樣的,如果value也一樣的(dirty == false),那麼返回錯誤。

  • 如果沒有錯誤就更新shortNode的值然後返回

  • 如果公共前綴不完全匹配,那麼就需要把公共前綴提取出來形成一個獨立的節點(擴展節點),擴展節點後面連接一個branch節點,branch節點後面看情況連接兩個short節點。

  • 首先構建一個branch節點(branch := &fullNode{flags: t.newFlag()}),然後再branch節點的Children位置調用t.insert插入剩下的兩個short節點

matchlen := prefixLen(key, n.Key)
		if matchlen == len(n.Key) {
			dirty, nn, err := t.insert(n.Val, append(prefix, key[:matchlen]...), key[matchlen:], value)
			if !dirty || err != nil {
				return false, n, err
			}
			return true, &shortNode{n.Key, nn, t.newFlag()}, nil
		}
		branch := &fullNode{flags: t.newFlag()}
		var err error
		_, branch.Children[n.Key[matchlen]], err = t.insert(nil, append(prefix, n.Key[:matchlen+1]...), n.Key[matchlen+1:], n.Val)
		if err != nil {
			return false, nil, err
		}
		_, branch.Children[key[matchlen]], err = t.insert(nil, append(prefix, key[:matchlen+1]...), key[matchlen+1:], value)
		if err != nil {
			return false, nil, err
		}
		if matchlen == 0 {
			return true, branch, nil
    }
		return true, &shortNode{key[:matchlen], branch, t.newFlag()}, nil

2.3: 節點為fullNode

節點是fullNode(也就是分支節點),那麼直接往對應的孩子節點調用insert方法,然後把對應的孩子節點指向新生成的節點。

dirty, nn, err := t.insert(n.Children[key[0]], append(prefix, key[0]), key[1:], value)
		if !dirty || err != nil {
			return false, n, err
		}
		n = n.copy()
		n.flags = t.newFlag()
		n.Children[key[0]] = nn
		return true, n, nil

2.4: 節點為hashnode

暫時還在資料庫中的節點,先調用 t.resolveHash(n, prefix)來載入到記憶體,然後調用insert方法來插入。

rn, err := t.resolveHash(n, prefix)
		if err != nil {
			return false, nil, err
		}
		dirty, nn, err := t.insert(rn, prefix, key, value)
		if !dirty || err != nil {
			return false, rn, err
		}
		return true, nn, nil

③:Trie樹查詢值

其實就是根據輸入的hash,找到對應的葉子節點的數據。主要看TryGet方法。

參數:

  • origNode:當前查找的起始node位置
  • key:輸入要查找的數據的hash
  • pos:當前hash匹配到第幾位
func (t *Trie) tryGet(origNode node, key []byte, pos int) (value []byte, newnode node, didResolve bool, err error) {
	switch n := (origNode).(type) {
	case nil: //表示當前trie是空樹
		return nil, nil, false, nil
	case valueNode: ////這就是我們要查找的葉子節點對應的數據
		return n, n, false, nil
	case *shortNode: ////在葉子節點或者擴展節點匹配
		if len(key)-pos < len(n.Key) || !bytes.Equal(n.Key, key[pos:pos+len(n.Key)]) {
			return nil, n, false, nil
		}
		value, newnode, didResolve, err = t.tryGet(n.Val, key, pos+len(n.Key))
		if err == nil && didResolve {
			n = n.copy()
			n.Val = newnode
		}
		return value, n, didResolve, err
	case *fullNode://在分支節點匹配
		value, newnode, didResolve, err = t.tryGet(n.Children[key[pos]], key, pos+1)
		if err == nil && didResolve {
			n = n.copy()
			n.Children[key[pos]] = newnode
		}
		return value, n, didResolve, err
	case hashNode: //說明當前節點是輕節點,需要從db中獲取
		child, err := t.resolveHash(n, key[:pos])
		if err != nil {
			return nil, n, true, err
		}
		value, newnode, _, err := t.tryGet(child, key, pos)
		return value, newnode, true, err
...
}

didResolve用於判斷trie樹是否會發生變化,tryGet()只是用來獲取數據的,當hashNodedb中獲取該node值後需要更新現有的trie,didResolve就會發生變化。其他就是基本的遞歸查找樹操作。

④:Trie樹更新值

更新值,其實就是調用insert方法進行操作。

到此Trie樹的增刪改查就講解的差不多了。

將節點寫入到Trie的記憶體資料庫

如果要把節點寫入到記憶體資料庫,需要序列化,可以先去了解下以太坊的Rlp編碼。這部分工作由trie.Commit()完成,當trie.Commit(nil),會執行序列化和快取等操作,序列化之後是使用的Compact Encoding進行編碼,從而達到節省空間的目的。

func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) {
	if t.db == nil {
		panic("commit called on trie with nil database")
	}
	hash, cached, err := t.hashRoot(t.db, onleaf)
	if err != nil {
		return common.Hash{}, err
	}
	t.root = cached
	return common.BytesToHash(hash.(hashNode)), nil
}

上述程式碼大概講了這些:

  • 每次執行Commit(),該trie的cachegen就會加 1
  • Commit()方法返回的是trie.root所指向的nodehash(未編碼)
  • 其中的hashRoot()方法目的是返回trie.root所指向的node的hash以及每個節點都帶有各自hash的trie樹的root
//為每個node生成一個hash
func (t *Trie) hashRoot(db *Database, onleaf LeafCallback) (node, node, error) {
	if t.root == nil {
		return hashNode(emptyRoot.Bytes()), nil, nil
	}
	h := newHasher(onleaf)
	defer returnHasherToPool(h)
	return h.hash(t.root, db, true) //為每個節點生成一個未編碼的hash
}

hashRoot的核心方法就是 h.hash,它返回了頭節點的hash以及每個子節點都帶有hash的頭節點(Trie.root指向它),大致做了以下幾件事:

①:如果我們不存儲節點,而只是哈希,則從快取中獲取數據

if hash, dirty := n.cache(); hash != nil {
		if db == nil {
			return hash, n, nil
		}
		if !dirty {
			switch n.(type) {
			case *fullNode, *shortNode:
				return hash, hash, nil
			default:
				return hash, n, nil
			}
		}
	}

②:遞歸調用h.hashChildren,求出所有的子節點的hash值,再把原有的子節點替換成現在子節點的hash

2.1:如果節點是shortNode

首先把collapsed.Key從Hex Encoding 替換成 Compact Encoding, 然後遞歸調用hash方法計運算元節點的hashcache,從而把子節點替換成了子節點的hash

collapsed, cached := n.copy(), n.copy()
		collapsed.Key = hexToCompact(n.Key)
		cached.Key = common.CopyBytes(n.Key)

		if _, ok := n.Val.(valueNode); !ok {
			collapsed.Val, cached.Val, err = h.hash(n.Val, db, false)
			if err != nil {
				return original, original, err
			}
		}
		return collapsed, cached, nil

2.2:節點是fullNode

遍歷每個子節點,把子節點替換成子節點的Hash值,否則的化這個節點沒有children。直接返回。

		collapsed, cached := n.copy(), n.copy()

		for i := 0; i < 16; i++ {
			if n.Children[i] != nil {
				collapsed.Children[i], cached.Children[i], err = h.hash(n.Children[i], db, false)
				if err != nil {
					return original, original, err
				}
			}
		}
		cached.Children[16] = n.Children[16]
		return collapsed, cached, nil

③:存儲節點n的哈希值,如果我們指定了存儲層,它會寫對應的鍵/值對

store()方法主要就做了兩件事:

  • rlp序列化collapsed節點並將其插入db磁碟中
  • 生成當前節點的hash
  • 將節點哈希插入db

3.1:空數據或者hashNode,則不處理

if _, isHash := n.(hashNode); n == nil || isHash {
		return n, nil
	}

3.2:生成節點的RLP編碼

h.tmp.Reset()                                 // 快取初始化
	if err := rlp.Encode(&h.tmp, n); err != nil { //將當前node序列化
		panic("encode error: " + err.Error())
	}
	if len(h.tmp) < 32 && !force {
		return n, nil // Nodes smaller than 32 bytes are stored inside their parent 編碼後的node長度小於32,若force為true,則可確保所有節點都被編碼
	}
//長度過大的,則都將被新計算出來的hash取代
	hash, _ := n.cache() //取出當前節點的hash
	if hash == nil {
		hash = h.makeHashNode(h.tmp) //生成哈希node
	}

3.3:將Trie節點合併到中間記憶體快取中

hash := common.BytesToHash(hash)
		db.lock.Lock()
		db.insert(hash, h.tmp, n)
		db.lock.Unlock()
		// Track external references from account->storage trie
		//跟蹤帳戶->存儲Trie中的外部引用
		if h.onleaf != nil {
			switch n := n.(type) {
			case *shortNode:
				if child, ok := n.Val.(valueNode); ok {  //指向的是分支節點
					h.onleaf(child, hash) //用於統計當前節點的資訊,比如當前節點有幾個子節點,當前有效的節點數
				}
			case *fullNode:
				for i := 0; i < 16; i++ {
					if child, ok := n.Children[i].(valueNode); ok {
						h.onleaf(child, hash)
					}
				}
			}
		}

到此為止將節點寫入到Trie的記憶體資料庫就已經完成了。

如果覺得文章不錯可以關注公眾號:區塊鏈技術棧,詳細的所有以太坊源碼分析文章內容以及程式碼資料都在其中。

Trie樹快取機制

Trie樹的結構裡面有兩個參數, 一個是cachegen,一個是cachelimit。這兩個參數就是cache控制的參數。 Trie樹每一次調用Commit方法,會導致當前的cachegen增加1。

func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) {
   ...
    t.cachegen++
   ...
}

然後在Trie樹插入的時候,會把當前的cachegen存放到節點中。

func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) {
            ....
            return true, &shortNode{n.Key, nn, t.newFlag()}, nil
}
func (t *Trie) newFlag() nodeFlag {
    return nodeFlag{dirty: true, gen: t.cachegen}
}

如果 trie.cachegen - node.cachegen > cachelimit,就可以把節點從記憶體裡面拿掉。 也就是說節點經過幾次Commit,都沒有修改,那麼就把節點從記憶體裡面幹掉。 只要trie路徑上新增或者刪除一個節點,整個路徑的節點都需要重新實例化,也就是節點中的nodeFlag被初始化了。都需要重新更新到db磁碟。

拿掉節點過程在 hasher.hash方法中, 這個方法是在commit的時候調用。如果方法的canUnload方法調用返回真,那麼就拿掉節點,如果只返回了hash節點,而沒有返回node節點,這樣節點就沒有引用,不久就會被gc清除掉。 節點被拿掉之後,會用一個hashNode節點來表示這個節點以及其子節點。 如果後續需要使用,可以通過方法把這個節點載入到記憶體裡面來。

func (h *hasher) hash(n node, db *Database, force bool) (node, node, error) {
   	....
       // 從快取中卸載節點。它的所有子節點將具有較低或相等的快取世代號碼。
       cacheUnloadCounter.Inc(1)
  ...
}

參考&總結

這部分重要的內容也就上面講述的,主要集中在Trie上面,如果有不對的地方,可以及時指正哦。

//mindcarver.cn/about/

//github.com/blockchainGuide/blockchainguide