以太坊 — 交易池的特點 與 中斷恢復

作者:林冠宏 / 指尖下的幽靈。轉載者,請: 務必標明出處。

部落格://www.cnblogs.com/linguanh/

掘金://juejin.im/user/1785262612681997

GitHub : //github.com/af913337456/

出版的書籍:


目錄

  • 前序
  • 以太坊交易池知識點總結
  • 源碼探秘
    • 本地交易
      • 本地錢包地址的初始化
      • 載入本地交易
      • pool.journal.load
      • pool.AddLocals
      • 本地交易文件的更新
    • 遠程交易
      • P2P 通訊模組的初始化
      • 接收 P2P 消息
      • 添加遠程交易到交易池
  • 「 彩蛋 」

21年的第一篇文章,開源寫作6年。

最近比特幣以太坊的價格也已然起飛,現在一個 BTC 已能全款輛某斯拉 model 3汽車。離譜。

發布這篇文章:從區塊鏈技術研發者的角度,說說我的區塊鏈從業經歷和對它的理解 的時候,是去年,現在回首去看最後那段話,一語成讖


言歸正傳。

一般做數據池之類的開發。比如:訂單池,請求池…,傳統的服務端思想會引導我們直接向消息中間件想去。使用各類消息組件去實現,比如 RocketMQ,Redis,Kafka…

然而,在區塊鏈公鏈應用中,現已知的多條公鏈,每一條,都有交易池這麼一個功能模組,且,它們的程式碼實現都沒有引入消息中間件去實現。

早前在閱讀以太坊公鏈源碼的時候,我就對以太坊交易池這一塊的實現思想感到新穎,今天總結下,分享給大家看看,區塊鏈公鏈應用中不依賴消息中間件去實現交易池的做法及其特點。


以太坊交易池知識點總結 _(BTW:面試的時候可死記)

  1. 交易的分類:
    • 從本地文件存與不存的角度去看:
      1. 本地交易,若交易的發送者地址是配置變數指定的地址,則認為是本地交易:
        • 節點啟動的時候,可以在配置文件指定,不開啟本地交易的操作
      2. 遠程交易,不滿足 1 條件的交易。
    • 從記憶體存儲的角度去看:
      1. Queue,待進入 Pending 的交易,結構是 map[addr]TxList
      2. Pending,待進入打包隊列的交易,結構和 Queue 一樣,由 1 轉化而來。
  2. 交易的輸入(產生):
    • 程式啟動之初:
      1. 本地交易,從本地文件載入到記憶體,本地若沒,自然是 0 輸入;
      2. 遠程交易,由 P2P 通訊模組,接收到交易數據,存儲到記憶體。
    • 程式運行中:
      1. 自己接收交易的 RPC請求,SendTransaction 或 SendRawTransaction;
      2. 通過 P2P 通訊模組,接收其它節點的資訊,包含的動作有:
        1. 舊交易的移除;
        2. 新交易的增加。
  3. 交易的持久化策略:
    • 本地交易:
      1. 定時從 Pending 和 Queue 中選出本地交易存儲到本地文件
      2. 存儲方式,文件替換,先 new 一個,再 rename 一波;
      3. 注意第 2 點,文件的替換,意味著即是更新也是刪除操作;
      4. 編碼方式,rlp 編碼,不是 json。
    • 遠程交易:
      1. 不存,不進行持久化,總是依賴由其它節點 P2P 通訊同步過來。
  4. 中斷恢復:
    1. 本地交易,同上面 程式啟動之初 的操作;
    2. 遠程交易,沒有恢復,記憶體中的交易丟了就是丟了,不影響。即使當初正在打包,即使當前節點掛了,其它節點還在工作。

上面第 4 點,中斷恢復,對比於傳統後端服務的消息中間件,對消息的不丟失保障性,區塊鏈公鏈的做法,完全是靠分散式來維持的,單節點的數據丟失,可以從其它節點同步過來。所以,它們交易池的實現的實現,相對來說,更加靈活,編碼難點在消息同步部分。


下面進入枯燥的源碼分析階段,讀有餘力的讀者可以繼續

要看注釋。

本地交易

1. 本地錢包地址的初始化

源碼文件:tx_pool.go,config.Locals 由配置文件指定,是以太坊錢包地址數組。

func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
	...
	for _, addr := range config.Locals { // 從配置文件添加 本地地址
		log.Info("Setting new local account", "address", addr)
		// 添加到 locals 變數裡面,後面會用它來過濾出一個地址是否是本地地址
		pool.locals.add(addr) 
	}
	...
}

2. 從本地文件,載入交易數據數據,即載入本地交易

func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
	...
	pool.locals = newAccountSet(pool.signer)
	for _, addr := range config.Locals {
		log.Info("Setting new local account", "address", addr)
		pool.locals.add(addr)
	}
	...	
 	// 上面添加完了
	// If local transactions and journaling is enabled, load from disk
	if !config.NoLocals && config.Journal != "" { // 如果配置開啟了本地載入的需求
		pool.journal = newTxJournal(config.Journal)
   		// load 是載入函數,pool.AddLocals 是實際添加函數
		if err := pool.journal.load(pool.AddLocals); err != nil {
			log.Warn("Failed to load transaction journal", "err", err)
		}
		if err := pool.journal.rotate(pool.local()); err != nil {
			log.Warn("Failed to rotate transaction journal", "err", err)
		}
	}
	...
    go pool.loop() // 循環處理事件
}

3. pool.journal.load

源碼文件:tx_journal.go

func (journal *txJournal) load(add func([]*types.Transaction) []error) error {
	// Skip the parsing if the journal file doesn't exist at all
	if _, err := os.Stat(journal.path); os.IsNotExist(err) {
		return nil
	}
	// Open the journal for loading any past transactions
	input, err := os.Open(journal.path) // 打開文件,讀取流數據
	if err != nil {
		return err
	}
	...
	stream := rlp.NewStream(input, 0) // 使用 rlp 編碼演算法解碼數據
	...
	loadBatch := func(txs types.Transactions) {
		for _, err := range add(txs) { // 調用 add 函數,進行添加
			if err != nil {
				log.Debug("Failed to add journaled transaction", "err", err)
				dropped++
			}
		}
	}
	// loadBatch 在下面會被調用
	...
}

4. pool.AddLocals

pool.AddLocals 是實際的添加函數。內部的一系列調用後,最終到 tx_pool.add 函數。pool 的 queue 都是 map 結構,能根據相同 key 去重。

func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
	...
 	// 下面的 if,如果已在 pool.pending 裡面,那麼證明之前已經添加過在 queue 里
	if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
		...
 		pool.journalTx(from, tx) // 內部調用 journal.insert
		return old != nil, nil
	}
	replaced, err = pool.enqueueTx(hash, tx) // 這裡,會添加到 pool.enqueue 裡面
	if err != nil {
		return false, err
	}
	pool.journalTx(from, tx) // 內部調用 journal.insert
	...
}

func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
	// 本地錢包地址,沒指定的話,就跳過
	if pool.journal == nil || !pool.locals.contains(from) {
		return
	}
 	// insert 會在造成重複添加,但是 load 出來的時候會根據 addr 去重
	if err := pool.journal.insert(tx); err != nil {
		log.Warn("Failed to journal local transaction", "err", err)
	}
}

截止到上面,本地交易已經被添加到 pool 的 queue 裡面了。

節點啟動之初,除了會從本地 load 交易到 queue 外,還會不停地監聽鏈的事件,比如接收交易,再 add 交易 到 queue 里。

5. 本地交易文件的更新 ( 插入 / 刪除 )

loop 是觸發的入口。除了主動的 journal.insert 達到了插入本地交易的目的之外。

下面的更新操作,也達到了包含插入的目的:以替換的手段,從文件刪除舊交易,存儲新交易到文件

func (pool *TxPool) loop() {
	...
	for {
		select {
		...
		// Handle local transaction journal rotation
 		// journal 定時器,定時執行下面的本地交易數據文件的更新 journal.rotate
		case <-journal.C:
			if pool.journal != nil {
				pool.mu.Lock()
				if err := pool.journal.rotate(pool.local()); err != nil {
					log.Warn("Failed to rotate local tx journal", "err", err)
				}
				pool.mu.Unlock()
			}
		}
	}
}

journal.rotate 的做法,使用文件替換的方式,來從 pool 的交易 pending 和 queue 中存儲 locals 錢包地址相關的交易到文件。注意,只存本地錢包地址的,其它的,不存。

//輸入
func (pool *TxPool) local() map[common.Address]types.Transactions {
	...
	for addr := range pool.locals.accounts {
		if pending := pool.pending[addr]; pending != nil {
 			// 添加 pending 的
			txs[addr] = append(txs[addr], pending.Flatten()...)
		}
		if queued := pool.queue[addr]; queued != nil {
 			// 添加 queue 的
			txs[addr] = append(txs[addr], queued.Flatten()...)
		}
	}
	return txs
}

// all 參數,來源於上面 local()
func (journal *txJournal) rotate(all map[common.Address]types.Transactions) error {
	...
	// journal.path+".new" 後綴 .new
	replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
	if err != nil {
		return err
	}
	journaled := 0
	for _, txs := range all {
		for _, tx := range txs {
			if err = rlp.Encode(replacement, tx); err != nil {
				replacement.Close()
				return err
			}
		}
		journaled += len(txs)
	}
	replacement.Close()
 	// rename,重命名文件到原始的 path,達到更新,替換目的
	if err = os.Rename(journal.path+".new", journal.path); err != nil {
		return err
	}
	sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0755)
	if err != nil {
		return err
	}
	...
	return nil
}

遠程交易

P2P 通訊模組的初始化

源碼文件:eth/backend.go

func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
	...
	if config.TxPool.Journal != "" {
		config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
	}
 	// 初始化交易池
	eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)
	...
 	// 使用 交易池指針對象 作為參數初始化 protocolManager
	if eth.protocolManager, err = NewProtocolManager(
    		chainConfig, checkpoint, config.SyncMode, config.NetworkId, 
            	eth.eventMux, `eth.txPool`, eth.engine, 
                eth.blockchain, chainDb, cacheLimit, config.Whitelist); err != nil {
		return nil, err
	}
	...
	return eth, nil
}

func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
	// 下面初始化 tx_fetcher,使用 txpool.AddRemotes 賦值給函數變數 addTxs
	manager.txFetcher = fetcher.NewTxFetcher(txpool.Has, txpool.AddRemotes, fetchTx)
}

接收 P2P 消息

源碼文件:eth/handler.go

func (pm *ProtocolManager) handleMsg(p *peer) error {
	...
    switch {
    ...
    // 接收到其它節點的交易數據
    case msg.Code == TransactionMsg || (msg.Code == PooledTransactionsMsg && p.version >= eth65):
		...
 		// Enqueue 將交易添加到交易池
		pm.txFetcher.Enqueue(p.id, txs, msg.Code == PooledTransactionsMsg)

    }
    ...
}
// tx_fetcher.go 文件
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
 	...
	errs := f.addTxs(txs) // 執行添加,這個函數其實就是 tx_pool.go 的 AddRemotes
	...
}

添加遠程交易到交易池

// tx_pool.go
// addTxs 內部就會把交易添加到 Pending 和 Queue 裡面
func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
	return pool.addTxs(txs, false, false)
}

打完收工

更多以太坊的開發知識,見我的書籍:

《2.0-區塊鏈DApp開發:基於以太坊和比特幣公鏈》