Substrate源碼分析:交易流程
- 2019 年 10 月 4 日
- 筆記
從業務角度分析substrate源碼,梳理了交易流程,包括發起交易,廣播交易和打包交易。
1. 發起交易
交易的發起是通過客戶端的RPC
調用,這個主要是在author
模組中。
Substrate
中把外部交易稱做extrinsic
。RPC
方法名是author_submitExtrinsic
。- 外部交易以十六進位編碼形式,被導入交易池中。
具體程式碼(substrate/core/rpc/api/src/author/mod.rs
)如下:
1.1 author
模組
定義AuthorApi
pub trait AuthorApi<Hash, BlockHash> { #[rpc(name = "author_submitExtrinsic")] fn submit_extrinsic(&self, extrinsic: Bytes) -> Result<Hash>; }
通過use
關鍵字,導入交易模組的相關功能
use transaction_pool::{ txpool::{ ChainApi as PoolChainApi, BlockHash, ExHash, IntoPoolError, Pool, watcher::Status, }, };
定義Author
結構體
pub struct Author<B, E, P, RA> where P: PoolChainApi + Sync + Send + 'static { client: Arc<Client<B, E, <P as PoolChainApi>::Block, RA>>, pool: Arc<Pool<P>>, subscriptions: Subscriptions, keystore: BareCryptoStorePtr, }
為Author
結構體實現AuthorApi
中的函數submit_extrinsic
。
impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> where ... { ... fn submit_extrinsic(&self, ext: Bytes) -> Result<ExHash<P>> { let xt = Decode::decode(&mut &ext[..])?; let best_block_hash = self.client.info().chain.best_hash; self.pool .submit_one(&generic::BlockId::hash(best_block_hash), xt) .map_err(|e| e.into_pool_error() .map(Into::into) .unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into()) ) } }
1.2 提交交易 pool
模組
我們通過Author
這個結構體,知道self.pool.submit_one
這個調用實現是在交易池模組的pool
這個模組中。
- 結構體
pool
,外部交易池。 - 方法
submit_one
,導入一條未驗證的外部交易到池中。 - 方法
submit_at
,導入一批未驗證的外部交易到池中。- 驗證外部交易
- 導入交易
- 發送導入交易的
notification
- 監聽導入交易處理結果
具體程式碼(/core/transaction-pool/graph/src/pool.rs
)如下:
pub trait ChainApi: Send + Sync { fn validate_transaction(&self, at: &BlockId<Self::Block>, uxt: ExtrinsicFor<Self>) -> Result<TransactionValidity, Self::Error>; } pub struct Pool<B: ChainApi> { api: B, options: Options, listener: RwLock<Listener<ExHash<B>, BlockHash<B>>>, pool: RwLock<base::BasePool< ExHash<B>, ExtrinsicFor<B>, >>, import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>, rotator: PoolRotator<ExHash<B>>, } impl<B: ChainApi> Pool<B> { pub fn submit_one(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<ExHash<B>, B::Error> { Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed")?) } pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T) -> Result<Vec<Result<ExHash<B>, B::Error>>, B::Error> where T: IntoIterator<Item=ExtrinsicFor<B>> { ... let results = xts .into_iter() .map(|xt| -> Result<_, B::Error> { ... // 驗證外部交易 match self.api.validate_transaction(at, xt.clone())? { Ok(validity) => if validity.provides.is_empty() { Err(error::Error::NoTagsProvided.into()) } else { Ok(base::Transaction { data: xt, bytes, hash, priority: validity.priority, requires: validity.requires, provides: validity.provides, propagate: validity.propagate, valid_till: block_number .saturated_into::<u64>() .saturating_add(validity.longevity), }) }, ... } }) .map(|tx| { // 導入交易 let imported = self.pool.write().import(tx?)?; // 導入交易的notification if let base::Imported::Ready { .. } = imported { self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok()); } // 監聽導入交易處理結果 let mut listener = self.listener.write(); fire_events(&mut *listener, &imported); Ok(imported.hash().clone()) }) .collect::<Vec<_>>(); ... } }
1.3 驗證交易
在交易被導入池中前,會調用函數self.api.validate_transaction
來驗證交易,它來自traitChainApi
。而這個函數的具體邏輯是由每個runtime
自己實現的。
node-template
中的程式碼(/substrate/node-template/runtime/src/lib.rs
)如下:
pub type Executive = executive::Executive<Runtime, Block, system::ChainContext<Runtime>, Runtime, AllModules>; impl client_api::TaggedTransactionQueue<Block> for Runtime { fn validate_transaction(tx: <Block as BlockT>::Extrinsic) -> TransactionValidity { Executive::validate_transaction(tx) } }
其中,Executive
是處理各種模組的調度器,其中有驗證交易的具體實現,程式碼(substrate/srml/executive/src/lib.rs
)如下:
// where 語句 UnsignedValidator: ValidateUnsigned<Call=CallOf<Block::Extrinsic, Context>> pub fn validate_transaction(uxt: Block::Extrinsic) -> TransactionValidity { let encoded_len = uxt.using_encoded(|d| d.len()); let xt = uxt.check(&Default::default())?; let dispatch_info = xt.get_dispatch_info(); // 使用turbofish操作符 // 指定泛型UnsignedValidator的方法validate的具體參數為ValidateUnsigned xt.validate::<UnsignedValidator>(dispatch_info, encoded_len) }
validate
方法的程式碼(/core/sr-primitives/src/generic/checked_extrinsic.rs
)如下:
fn validate<U: ValidateUnsigned<Call = Self::Call>>( &self, info: DispatchInfo, len: usize, ) -> TransactionValidity { if let Some((ref id, ref extra)) = self.signed { Extra::validate(extra, id, &self.function, info, len) } else { let valid = Extra::validate_unsigned(&self.function, info, len)?; Ok(valid.combine_with(U::validate_unsigned(&self.function)?)) } }
可以看出驗證交易的邏輯,就是檢查給定交易簽名的有效性。
1.4 導入交易 base_pool
模組
導入交易調用的是base_pool
模組中的函數self.pool.write().import
。將交易導入池中:
- 交易池中交易由兩部分組成:
Future
和Ready
。- 前者包含需要池中其他交易尚未提供的某些標記的交易。
- 後者包含滿足所有要求並準備包含在區塊中的交易。
- 將交易導入
ready
隊列- 交易需要通過此隊列中的事務滿足所有(準備好)標記。
- 返回由導入交易替換的交易。
具體實現程式碼(/core/transaction-pool/graph/src/base_pool.rs
)如下:
pub fn import(&mut self, tx: Transaction<Hash, Ex>, ) -> error::Result<Imported<Hash, Ex>> { if self.future.contains(&tx.hash) || self.ready.contains(&tx.hash) { return Err(error::Error::AlreadyImported(Box::new(tx.hash.clone()))) } ... self.import_to_ready(tx) } fn import_to_ready(&mut self, tx: WaitingTransaction<Hash, Ex>) -> error::Result<Imported<Hash, Ex>> { ... loop { ... // 導入交易 let current_hash = tx.transaction.hash.clone(); match self.ready.import(tx) { Ok(mut replaced) => { if !first { promoted.push(current_hash); } removed.append(&mut replaced); }, ... } ... }
交易導入ready
隊列的程式碼()如下:
pub fn import( &mut self, tx: WaitingTransaction<Hash, Ex>, ) -> error::Result<Vec<Arc<Transaction<Hash, Ex>>>> { ... let replaced = self.replace_previous(&transaction)?; ... // 插入到 best if goes_to_best { self.best.insert(transaction.clone()); } // 插入到 Ready ready.insert(hash, ReadyTx { transaction, unlocks: vec![], requires_offset: 0, }); Ok(replaced) }
重要的幾個數據結構:
pub struct BasePool<Hash: hash::Hash + Eq, Ex> { future: FutureTransactions<Hash, Ex>, ready: ReadyTransactions<Hash, Ex>, recently_pruned: [HashSet<Tag>; RECENTLY_PRUNED_TAGS], recently_pruned_index: usize, }
futrue
:Future
交易ready
:Ready
交易recently_pruned
:存儲最近兩次修剪過的標籤。
pub struct ReadyTransactions<Hash: hash::Hash + Eq, Ex> { insertion_id: u64, provided_tags: HashMap<Tag, Hash>, ready: Arc<RwLock<HashMap<Hash, ReadyTx<Hash, Ex>>>>, best: BTreeSet<TransactionRef<Hash, Ex>>, }
ready
:已準備好的交易best
:使用的是有序集合BTreeSet
,存放將被打包到區塊中的交易(沒有任何前置交易)。
2. 廣播交易
當交易被添加到交易池中,網路模組的函數trigger_repropagate
會被調用。
- 發送消息
PropagateExtrinsics
- 傳播交易
propagate_extrinsics
- 判斷節點狀態,完全同步才接受交易並傳播,否則直接退出
- 發送交易數據包
具體程式碼(/substrate/core/service/src/lib.rs
)如下:
{ let network = Arc::downgrade(&network); let transaction_pool_ = transaction_pool.clone(); let events = transaction_pool.import_notification_stream() .map(|v| Ok::<_, ()>(v)).compat() .for_each(move |_| { if let Some(network) = network.upgrade() { network.trigger_repropagate(); } let status = transaction_pool_.status(); telemetry!(SUBSTRATE_INFO; "txpool.import"; "ready" => status.ready, "future" => status.future ); Ok(()) }) .select(exit.clone()) .then(|_| Ok(())); let _ = to_spawn_tx.unbounded_send(Box::new(events)); }
以及trigger_repropagate
程式碼(substrate/core/network/src/service.rs
)如下:
pub fn trigger_repropagate(&self) { let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::PropagateExtrinsics); } ServerToWorkerMsg::PropagateExtrinsics => self.network_service.user_protocol_mut().propagate_extrinsics(),
傳播交易propagate_extrinsics
,程式碼(substrate/core/network/src/protocol.rs
)如下:
pub fn propagate_extrinsics( &mut self, ) { ... if self.sync.status().state != SyncState::Idle { return; } let extrinsics = self.transaction_pool.transactions(); let mut propagated_to = HashMap::new(); for (who, peer) in self.context_data.peers.iter_mut() { let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics .iter() .filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone())) .cloned() .unzip(); if !to_send.is_empty() { for hash in hashes { propagated_to .entry(hash) .or_insert_with(Vec::new) .push(who.to_base58()); } trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who); self.behaviour.send_packet(who, GenericMessage::Transactions(to_send)) } } self.transaction_pool.on_broadcasted(propagated_to); }
3. 打包交易
交易的打包是在區塊生成模組,程式碼(substrate/core/client/src/block_builder/api.rs
)如下:
pub trait BlockBuilder { /// Apply the given extrinsics. fn apply_extrinsic(extrinsic: <Block as BlockT>::Extrinsic) -> ApplyResult; /// Finish the current block. #[renamed("finalise_block", 3)] fn finalize_block() -> <Block as BlockT>::Header; /// Generate inherent extrinsics. The inherent data will vary from chain to chain. fn inherent_extrinsics(inherent: InherentData) -> Vec<<Block as BlockT>::Extrinsic>; /// Check that the inherents are valid. The inherent data will vary from chain to chain. fn check_inherents(block: Block, data: InherentData) -> CheckInherentsResult; /// Generate a random seed. fn random_seed() -> <Block as BlockT>::Hash; }
而這個函數的具體邏輯是由每個runtime
自己實現的。
node-template
中的程式碼(/substrate/node-template/runtime/src/lib.rs
)如下:
impl block_builder_api::BlockBuilder<Block> for Runtime { fn apply_extrinsic(extrinsic: <Block as BlockT>::Extrinsic) -> ApplyResult { Executive::apply_extrinsic(extrinsic) } }
其中Executive
有打包交易的具體實現,程式碼(substrate/srml/executive/src/lib.rs
)如下:
pub fn apply_extrinsic(uxt: Block::Extrinsic) -> ApplyResult { let encoded = uxt.encode(); let encoded_len = encoded.len(); Self::apply_extrinsic_with_len(uxt, encoded_len, Some(encoded)) }
這樣處理完交易,後面就是區塊出塊,以及進行最終性共識。