Substrate源码分析:交易流程

  • 2019 年 10 月 4 日
  • 笔记

从业务角度分析substrate源码,梳理了交易流程,包括发起交易,广播交易和打包交易。

1. 发起交易

交易的发起是通过客户端的RPC调用,这个主要是在author模块中。

  • Substrate中把外部交易称做extrinsic
  • RPC方法名是author_submitExtrinsic
  • 外部交易以十六进制编码形式,被导入交易池中。

具体代码(substrate/core/rpc/api/src/author/mod.rs)如下:

1.1 author模块

定义AuthorApi

pub trait AuthorApi<Hash, BlockHash> {      #[rpc(name = "author_submitExtrinsic")]      fn submit_extrinsic(&self, extrinsic: Bytes) -> Result<Hash>;  }

通过use关键字,导入交易模块的相关功能

use transaction_pool::{      txpool::{          ChainApi as PoolChainApi,          BlockHash,          ExHash,          IntoPoolError,          Pool,          watcher::Status,      },  };

定义Author结构体

pub struct Author<B, E, P, RA> where P: PoolChainApi + Sync + Send + 'static {      client: Arc<Client<B, E, <P as PoolChainApi>::Block, RA>>,      pool: Arc<Pool<P>>,      subscriptions: Subscriptions,      keystore: BareCryptoStorePtr,  }

Author结构体实现AuthorApi中的函数submit_extrinsic

impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> where  ...  {      ...      fn submit_extrinsic(&self, ext: Bytes) -> Result<ExHash<P>> {          let xt = Decode::decode(&mut &ext[..])?;          let best_block_hash = self.client.info().chain.best_hash;          self.pool              .submit_one(&generic::BlockId::hash(best_block_hash), xt)              .map_err(|e| e.into_pool_error()                  .map(Into::into)                  .unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())              )      }  }

1.2 提交交易 pool模块

我们通过Author这个结构体,知道self.pool.submit_one这个调用实现是在交易池模块的pool这个模块中。

  • 结构体pool,外部交易池。
  • 方法submit_one,导入一条未验证的外部交易到池中。
  • 方法submit_at,导入一批未验证的外部交易到池中。
    • 验证外部交易
    • 导入交易
    • 发送导入交易的notification
    • 监听导入交易处理结果

具体代码(/core/transaction-pool/graph/src/pool.rs)如下:

pub trait ChainApi: Send + Sync {      fn validate_transaction(&self, at: &BlockId<Self::Block>, uxt: ExtrinsicFor<Self>) -> Result<TransactionValidity, Self::Error>;  }    pub struct Pool<B: ChainApi> {      api: B,      options: Options,      listener: RwLock<Listener<ExHash<B>, BlockHash<B>>>,      pool: RwLock<base::BasePool<          ExHash<B>,          ExtrinsicFor<B>,      >>,      import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>,      rotator: PoolRotator<ExHash<B>>,  }    impl<B: ChainApi> Pool<B> {      pub fn submit_one(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<ExHash<B>, B::Error> {          Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed")?)      }        pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T) -> Result<Vec<Result<ExHash<B>, B::Error>>, B::Error> where          T: IntoIterator<Item=ExtrinsicFor<B>>      {          ...          let results = xts              .into_iter()              .map(|xt| -> Result<_, B::Error> {          ...              // 验证外部交易              match self.api.validate_transaction(at, xt.clone())? {                      Ok(validity) => if validity.provides.is_empty() {                          Err(error::Error::NoTagsProvided.into())                      } else {                          Ok(base::Transaction {                              data: xt,                              bytes,                              hash,                              priority: validity.priority,                              requires: validity.requires,                              provides: validity.provides,                              propagate: validity.propagate,                              valid_till: block_number                                  .saturated_into::<u64>()                                  .saturating_add(validity.longevity),                          })                      },                      ...                  }              })              .map(|tx| { // 导入交易                  let imported = self.pool.write().import(tx?)?;                  // 导入交易的notification                  if let base::Imported::Ready { .. } = imported {                      self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok());                  }                  // 监听导入交易处理结果                  let mut listener = self.listener.write();                  fire_events(&mut *listener, &imported);                  Ok(imported.hash().clone())              })              .collect::<Vec<_>>();          ...      }  }

1.3 验证交易

在交易被导入池中前,会调用函数self.api.validate_transaction来验证交易,它来自traitChainApi。而这个函数的具体逻辑是由每个runtime自己实现的。

node-template中的代码(/substrate/node-template/runtime/src/lib.rs)如下:

pub type Executive = executive::Executive<Runtime, Block, system::ChainContext<Runtime>, Runtime, AllModules>;    impl client_api::TaggedTransactionQueue<Block> for Runtime {      fn validate_transaction(tx: <Block as BlockT>::Extrinsic) -> TransactionValidity {          Executive::validate_transaction(tx)      }  }

其中,Executive是处理各种模块的调度器,其中有验证交易的具体实现,代码(substrate/srml/executive/src/lib.rs)如下:

// where 语句  UnsignedValidator: ValidateUnsigned<Call=CallOf<Block::Extrinsic, Context>>    pub fn validate_transaction(uxt: Block::Extrinsic) -> TransactionValidity {      let encoded_len = uxt.using_encoded(|d| d.len());      let xt = uxt.check(&Default::default())?;        let dispatch_info = xt.get_dispatch_info();      // 使用turbofish操作符      // 指定泛型UnsignedValidator的方法validate的具体参数为ValidateUnsigned      xt.validate::<UnsignedValidator>(dispatch_info, encoded_len)  }

validate方法的代码(/core/sr-primitives/src/generic/checked_extrinsic.rs)如下:

fn validate<U: ValidateUnsigned<Call = Self::Call>>(      &self,      info: DispatchInfo,      len: usize,  ) -> TransactionValidity {      if let Some((ref id, ref extra)) = self.signed {          Extra::validate(extra, id, &self.function, info, len)      } else {          let valid = Extra::validate_unsigned(&self.function, info, len)?;          Ok(valid.combine_with(U::validate_unsigned(&self.function)?))      }  }

可以看出验证交易的逻辑,就是检查给定交易签名的有效性。

1.4 导入交易 base_pool模块

导入交易调用的是base_pool模块中的函数self.pool.write().import。将交易导入池中:

  • 交易池中交易由两部分组成:FutureReady
    • 前者包含需要池中其他交易尚未提供的某些标记的交易。
    • 后者包含满足所有要求并准备包含在区块中的交易。
  • 将交易导入ready队列
    • 交易需要通过此队列中的事务满足所有(准备好)标记。
    • 返回由导入交易替换的交易。

具体实现代码(/core/transaction-pool/graph/src/base_pool.rs)如下:

pub fn import(&mut self,      tx: Transaction<Hash, Ex>,  ) -> error::Result<Imported<Hash, Ex>> {      if self.future.contains(&tx.hash) || self.ready.contains(&tx.hash) {          return Err(error::Error::AlreadyImported(Box::new(tx.hash.clone())))      }      ...      self.import_to_ready(tx)  }    fn import_to_ready(&mut self, tx: WaitingTransaction<Hash, Ex>) -> error::Result<Imported<Hash, Ex>> {      ...      loop {          ...          // 导入交易          let current_hash = tx.transaction.hash.clone();          match self.ready.import(tx) {              Ok(mut replaced) => {                  if !first {                      promoted.push(current_hash);                  }                  removed.append(&mut replaced);              },              ...      }      ...  }

交易导入ready队列的代码()如下:

pub fn import(      &mut self,      tx: WaitingTransaction<Hash, Ex>,  ) -> error::Result<Vec<Arc<Transaction<Hash, Ex>>>> {      ...      let replaced = self.replace_previous(&transaction)?;      ...      // 插入到 best      if goes_to_best {          self.best.insert(transaction.clone());      }      // 插入到 Ready      ready.insert(hash, ReadyTx {          transaction,          unlocks: vec![],          requires_offset: 0,      });      Ok(replaced)  }

重要的几个数据结构:

pub struct BasePool<Hash: hash::Hash + Eq, Ex> {      future: FutureTransactions<Hash, Ex>,      ready: ReadyTransactions<Hash, Ex>,      recently_pruned: [HashSet<Tag>; RECENTLY_PRUNED_TAGS],      recently_pruned_index: usize,  }
  • futrueFuture交易
  • readyReady交易
  • recently_pruned:存储最近两次修剪过的标签。
pub struct ReadyTransactions<Hash: hash::Hash + Eq, Ex> {      insertion_id: u64,      provided_tags: HashMap<Tag, Hash>,      ready: Arc<RwLock<HashMap<Hash, ReadyTx<Hash, Ex>>>>,      best: BTreeSet<TransactionRef<Hash, Ex>>,  }
  • ready:已准备好的交易
  • best:使用的是有序集合BTreeSet,存放将被打包到区块中的交易(没有任何前置交易)。

2. 广播交易

当交易被添加到交易池中,网络模块的函数trigger_repropagate会被调用。

  • 发送消息PropagateExtrinsics
  • 传播交易propagate_extrinsics
    • 判断节点状态,完全同步才接受交易并传播,否则直接退出
    • 发送交易数据包

具体代码(/substrate/core/service/src/lib.rs)如下:

{      let network = Arc::downgrade(&network);      let transaction_pool_ = transaction_pool.clone();      let events = transaction_pool.import_notification_stream()          .map(|v| Ok::<_, ()>(v)).compat()          .for_each(move |_| {              if let Some(network) = network.upgrade() {                  network.trigger_repropagate();              }              let status = transaction_pool_.status();              telemetry!(SUBSTRATE_INFO; "txpool.import";                  "ready" => status.ready,                  "future" => status.future              );              Ok(())          })          .select(exit.clone())          .then(|_| Ok(()));        let _ = to_spawn_tx.unbounded_send(Box::new(events));  }

以及trigger_repropagate代码(substrate/core/network/src/service.rs)如下:

pub fn trigger_repropagate(&self) {      let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::PropagateExtrinsics);  }    ServerToWorkerMsg::PropagateExtrinsics =>      self.network_service.user_protocol_mut().propagate_extrinsics(),

传播交易propagate_extrinsics,代码(substrate/core/network/src/protocol.rs)如下:

pub fn propagate_extrinsics(      &mut self,  ) {      ...      if self.sync.status().state != SyncState::Idle {          return;      }        let extrinsics = self.transaction_pool.transactions();      let mut propagated_to = HashMap::new();      for (who, peer) in self.context_data.peers.iter_mut() {          let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics              .iter()              .filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone()))              .cloned()              .unzip();            if !to_send.is_empty() {              for hash in hashes {                  propagated_to                      .entry(hash)                      .or_insert_with(Vec::new)                      .push(who.to_base58());              }              trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);              self.behaviour.send_packet(who, GenericMessage::Transactions(to_send))          }      }        self.transaction_pool.on_broadcasted(propagated_to);  }

3. 打包交易

交易的打包是在区块生成模块,代码(substrate/core/client/src/block_builder/api.rs)如下:

pub trait BlockBuilder {      /// Apply the given extrinsics.      fn apply_extrinsic(extrinsic: <Block as BlockT>::Extrinsic) -> ApplyResult;      /// Finish the current block.      #[renamed("finalise_block", 3)]      fn finalize_block() -> <Block as BlockT>::Header;      /// Generate inherent extrinsics. The inherent data will vary from chain to chain.      fn inherent_extrinsics(inherent: InherentData) -> Vec<<Block as BlockT>::Extrinsic>;      /// Check that the inherents are valid. The inherent data will vary from chain to chain.      fn check_inherents(block: Block, data: InherentData) -> CheckInherentsResult;      /// Generate a random seed.      fn random_seed() -> <Block as BlockT>::Hash;  }

而这个函数的具体逻辑是由每个runtime自己实现的。

node-template中的代码(/substrate/node-template/runtime/src/lib.rs)如下:

impl block_builder_api::BlockBuilder<Block> for Runtime {      fn apply_extrinsic(extrinsic: <Block as BlockT>::Extrinsic) -> ApplyResult {          Executive::apply_extrinsic(extrinsic)      }  }

其中Executive有打包交易的具体实现,代码(substrate/srml/executive/src/lib.rs)如下:

pub fn apply_extrinsic(uxt: Block::Extrinsic) -> ApplyResult {      let encoded = uxt.encode();      let encoded_len = encoded.len();      Self::apply_extrinsic_with_len(uxt, encoded_len, Some(encoded))  }

这样处理完交易,后面就是区块出块,以及进行最终性共识。