死磕以太坊源碼分析之MPT樹-下
死磕以太坊源碼分析之MPT樹-下
文章以及資料請查看://github.com/blockchainGuide/
上篇主要介紹了以太坊中的MPT樹的原理,這篇主要會對MPT樹涉及的源碼進行拆解分析。trie
模組主要有以下幾個文件:
|-encoding.go 主要講編碼之間的轉換
|-hasher.go 實現了從某個結點開始計運算元樹的哈希的功能
|-node.go 定義了一個Trie樹中所有結點的類型和解析的程式碼
|-sync.go 實現了SyncTrie對象的定義和所有方法
|-iterator.go 定義了所有枚舉相關介面和實現
|-secure_trie.go 實現了SecureTrie對象
|-proof.go 為key構造一個merkle證明
|-trie.go Trie樹的增刪改查
|-database.go 對記憶體中的trie樹節點進行引用計數
實現概覽
encoding.go
這個主要是講三種編碼(KEYBYTES encoding
、HEX encoding
、COMPACT encoding
)的實現與轉換,trie
中全程都需要用到這些,該文件中主要實現了如下功能:
- hex編碼轉換為Compact編碼:
hexToCompact()
- Compact編碼轉換為hex編碼:
compactToHex()
- keybytes編碼轉換為Hex編碼:
keybytesToHex()
- hex編碼轉換為keybytes編碼:
hexToKeybytes()
- 獲取兩個位元組數組的公共前綴的長度:
prefixLen()
func hexToCompact(hex []byte) []byte {
terminator := byte(0)
if hasTerm(hex) { //檢查是否有結尾為0x10 => 16
terminator = 1 //有結束標記16說明是葉子節點
hex = hex[:len(hex)-1] //去除尾部標記
}
buf := make([]byte, len(hex)/2+1) // 位元組數組
buf[0] = terminator << 5 // 標誌byte為00000000或者00100000
//如果長度為奇數,添加奇數位標誌1,並把第一個nibble位元組放入buf[0]的低四位
if len(hex)&1 == 1 {
buf[0] |= 1 << 4 // 奇數標誌 00110000
buf[0] |= hex[0] // 第一個nibble包含在第一個位元組中 0011xxxx
hex = hex[1:]
}
//將兩個nibble位元組合併成一個位元組
decodeNibbles(hex, buf[1:])
return buf
//compact編碼轉化為Hex編碼
func compactToHex(compact []byte) []byte {
base := keybytesToHex(compact)
base = base[:len(base)-1]
// apply terminator flag
// base[0]包括四種情況
// 00000000 擴展節點偶數位
// 00000001 擴展節點奇數位
// 00000010 葉子節點偶數位
// 00000011 葉子節點奇數位
// apply terminator flag
if base[0] >= 2 {
//如果是葉子節點,末尾添加Hex標誌位16
base = append(base, 16)
}
// apply odd flag
//如果是偶數位,chop等於2,否則等於1
chop := 2 - base[0]&1
return base[chop:]
}
//compact編碼轉化為Hex編碼
func compactToHex(compact []byte) []byte {
base := keybytesToHex(compact)
base = base[:len(base)-1]
// apply terminator flag
// base[0]包括四種情況
// 00000000 擴展節點偶數位
// 00000001 擴展節點奇數位
// 00000010 葉子節點偶數位
// 00000011 葉子節點奇數位
// apply terminator flag
if base[0] >= 2 {
//如果是葉子節點,末尾添加Hex標誌位16
base = append(base, 16)
}
// apply odd flag
//如果是偶數位,chop等於2,否則等於1
chop := 2 - base[0]&1
return base[chop:]
}
// 將十六進位的bibbles轉成key bytes,這隻能用於偶數長度的key
func hexToKeybytes(hex []byte) []byte {
if hasTerm(hex) {
hex = hex[:len(hex)-1]
}
if len(hex)&1 != 0 {
panic("can't convert hex key of odd length")
}
key := make([]byte, (len(hex)+1)/2)
decodeNibbles(hex, key)
return key
}
// 返回a和b的公共前綴的長度
func prefixLen(a, b []byte) int {
var i, length = 0, len(a)
if len(b) < length {
length = len(b)
}
for ; i < length; i++ {
if a[i] != b[i] {
break
}
}
return i
}
node.go
四種節點
node 介面分四種實現: fullNode,shortNode,valueNode,hashNode,其中只有 fullNode 和 shortNode 可以帶有子節點。
type (
fullNode struct {
Children [17]node // 分支節點
flags nodeFlag
}
shortNode struct { //擴展節點
Key []byte
Val node //可能指向葉子節點,也可能指向分支節點。
flags nodeFlag
}
hashNode []byte
valueNode []byte // 葉子節點值,但是該葉子節點最終還是會包裝在shortNode中
)
trie.go
Trie對象實現了MPT樹的所有功能,包括(key, value)對的增刪改查、計算默克爾哈希,以及將整個樹寫入資料庫中。
iterator.go
nodeIterator
提供了遍歷樹內部所有結點的功能。其結構如下:此結構體是在trie.go
定義的
type nodeIterator struct {
trie.NodeIterator
t *odrTrie
err error
}
裡面包含了一個介面NodeIterator
,它的實現則是由iterator.go
來提供的,其方法如下:
func (it *nodeIterator) Next(descend bool) bool
func (it *nodeIterator) Hash() common.Hash
func (it *nodeIterator) Parent() common.Hash
func (it *nodeIterator) Leaf() bool
func (it *nodeIterator) LeafKey() []byte
func (it *nodeIterator) LeafBlob() []byte
func (it *nodeIterator) LeafProof() [][]byte
func (it *nodeIterator) Path() []byte {}
func (it *nodeIterator) seek(prefix []byte) error
func (it *nodeIterator) peek(descend bool) (*nodeIteratorState, *int, []byte, error)
func (it *nodeIterator) nextChild(parent *nodeIteratorState, ancestor common.Hash) (*nodeIteratorState, []byte, bool)
func (it *nodeIterator) push(state *nodeIteratorState, parentIndex *int, path []byte)
func (it *nodeIterator) pop()
NodeIterator
的核心是Next
方法,每調用一次這個方法,NodeIterator對象代表的當前節點就會更新至下一個節點,當所有結點遍歷結束,Next
方法返回false
。
生成NodeIterator結口的方法有以下3種:
①:Trie.NodeIterator(start []byte)
通過start
參數指定從哪個路徑開始遍歷,如果為nil
則從頭到尾按順序遍歷。
②:NewDifferenceIterator(a, b NodeIterator)
當調用NewDifferenceIterator(a, b NodeIterator)
後,生成的NodeIterator
只遍歷存在於 b 但不存在於 a 中的結點。
③:NewUnionIterator(iters []NodeIterator)
當調用NewUnionIterator(its []NodeIterator)
後,生成的NodeIterator
遍歷的結點是所有傳入的結點的合集。
database.go
Database
是trie
模組對真正資料庫的快取層,其目的是對快取的節點進行引用計數,從而實現區塊的修剪功能。主要方法如下:
func NewDatabase(diskdb ethdb.KeyValueStore) *Database
func NewDatabaseWithCache(diskdb ethdb.KeyValueStore, cache int) *Database
func (db *Database) DiskDB() ethdb.KeyValueReader
func (db *Database) InsertBlob(hash common.Hash, blob []byte)
func (db *Database) insert(hash common.Hash, blob []byte, node node)
func (db *Database) insertPreimage(hash common.Hash, preimage []byte)
func (db *Database) node(hash common.Hash) node
func (db *Database) Node(hash common.Hash) ([]byte, error)
func (db *Database) preimage(hash common.Hash) ([]byte, error)
func (db *Database) secureKey(key []byte) []byte
func (db *Database) Nodes() []common.Hash
func (db *Database) Reference(child common.Hash, parent common.Hash)
func (db *Database) Dereference(root common.Hash)
func (db *Database) dereference(child common.Hash, parent common.Hash)
func (db *Database) Cap(limit common.StorageSize) error
func (db *Database) Commit(node common.Hash, report bool) error
security_trie.go
可以理解為加密了的trie
的實現,ecurity_trie
包裝了一下trie
樹, 所有的key
都轉換成keccak256
演算法計算的hash
值。同時在資料庫裡面存儲hash
值對應的原始的key
。
但是官方在程式碼里也注釋了,這個程式碼不穩定,除了測試用例,別的地方並沒有使用該程式碼。
proof.go
- Prove():根據給定的
key
,在trie
中,將滿足key
中最大長度前綴的路徑上的節點都加入到proofDb
(隊列中每個元素滿足:未編碼的hash以及對應rlp
編碼後的節點) - VerifyProof():驗證
proffDb
中是否存在滿足輸入的hash
,和對應key的節點,如果滿足,則返回rlp
解碼後的該節點。
實現細節
Trie對象的增刪改查
①:Trie樹的初始化
如果root
不為空,就通過resolveHash
來載入整個Trie
樹,如果為空,就新建一個Trie
樹。
func New(root common.Hash, db *Database) (*Trie, error) {
if db == nil {
panic("trie.New called without a database")
}
trie := &Trie{
db: db,
}
if root != (common.Hash{}) && root != emptyRoot {
rootnode, err := trie.resolveHash(root[:], nil)
if err != nil {
return nil, err
}
trie.root = rootnode
}
return trie, nil
}
②:Trie樹的插入
首先Trie樹的插入是個遞歸調用的過程,它會從根開始找,一直找到合適的位置插入。
func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error)
參數說明:
- n: 當前要插入的節點
- prefix: 當前已經處理完的key(節點共有的前綴)
- key: 未處理完的部分key,完整的
key = prefix + key
- value:需要插入的值
返回值說明:
- bool : 操作是否改變了Trie樹(dirty)
- Node :插入完成後的子樹的根節點
接下來就是分別對shortNode
、fullNode
、hashNode
、nil
幾種情況進行說明。
2.1:節點為nil
空樹直接返回shortNode
, 此時整顆樹的根就含有了一個shortNode
節點。
case nil:
return true, &shortNode{key, value, t.newFlag()}, nil
2.2 :節點為shortNode
-
首先計算公共前綴,如果公共前綴就等於
key
,那麼說明這兩個key
是一樣的,如果value
也一樣的(dirty == false
),那麼返回錯誤。 -
如果沒有錯誤就更新
shortNode
的值然後返回 -
如果公共前綴不完全匹配,那麼就需要把公共前綴提取出來形成一個獨立的節點(擴展節點),擴展節點後面連接一個
branch
節點,branch
節點後面看情況連接兩個short
節點。 -
首先構建一個branch節點(branch := &fullNode{flags: t.newFlag()}),然後再branch節點的Children位置調用t.insert插入剩下的兩個short節點
matchlen := prefixLen(key, n.Key)
if matchlen == len(n.Key) {
dirty, nn, err := t.insert(n.Val, append(prefix, key[:matchlen]...), key[matchlen:], value)
if !dirty || err != nil {
return false, n, err
}
return true, &shortNode{n.Key, nn, t.newFlag()}, nil
}
branch := &fullNode{flags: t.newFlag()}
var err error
_, branch.Children[n.Key[matchlen]], err = t.insert(nil, append(prefix, n.Key[:matchlen+1]...), n.Key[matchlen+1:], n.Val)
if err != nil {
return false, nil, err
}
_, branch.Children[key[matchlen]], err = t.insert(nil, append(prefix, key[:matchlen+1]...), key[matchlen+1:], value)
if err != nil {
return false, nil, err
}
if matchlen == 0 {
return true, branch, nil
}
return true, &shortNode{key[:matchlen], branch, t.newFlag()}, nil
2.3: 節點為fullNode
節點是fullNode
(也就是分支節點),那麼直接往對應的孩子節點調用insert
方法,然後把對應的孩子節點指向新生成的節點。
dirty, nn, err := t.insert(n.Children[key[0]], append(prefix, key[0]), key[1:], value)
if !dirty || err != nil {
return false, n, err
}
n = n.copy()
n.flags = t.newFlag()
n.Children[key[0]] = nn
return true, n, nil
2.4: 節點為hashnode
暫時還在資料庫中的節點,先調用 t.resolveHash(n, prefix)
來載入到記憶體,然後調用insert
方法來插入。
rn, err := t.resolveHash(n, prefix)
if err != nil {
return false, nil, err
}
dirty, nn, err := t.insert(rn, prefix, key, value)
if !dirty || err != nil {
return false, rn, err
}
return true, nn, nil
③:Trie樹查詢值
其實就是根據輸入的hash
,找到對應的葉子節點的數據。主要看TryGet
方法。
參數:
origNode
:當前查找的起始node位置key
:輸入要查找的數據的hashpos
:當前hash匹配到第幾位
func (t *Trie) tryGet(origNode node, key []byte, pos int) (value []byte, newnode node, didResolve bool, err error) {
switch n := (origNode).(type) {
case nil: //表示當前trie是空樹
return nil, nil, false, nil
case valueNode: ////這就是我們要查找的葉子節點對應的數據
return n, n, false, nil
case *shortNode: ////在葉子節點或者擴展節點匹配
if len(key)-pos < len(n.Key) || !bytes.Equal(n.Key, key[pos:pos+len(n.Key)]) {
return nil, n, false, nil
}
value, newnode, didResolve, err = t.tryGet(n.Val, key, pos+len(n.Key))
if err == nil && didResolve {
n = n.copy()
n.Val = newnode
}
return value, n, didResolve, err
case *fullNode://在分支節點匹配
value, newnode, didResolve, err = t.tryGet(n.Children[key[pos]], key, pos+1)
if err == nil && didResolve {
n = n.copy()
n.Children[key[pos]] = newnode
}
return value, n, didResolve, err
case hashNode: //說明當前節點是輕節點,需要從db中獲取
child, err := t.resolveHash(n, key[:pos])
if err != nil {
return nil, n, true, err
}
value, newnode, _, err := t.tryGet(child, key, pos)
return value, newnode, true, err
...
}
didResolve
用於判斷trie
樹是否會發生變化,tryGet()
只是用來獲取數據的,當hashNode
去db
中獲取該node
值後需要更新現有的trie,didResolve
就會發生變化。其他就是基本的遞歸查找樹操作。
④:Trie樹更新值
更新值,其實就是調用insert方法進行操作。
到此Trie樹的增刪改查就講解的差不多了。
將節點寫入到Trie的記憶體資料庫
如果要把節點寫入到記憶體資料庫,需要序列化,可以先去了解下以太坊的Rlp編碼。這部分工作由trie.Commit()
完成,當trie.Commit(nil)
,會執行序列化和快取等操作,序列化之後是使用的Compact Encoding
進行編碼,從而達到節省空間的目的。
func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) {
if t.db == nil {
panic("commit called on trie with nil database")
}
hash, cached, err := t.hashRoot(t.db, onleaf)
if err != nil {
return common.Hash{}, err
}
t.root = cached
return common.BytesToHash(hash.(hashNode)), nil
}
上述程式碼大概講了這些:
- 每次執行
Commit()
,該trie的cachegen
就會加 1 Commit()
方法返回的是trie.root
所指向的node
的hash
(未編碼)- 其中的
hashRoot()
方法目的是返回trie.root所指向的node的hash
以及每個節點都帶有各自hash的trie樹的root
。
//為每個node生成一個hash
func (t *Trie) hashRoot(db *Database, onleaf LeafCallback) (node, node, error) {
if t.root == nil {
return hashNode(emptyRoot.Bytes()), nil, nil
}
h := newHasher(onleaf)
defer returnHasherToPool(h)
return h.hash(t.root, db, true) //為每個節點生成一個未編碼的hash
}
而hashRoot
的核心方法就是 h.hash
,它返回了頭節點的hash
以及每個子節點都帶有hash
的頭節點(Trie.root指向它),大致做了以下幾件事:
①:如果我們不存儲節點,而只是哈希,則從快取中獲取數據
if hash, dirty := n.cache(); hash != nil {
if db == nil {
return hash, n, nil
}
if !dirty {
switch n.(type) {
case *fullNode, *shortNode:
return hash, hash, nil
default:
return hash, n, nil
}
}
}
②:遞歸調用h.hashChildren
,求出所有的子節點的hash
值,再把原有的子節點替換成現在子節點的hash
值
2.1:如果節點是shortNode
首先把collapsed.Key從Hex Encoding
替換成 Compact Encoding
, 然後遞歸調用hash
方法計運算元節點的hash
和cache
,從而把子節點替換成了子節點的hash
值
collapsed, cached := n.copy(), n.copy()
collapsed.Key = hexToCompact(n.Key)
cached.Key = common.CopyBytes(n.Key)
if _, ok := n.Val.(valueNode); !ok {
collapsed.Val, cached.Val, err = h.hash(n.Val, db, false)
if err != nil {
return original, original, err
}
}
return collapsed, cached, nil
2.2:節點是fullNode
遍歷每個子節點,把子節點替換成子節點的Hash
值,否則的化這個節點沒有children
。直接返回。
collapsed, cached := n.copy(), n.copy()
for i := 0; i < 16; i++ {
if n.Children[i] != nil {
collapsed.Children[i], cached.Children[i], err = h.hash(n.Children[i], db, false)
if err != nil {
return original, original, err
}
}
}
cached.Children[16] = n.Children[16]
return collapsed, cached, nil
③:存儲節點n的哈希值,如果我們指定了存儲層,它會寫對應的鍵/值對
store()方法主要就做了兩件事:
rlp
序列化collapsed
節點並將其插入db磁碟中- 生成當前節點的
hash
- 將節點哈希插入
db
3.1:空數據或者hashNode,則不處理
if _, isHash := n.(hashNode); n == nil || isHash {
return n, nil
}
3.2:生成節點的RLP編碼
h.tmp.Reset() // 快取初始化
if err := rlp.Encode(&h.tmp, n); err != nil { //將當前node序列化
panic("encode error: " + err.Error())
}
if len(h.tmp) < 32 && !force {
return n, nil // Nodes smaller than 32 bytes are stored inside their parent 編碼後的node長度小於32,若force為true,則可確保所有節點都被編碼
}
//長度過大的,則都將被新計算出來的hash取代
hash, _ := n.cache() //取出當前節點的hash
if hash == nil {
hash = h.makeHashNode(h.tmp) //生成哈希node
}
3.3:將Trie節點合併到中間記憶體快取中
hash := common.BytesToHash(hash)
db.lock.Lock()
db.insert(hash, h.tmp, n)
db.lock.Unlock()
// Track external references from account->storage trie
//跟蹤帳戶->存儲Trie中的外部引用
if h.onleaf != nil {
switch n := n.(type) {
case *shortNode:
if child, ok := n.Val.(valueNode); ok { //指向的是分支節點
h.onleaf(child, hash) //用於統計當前節點的資訊,比如當前節點有幾個子節點,當前有效的節點數
}
case *fullNode:
for i := 0; i < 16; i++ {
if child, ok := n.Children[i].(valueNode); ok {
h.onleaf(child, hash)
}
}
}
}
到此為止將節點寫入到Trie
的記憶體資料庫就已經完成了。
如果覺得文章不錯可以關注公眾號:區塊鏈技術棧,詳細的所有以太坊源碼分析文章內容以及程式碼資料都在其中。
Trie樹快取機制
Trie
樹的結構裡面有兩個參數, 一個是cachegen
,一個是cachelimit
。這兩個參數就是cache
控制的參數。 Trie
樹每一次調用Commit
方法,會導致當前的cachegen
增加1。
func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) {
...
t.cachegen++
...
}
然後在Trie
樹插入的時候,會把當前的cachegen
存放到節點中。
func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) {
....
return true, &shortNode{n.Key, nn, t.newFlag()}, nil
}
func (t *Trie) newFlag() nodeFlag {
return nodeFlag{dirty: true, gen: t.cachegen}
}
如果 trie.cachegen - node.cachegen > cachelimit
,就可以把節點從記憶體裡面拿掉。 也就是說節點經過幾次Commit
,都沒有修改,那麼就把節點從記憶體裡面幹掉。 只要trie
路徑上新增或者刪除一個節點,整個路徑的節點都需要重新實例化,也就是節點中的nodeFlag
被初始化了。都需要重新更新到db
磁碟。
拿掉節點過程在 hasher.hash
方法中, 這個方法是在commit
的時候調用。如果方法的canUnload
方法調用返回真,那麼就拿掉節點,如果只返回了hash
節點,而沒有返回node
節點,這樣節點就沒有引用,不久就會被gc清除掉。 節點被拿掉之後,會用一個hashNode
節點來表示這個節點以及其子節點。 如果後續需要使用,可以通過方法把這個節點載入到記憶體裡面來。
func (h *hasher) hash(n node, db *Database, force bool) (node, node, error) {
....
// 從快取中卸載節點。它的所有子節點將具有較低或相等的快取世代號碼。
cacheUnloadCounter.Inc(1)
...
}
參考&總結
這部分重要的內容也就上面講述的,主要集中在Trie
上面,如果有不對的地方,可以及時指正哦。