Substrate源碼分析:交易流程

  • 2019 年 10 月 4 日
  • 筆記

從業務角度分析substrate源碼,梳理了交易流程,包括發起交易,廣播交易和打包交易。

1. 發起交易

交易的發起是通過客戶端的RPC調用,這個主要是在author模組中。

  • Substrate中把外部交易稱做extrinsic
  • RPC方法名是author_submitExtrinsic
  • 外部交易以十六進位編碼形式,被導入交易池中。

具體程式碼(substrate/core/rpc/api/src/author/mod.rs)如下:

1.1 author模組

定義AuthorApi

pub trait AuthorApi<Hash, BlockHash> {      #[rpc(name = "author_submitExtrinsic")]      fn submit_extrinsic(&self, extrinsic: Bytes) -> Result<Hash>;  }

通過use關鍵字,導入交易模組的相關功能

use transaction_pool::{      txpool::{          ChainApi as PoolChainApi,          BlockHash,          ExHash,          IntoPoolError,          Pool,          watcher::Status,      },  };

定義Author結構體

pub struct Author<B, E, P, RA> where P: PoolChainApi + Sync + Send + 'static {      client: Arc<Client<B, E, <P as PoolChainApi>::Block, RA>>,      pool: Arc<Pool<P>>,      subscriptions: Subscriptions,      keystore: BareCryptoStorePtr,  }

Author結構體實現AuthorApi中的函數submit_extrinsic

impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> where  ...  {      ...      fn submit_extrinsic(&self, ext: Bytes) -> Result<ExHash<P>> {          let xt = Decode::decode(&mut &ext[..])?;          let best_block_hash = self.client.info().chain.best_hash;          self.pool              .submit_one(&generic::BlockId::hash(best_block_hash), xt)              .map_err(|e| e.into_pool_error()                  .map(Into::into)                  .unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())              )      }  }

1.2 提交交易 pool模組

我們通過Author這個結構體,知道self.pool.submit_one這個調用實現是在交易池模組的pool這個模組中。

  • 結構體pool,外部交易池。
  • 方法submit_one,導入一條未驗證的外部交易到池中。
  • 方法submit_at,導入一批未驗證的外部交易到池中。
    • 驗證外部交易
    • 導入交易
    • 發送導入交易的notification
    • 監聽導入交易處理結果

具體程式碼(/core/transaction-pool/graph/src/pool.rs)如下:

pub trait ChainApi: Send + Sync {      fn validate_transaction(&self, at: &BlockId<Self::Block>, uxt: ExtrinsicFor<Self>) -> Result<TransactionValidity, Self::Error>;  }    pub struct Pool<B: ChainApi> {      api: B,      options: Options,      listener: RwLock<Listener<ExHash<B>, BlockHash<B>>>,      pool: RwLock<base::BasePool<          ExHash<B>,          ExtrinsicFor<B>,      >>,      import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>,      rotator: PoolRotator<ExHash<B>>,  }    impl<B: ChainApi> Pool<B> {      pub fn submit_one(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<ExHash<B>, B::Error> {          Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed")?)      }        pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T) -> Result<Vec<Result<ExHash<B>, B::Error>>, B::Error> where          T: IntoIterator<Item=ExtrinsicFor<B>>      {          ...          let results = xts              .into_iter()              .map(|xt| -> Result<_, B::Error> {          ...              // 驗證外部交易              match self.api.validate_transaction(at, xt.clone())? {                      Ok(validity) => if validity.provides.is_empty() {                          Err(error::Error::NoTagsProvided.into())                      } else {                          Ok(base::Transaction {                              data: xt,                              bytes,                              hash,                              priority: validity.priority,                              requires: validity.requires,                              provides: validity.provides,                              propagate: validity.propagate,                              valid_till: block_number                                  .saturated_into::<u64>()                                  .saturating_add(validity.longevity),                          })                      },                      ...                  }              })              .map(|tx| { // 導入交易                  let imported = self.pool.write().import(tx?)?;                  // 導入交易的notification                  if let base::Imported::Ready { .. } = imported {                      self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok());                  }                  // 監聽導入交易處理結果                  let mut listener = self.listener.write();                  fire_events(&mut *listener, &imported);                  Ok(imported.hash().clone())              })              .collect::<Vec<_>>();          ...      }  }

1.3 驗證交易

在交易被導入池中前,會調用函數self.api.validate_transaction來驗證交易,它來自traitChainApi。而這個函數的具體邏輯是由每個runtime自己實現的。

node-template中的程式碼(/substrate/node-template/runtime/src/lib.rs)如下:

pub type Executive = executive::Executive<Runtime, Block, system::ChainContext<Runtime>, Runtime, AllModules>;    impl client_api::TaggedTransactionQueue<Block> for Runtime {      fn validate_transaction(tx: <Block as BlockT>::Extrinsic) -> TransactionValidity {          Executive::validate_transaction(tx)      }  }

其中,Executive是處理各種模組的調度器,其中有驗證交易的具體實現,程式碼(substrate/srml/executive/src/lib.rs)如下:

// where 語句  UnsignedValidator: ValidateUnsigned<Call=CallOf<Block::Extrinsic, Context>>    pub fn validate_transaction(uxt: Block::Extrinsic) -> TransactionValidity {      let encoded_len = uxt.using_encoded(|d| d.len());      let xt = uxt.check(&Default::default())?;        let dispatch_info = xt.get_dispatch_info();      // 使用turbofish操作符      // 指定泛型UnsignedValidator的方法validate的具體參數為ValidateUnsigned      xt.validate::<UnsignedValidator>(dispatch_info, encoded_len)  }

validate方法的程式碼(/core/sr-primitives/src/generic/checked_extrinsic.rs)如下:

fn validate<U: ValidateUnsigned<Call = Self::Call>>(      &self,      info: DispatchInfo,      len: usize,  ) -> TransactionValidity {      if let Some((ref id, ref extra)) = self.signed {          Extra::validate(extra, id, &self.function, info, len)      } else {          let valid = Extra::validate_unsigned(&self.function, info, len)?;          Ok(valid.combine_with(U::validate_unsigned(&self.function)?))      }  }

可以看出驗證交易的邏輯,就是檢查給定交易簽名的有效性。

1.4 導入交易 base_pool模組

導入交易調用的是base_pool模組中的函數self.pool.write().import。將交易導入池中:

  • 交易池中交易由兩部分組成:FutureReady
    • 前者包含需要池中其他交易尚未提供的某些標記的交易。
    • 後者包含滿足所有要求並準備包含在區塊中的交易。
  • 將交易導入ready隊列
    • 交易需要通過此隊列中的事務滿足所有(準備好)標記。
    • 返回由導入交易替換的交易。

具體實現程式碼(/core/transaction-pool/graph/src/base_pool.rs)如下:

pub fn import(&mut self,      tx: Transaction<Hash, Ex>,  ) -> error::Result<Imported<Hash, Ex>> {      if self.future.contains(&tx.hash) || self.ready.contains(&tx.hash) {          return Err(error::Error::AlreadyImported(Box::new(tx.hash.clone())))      }      ...      self.import_to_ready(tx)  }    fn import_to_ready(&mut self, tx: WaitingTransaction<Hash, Ex>) -> error::Result<Imported<Hash, Ex>> {      ...      loop {          ...          // 導入交易          let current_hash = tx.transaction.hash.clone();          match self.ready.import(tx) {              Ok(mut replaced) => {                  if !first {                      promoted.push(current_hash);                  }                  removed.append(&mut replaced);              },              ...      }      ...  }

交易導入ready隊列的程式碼()如下:

pub fn import(      &mut self,      tx: WaitingTransaction<Hash, Ex>,  ) -> error::Result<Vec<Arc<Transaction<Hash, Ex>>>> {      ...      let replaced = self.replace_previous(&transaction)?;      ...      // 插入到 best      if goes_to_best {          self.best.insert(transaction.clone());      }      // 插入到 Ready      ready.insert(hash, ReadyTx {          transaction,          unlocks: vec![],          requires_offset: 0,      });      Ok(replaced)  }

重要的幾個數據結構:

pub struct BasePool<Hash: hash::Hash + Eq, Ex> {      future: FutureTransactions<Hash, Ex>,      ready: ReadyTransactions<Hash, Ex>,      recently_pruned: [HashSet<Tag>; RECENTLY_PRUNED_TAGS],      recently_pruned_index: usize,  }
  • futrueFuture交易
  • readyReady交易
  • recently_pruned:存儲最近兩次修剪過的標籤。
pub struct ReadyTransactions<Hash: hash::Hash + Eq, Ex> {      insertion_id: u64,      provided_tags: HashMap<Tag, Hash>,      ready: Arc<RwLock<HashMap<Hash, ReadyTx<Hash, Ex>>>>,      best: BTreeSet<TransactionRef<Hash, Ex>>,  }
  • ready:已準備好的交易
  • best:使用的是有序集合BTreeSet,存放將被打包到區塊中的交易(沒有任何前置交易)。

2. 廣播交易

當交易被添加到交易池中,網路模組的函數trigger_repropagate會被調用。

  • 發送消息PropagateExtrinsics
  • 傳播交易propagate_extrinsics
    • 判斷節點狀態,完全同步才接受交易並傳播,否則直接退出
    • 發送交易數據包

具體程式碼(/substrate/core/service/src/lib.rs)如下:

{      let network = Arc::downgrade(&network);      let transaction_pool_ = transaction_pool.clone();      let events = transaction_pool.import_notification_stream()          .map(|v| Ok::<_, ()>(v)).compat()          .for_each(move |_| {              if let Some(network) = network.upgrade() {                  network.trigger_repropagate();              }              let status = transaction_pool_.status();              telemetry!(SUBSTRATE_INFO; "txpool.import";                  "ready" => status.ready,                  "future" => status.future              );              Ok(())          })          .select(exit.clone())          .then(|_| Ok(()));        let _ = to_spawn_tx.unbounded_send(Box::new(events));  }

以及trigger_repropagate程式碼(substrate/core/network/src/service.rs)如下:

pub fn trigger_repropagate(&self) {      let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::PropagateExtrinsics);  }    ServerToWorkerMsg::PropagateExtrinsics =>      self.network_service.user_protocol_mut().propagate_extrinsics(),

傳播交易propagate_extrinsics,程式碼(substrate/core/network/src/protocol.rs)如下:

pub fn propagate_extrinsics(      &mut self,  ) {      ...      if self.sync.status().state != SyncState::Idle {          return;      }        let extrinsics = self.transaction_pool.transactions();      let mut propagated_to = HashMap::new();      for (who, peer) in self.context_data.peers.iter_mut() {          let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics              .iter()              .filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone()))              .cloned()              .unzip();            if !to_send.is_empty() {              for hash in hashes {                  propagated_to                      .entry(hash)                      .or_insert_with(Vec::new)                      .push(who.to_base58());              }              trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);              self.behaviour.send_packet(who, GenericMessage::Transactions(to_send))          }      }        self.transaction_pool.on_broadcasted(propagated_to);  }

3. 打包交易

交易的打包是在區塊生成模組,程式碼(substrate/core/client/src/block_builder/api.rs)如下:

pub trait BlockBuilder {      /// Apply the given extrinsics.      fn apply_extrinsic(extrinsic: <Block as BlockT>::Extrinsic) -> ApplyResult;      /// Finish the current block.      #[renamed("finalise_block", 3)]      fn finalize_block() -> <Block as BlockT>::Header;      /// Generate inherent extrinsics. The inherent data will vary from chain to chain.      fn inherent_extrinsics(inherent: InherentData) -> Vec<<Block as BlockT>::Extrinsic>;      /// Check that the inherents are valid. The inherent data will vary from chain to chain.      fn check_inherents(block: Block, data: InherentData) -> CheckInherentsResult;      /// Generate a random seed.      fn random_seed() -> <Block as BlockT>::Hash;  }

而這個函數的具體邏輯是由每個runtime自己實現的。

node-template中的程式碼(/substrate/node-template/runtime/src/lib.rs)如下:

impl block_builder_api::BlockBuilder<Block> for Runtime {      fn apply_extrinsic(extrinsic: <Block as BlockT>::Extrinsic) -> ApplyResult {          Executive::apply_extrinsic(extrinsic)      }  }

其中Executive有打包交易的具體實現,程式碼(substrate/srml/executive/src/lib.rs)如下:

pub fn apply_extrinsic(uxt: Block::Extrinsic) -> ApplyResult {      let encoded = uxt.encode();      let encoded_len = encoded.len();      Self::apply_extrinsic_with_len(uxt, encoded_len, Some(encoded))  }

這樣處理完交易,後面就是區塊出塊,以及進行最終性共識。