Substrate源码分析:交易流程
- 2019 年 10 月 4 日
- 筆記
从业务角度分析substrate源码,梳理了交易流程,包括发起交易,广播交易和打包交易。
1. 发起交易
交易的发起是通过客户端的RPC
调用,这个主要是在author
模块中。
Substrate
中把外部交易称做extrinsic
。RPC
方法名是author_submitExtrinsic
。- 外部交易以十六进制编码形式,被导入交易池中。
具体代码(substrate/core/rpc/api/src/author/mod.rs
)如下:
1.1 author
模块
定义AuthorApi
pub trait AuthorApi<Hash, BlockHash> { #[rpc(name = "author_submitExtrinsic")] fn submit_extrinsic(&self, extrinsic: Bytes) -> Result<Hash>; }
通过use
关键字,导入交易模块的相关功能
use transaction_pool::{ txpool::{ ChainApi as PoolChainApi, BlockHash, ExHash, IntoPoolError, Pool, watcher::Status, }, };
定义Author
结构体
pub struct Author<B, E, P, RA> where P: PoolChainApi + Sync + Send + 'static { client: Arc<Client<B, E, <P as PoolChainApi>::Block, RA>>, pool: Arc<Pool<P>>, subscriptions: Subscriptions, keystore: BareCryptoStorePtr, }
为Author
结构体实现AuthorApi
中的函数submit_extrinsic
。
impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> where ... { ... fn submit_extrinsic(&self, ext: Bytes) -> Result<ExHash<P>> { let xt = Decode::decode(&mut &ext[..])?; let best_block_hash = self.client.info().chain.best_hash; self.pool .submit_one(&generic::BlockId::hash(best_block_hash), xt) .map_err(|e| e.into_pool_error() .map(Into::into) .unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into()) ) } }
1.2 提交交易 pool
模块
我们通过Author
这个结构体,知道self.pool.submit_one
这个调用实现是在交易池模块的pool
这个模块中。
- 结构体
pool
,外部交易池。 - 方法
submit_one
,导入一条未验证的外部交易到池中。 - 方法
submit_at
,导入一批未验证的外部交易到池中。- 验证外部交易
- 导入交易
- 发送导入交易的
notification
- 监听导入交易处理结果
具体代码(/core/transaction-pool/graph/src/pool.rs
)如下:
pub trait ChainApi: Send + Sync { fn validate_transaction(&self, at: &BlockId<Self::Block>, uxt: ExtrinsicFor<Self>) -> Result<TransactionValidity, Self::Error>; } pub struct Pool<B: ChainApi> { api: B, options: Options, listener: RwLock<Listener<ExHash<B>, BlockHash<B>>>, pool: RwLock<base::BasePool< ExHash<B>, ExtrinsicFor<B>, >>, import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>, rotator: PoolRotator<ExHash<B>>, } impl<B: ChainApi> Pool<B> { pub fn submit_one(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<ExHash<B>, B::Error> { Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed")?) } pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T) -> Result<Vec<Result<ExHash<B>, B::Error>>, B::Error> where T: IntoIterator<Item=ExtrinsicFor<B>> { ... let results = xts .into_iter() .map(|xt| -> Result<_, B::Error> { ... // 验证外部交易 match self.api.validate_transaction(at, xt.clone())? { Ok(validity) => if validity.provides.is_empty() { Err(error::Error::NoTagsProvided.into()) } else { Ok(base::Transaction { data: xt, bytes, hash, priority: validity.priority, requires: validity.requires, provides: validity.provides, propagate: validity.propagate, valid_till: block_number .saturated_into::<u64>() .saturating_add(validity.longevity), }) }, ... } }) .map(|tx| { // 导入交易 let imported = self.pool.write().import(tx?)?; // 导入交易的notification if let base::Imported::Ready { .. } = imported { self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok()); } // 监听导入交易处理结果 let mut listener = self.listener.write(); fire_events(&mut *listener, &imported); Ok(imported.hash().clone()) }) .collect::<Vec<_>>(); ... } }
1.3 验证交易
在交易被导入池中前,会调用函数self.api.validate_transaction
来验证交易,它来自traitChainApi
。而这个函数的具体逻辑是由每个runtime
自己实现的。
node-template
中的代码(/substrate/node-template/runtime/src/lib.rs
)如下:
pub type Executive = executive::Executive<Runtime, Block, system::ChainContext<Runtime>, Runtime, AllModules>; impl client_api::TaggedTransactionQueue<Block> for Runtime { fn validate_transaction(tx: <Block as BlockT>::Extrinsic) -> TransactionValidity { Executive::validate_transaction(tx) } }
其中,Executive
是处理各种模块的调度器,其中有验证交易的具体实现,代码(substrate/srml/executive/src/lib.rs
)如下:
// where 语句 UnsignedValidator: ValidateUnsigned<Call=CallOf<Block::Extrinsic, Context>> pub fn validate_transaction(uxt: Block::Extrinsic) -> TransactionValidity { let encoded_len = uxt.using_encoded(|d| d.len()); let xt = uxt.check(&Default::default())?; let dispatch_info = xt.get_dispatch_info(); // 使用turbofish操作符 // 指定泛型UnsignedValidator的方法validate的具体参数为ValidateUnsigned xt.validate::<UnsignedValidator>(dispatch_info, encoded_len) }
validate
方法的代码(/core/sr-primitives/src/generic/checked_extrinsic.rs
)如下:
fn validate<U: ValidateUnsigned<Call = Self::Call>>( &self, info: DispatchInfo, len: usize, ) -> TransactionValidity { if let Some((ref id, ref extra)) = self.signed { Extra::validate(extra, id, &self.function, info, len) } else { let valid = Extra::validate_unsigned(&self.function, info, len)?; Ok(valid.combine_with(U::validate_unsigned(&self.function)?)) } }
可以看出验证交易的逻辑,就是检查给定交易签名的有效性。
1.4 导入交易 base_pool
模块
导入交易调用的是base_pool
模块中的函数self.pool.write().import
。将交易导入池中:
- 交易池中交易由两部分组成:
Future
和Ready
。- 前者包含需要池中其他交易尚未提供的某些标记的交易。
- 后者包含满足所有要求并准备包含在区块中的交易。
- 将交易导入
ready
队列- 交易需要通过此队列中的事务满足所有(准备好)标记。
- 返回由导入交易替换的交易。
具体实现代码(/core/transaction-pool/graph/src/base_pool.rs
)如下:
pub fn import(&mut self, tx: Transaction<Hash, Ex>, ) -> error::Result<Imported<Hash, Ex>> { if self.future.contains(&tx.hash) || self.ready.contains(&tx.hash) { return Err(error::Error::AlreadyImported(Box::new(tx.hash.clone()))) } ... self.import_to_ready(tx) } fn import_to_ready(&mut self, tx: WaitingTransaction<Hash, Ex>) -> error::Result<Imported<Hash, Ex>> { ... loop { ... // 导入交易 let current_hash = tx.transaction.hash.clone(); match self.ready.import(tx) { Ok(mut replaced) => { if !first { promoted.push(current_hash); } removed.append(&mut replaced); }, ... } ... }
交易导入ready
队列的代码()如下:
pub fn import( &mut self, tx: WaitingTransaction<Hash, Ex>, ) -> error::Result<Vec<Arc<Transaction<Hash, Ex>>>> { ... let replaced = self.replace_previous(&transaction)?; ... // 插入到 best if goes_to_best { self.best.insert(transaction.clone()); } // 插入到 Ready ready.insert(hash, ReadyTx { transaction, unlocks: vec![], requires_offset: 0, }); Ok(replaced) }
重要的几个数据结构:
pub struct BasePool<Hash: hash::Hash + Eq, Ex> { future: FutureTransactions<Hash, Ex>, ready: ReadyTransactions<Hash, Ex>, recently_pruned: [HashSet<Tag>; RECENTLY_PRUNED_TAGS], recently_pruned_index: usize, }
futrue
:Future
交易ready
:Ready
交易recently_pruned
:存储最近两次修剪过的标签。
pub struct ReadyTransactions<Hash: hash::Hash + Eq, Ex> { insertion_id: u64, provided_tags: HashMap<Tag, Hash>, ready: Arc<RwLock<HashMap<Hash, ReadyTx<Hash, Ex>>>>, best: BTreeSet<TransactionRef<Hash, Ex>>, }
ready
:已准备好的交易best
:使用的是有序集合BTreeSet
,存放将被打包到区块中的交易(没有任何前置交易)。
2. 广播交易
当交易被添加到交易池中,网络模块的函数trigger_repropagate
会被调用。
- 发送消息
PropagateExtrinsics
- 传播交易
propagate_extrinsics
- 判断节点状态,完全同步才接受交易并传播,否则直接退出
- 发送交易数据包
具体代码(/substrate/core/service/src/lib.rs
)如下:
{ let network = Arc::downgrade(&network); let transaction_pool_ = transaction_pool.clone(); let events = transaction_pool.import_notification_stream() .map(|v| Ok::<_, ()>(v)).compat() .for_each(move |_| { if let Some(network) = network.upgrade() { network.trigger_repropagate(); } let status = transaction_pool_.status(); telemetry!(SUBSTRATE_INFO; "txpool.import"; "ready" => status.ready, "future" => status.future ); Ok(()) }) .select(exit.clone()) .then(|_| Ok(())); let _ = to_spawn_tx.unbounded_send(Box::new(events)); }
以及trigger_repropagate
代码(substrate/core/network/src/service.rs
)如下:
pub fn trigger_repropagate(&self) { let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::PropagateExtrinsics); } ServerToWorkerMsg::PropagateExtrinsics => self.network_service.user_protocol_mut().propagate_extrinsics(),
传播交易propagate_extrinsics
,代码(substrate/core/network/src/protocol.rs
)如下:
pub fn propagate_extrinsics( &mut self, ) { ... if self.sync.status().state != SyncState::Idle { return; } let extrinsics = self.transaction_pool.transactions(); let mut propagated_to = HashMap::new(); for (who, peer) in self.context_data.peers.iter_mut() { let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics .iter() .filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone())) .cloned() .unzip(); if !to_send.is_empty() { for hash in hashes { propagated_to .entry(hash) .or_insert_with(Vec::new) .push(who.to_base58()); } trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who); self.behaviour.send_packet(who, GenericMessage::Transactions(to_send)) } } self.transaction_pool.on_broadcasted(propagated_to); }
3. 打包交易
交易的打包是在区块生成模块,代码(substrate/core/client/src/block_builder/api.rs
)如下:
pub trait BlockBuilder { /// Apply the given extrinsics. fn apply_extrinsic(extrinsic: <Block as BlockT>::Extrinsic) -> ApplyResult; /// Finish the current block. #[renamed("finalise_block", 3)] fn finalize_block() -> <Block as BlockT>::Header; /// Generate inherent extrinsics. The inherent data will vary from chain to chain. fn inherent_extrinsics(inherent: InherentData) -> Vec<<Block as BlockT>::Extrinsic>; /// Check that the inherents are valid. The inherent data will vary from chain to chain. fn check_inherents(block: Block, data: InherentData) -> CheckInherentsResult; /// Generate a random seed. fn random_seed() -> <Block as BlockT>::Hash; }
而这个函数的具体逻辑是由每个runtime
自己实现的。
node-template
中的代码(/substrate/node-template/runtime/src/lib.rs
)如下:
impl block_builder_api::BlockBuilder<Block> for Runtime { fn apply_extrinsic(extrinsic: <Block as BlockT>::Extrinsic) -> ApplyResult { Executive::apply_extrinsic(extrinsic) } }
其中Executive
有打包交易的具体实现,代码(substrate/srml/executive/src/lib.rs
)如下:
pub fn apply_extrinsic(uxt: Block::Extrinsic) -> ApplyResult { let encoded = uxt.encode(); let encoded_len = encoded.len(); Self::apply_extrinsic_with_len(uxt, encoded_len, Some(encoded)) }
这样处理完交易,后面就是区块出块,以及进行最终性共识。