Rust Async: async-task源碼分析

  • 2019 年 11 月 24 日
  • 筆記

本文轉載自知乎地址:https://zhuanlan.zhihu.com/p/92679351?utm_source=wechat_session&utm_medium=social&utm_oi=51691969839104

async-std是rust非同步生態中的基礎運行時庫之一,核心理念是合理的性能 + 用戶友好的api體驗。經過幾個月密集的開發,前些天已經發布1.0穩定版本。因此是時候來一次深入的底層源碼分析。async-std的核心是一個帶工作竊取的多執行緒Executor,而其本身的實現又依賴於async-task這個關鍵庫,因此本文主要對async-task的源碼進行分析。

當Future提交給Executor執行時,Executor需要在堆上為這個Future分配空間,同時需要給它分配一些狀態資訊,比如Future是否可以執行(poll),是否在等待被喚醒,是否已經執行完成等等。我們一般把提交給Executor執行的Future和其連帶的狀態稱為 task。async-task這個庫就是對task進行抽象封裝,以便於Executor的實現,其有幾個創新的特性:

  1. 整個task只需要一次記憶體分配;
  2. 完全隱藏了RawWaker,以避免實現Executor時處理unsafe程式碼的麻煩;
  3. 提供了 JoinHandle,這樣spawn函數對Future沒有 Output=()的限制,極大方便用戶使用;

使用方式

async-task只對外暴露了一個函數介面以及對應了兩個返回值類型:

pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)where    F: Future<Output = R> + Send + 'static,    R: Send + 'static,    S: Fn(Task<T>) + Send + Sync + 'static,    T: Send + Sync + 'static,

其中,參數future表示要執行的Future,schedule是一個閉包,當task變為可執行狀態時會調用這個函數以調度該task重新執行,tag是附帶在該task上的額外上下文資訊,比如task的名字,id等。返回值Task就是構造好的task對象,JoinHandle實現了Future,用於接收最終執行的結果。

值得注意的是spawn這個函數並不會做類似在後台進行計算的操作,而僅僅是分配記憶體,創建一個task出來,因此其實叫create_task反而更為恰當且好理解。

Task提供了如下幾個方法:

    // 對該task進行調度      pub fn schedule(self);      // poll一次內部的Future,如果Future完成了,則會通知JoinHandle取結果。否則task進      // 入等待,直到被被下一次喚醒進行重新調度執行。      pub fn run(self);      // 取消task的執行      pub fn cancel(&self);      // 返回創建時傳入的tag資訊      pub fn tag(&self) -> &T;

JoinHandle實現了Future trait,同時也提供了如下幾個方法:

    // 取消task的執行      pub fn cancel(&self);      // 返回創建時傳入的tag資訊      pub fn tag(&self) -> &T;

同時,Task和JoinHandle都實現了Send+Sync,所以他們可以出現在不同的執行緒,並通過tag方法可以同時持有 &T,因此spawn函數對T有Sync的約束。

藉助於async_task的抽象,下面的幾十行程式碼就實現了一個共享全局任務隊列的多執行緒Executor:

use std::future::Future;  use std::thread;    use crossbeam::channel::{unbounded, Sender};  use futures::executor;  use once_cell::sync::Lazy;    static QUEUE: Lazy<Sender<async_task::Task<()>>> = Lazy::new(|| {      let (sender, receiver) = unbounded::<async_task::Task<()>>();      for _ in 0..4 {          let recv = receiver.clone();            thread::spawn(|| {              for task in recv {                  task.run();              }          });      }        sender  });    fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, ()>  where      F: Future<Output = R> + Send + 'static,      R: Send + 'static,  {      let schedule = |task| QUEUE.send(task).unwrap();      let (task, handle) = async_task::spawn(future, schedule, ());        task.schedule();        handle  }    fn main() {      let handles: Vec<_> = (0..10).map(|i| {          spawn(async move {              println!("Hello from task {}", i);          })      }).collect();        // Wait for the tasks to finish.      for handle in handles {          executor::block_on(handle);      }  }

Task的結構圖

通常rust里的並發數據結構會包含底層的實現,一般叫Inner或者RawXXX,包含大量裸指針等unsafe操作,然後再其基礎上進行類型安全包裝,提供上層語義。比如channel,上層暴露出 SenderReceiver,其行為不一樣,但內部表示是完全一樣的。async-task也類似,JoinHandle, Task以及調用Future::poll時傳遞的Waker類型內部都共享同一個RawTask結構。由於JoinHandle本身是一個Future,整個並髮結構還有第四個角色-在JoinHandle上調用poll的task傳遞的Waker,為避免引起混淆就稱它為Awaiter吧。整個的結構圖大致如下:

整個task在堆上一次分配,記憶體布局按Header,Tag, Schedule,Future/Output排列。由於Future和Output不同時存在,因此他們共用同一塊記憶體。

  • JoinHandle:只有一個,不訪問Future,可以訪問Output,一旦銷毀就不再生成;
  • Task:主要訪問Future,銷毀後可以繼續生成,不過同一時間最多只有一個,這樣可以避免潛在的多個Task對Future進行並發訪問的bug;
  • Waker:可以存在多份,主要訪問schedule數據,由於spawn函數的參數要求schedule必須是Send+Sync,因此多個waker並發調用是安全的。
  • Header:本身包含三個部分,state是一個原子變數,包含引用計數,task的執行狀態,awaiter鎖等資訊;awaiter保存的是JoinHandle所在的task執行時傳遞的Waker,用於當Output生成後通知JoinHandle來取;vtable是一個指向靜態變數的虛表指針。

task中的狀態

所有的並發操作都是通過Header中的state這個原子變數來進行同步協調的。主要有以下幾種flag:

  1. constSCHEDULED:usize=1<<0; task已經調度準備下一次執行,這個flag可以和RUNGING同時存在。
  2. constRUNNING:usize=1<<1; 這個task正在執行中,這個flag可以和SCHEDULED同時存在。
  3. constCOMPLETED:usize=1<<2; 這個task的future已經執行完成。
  4. constCLOSED:usize=1<<3; 表示這個task要麼被cancel掉了,要麼output被JoinHandle取走了,是一個終結狀態。
  5. constHANDLE:usize=1<<4; 表示JoinHandle存在。
  6. constAWAITER:usize=1<<5; 表示JoinHandle正在等待Output,用於快速判斷Header里的awaiter不為None,避免獲取鎖的操作。
  7. constLOCKED:usize=1<<6; 讀寫Header里的awaiter時,需要設置這個欄位,標識是否處於locked狀態。
  8. constREFERENCE:usize=1<<7; 從第7bit開始到最高位當作引用計數用,代表Task和Waker的總數,主要JoinHandle在HANDLE的flag里跟蹤。

JoinHandle的實現分析

JoinHandle::cancel

為避免並發問題,JoinHandle不接觸Future數據,而由於取消task的執行需要析構Future數據,因此cancel操作通過重新schedule一次,把操作傳遞給Task執行。

impl<R, T> JoinHandle<R, T> {      pub fn cancel(&self) {          let ptr = self.raw_task.as_ptr();          let header = ptr as *const Header;            unsafe {              let mut state = (*header).state.load(Ordering::Acquire);                loop {                  // 如果task已經結束或者closed,什麼也不做。                  if state & (COMPLETED | CLOSED) != 0 {                      break;                  }                    let new = if state & (SCHEDULED | RUNNING) == 0 {                      // 如果不處於scheduled或running狀態,那麼下面就需要調用schedule                      // 函數通知Task,因此要加上SCHEDULED 和增加引用計數                      (state | SCHEDULED | CLOSED) + REFERENCE                  } else {                      // 否則要麼task已經schedue過了,過段時間會重新執行,要麼當前正在                      // 運行,因此只需要設置closed狀態,task執行完後會收到close狀態並                      // 進行處理。                      state | CLOSED                  };                    match (*header).state.compare_exchange_weak(                      state,                      new,                      Ordering::AcqRel,                      Ordering::Acquire,                  ) {                      Ok(_) => {                          // 重新schedule以便executor將Future銷毀                          if state & (SCHEDULED | RUNNING) == 0 {                              ((*header).vtable.schedule)(ptr);                          }                            // 如果有awaiter的話,通知相應的的task。                          if state & AWAITER != 0 {                              (*header).notify();                          }                            break;                      }                      Err(s) => state = s,// 失敗重試                  }              }          }      }  }

JoinHandle::drop

由於整個task的所有權是由JoinHandle,Task和Waker共享的,因此都需要手動實現drop。Output只會由JoinHandle訪問,因此如果有的話也要一同銷毀。

impl<R, T> Drop for JoinHandle<R, T> {      fn drop(&mut self) {          let ptr = self.raw_task.as_ptr();          let header = ptr as *const Header;            let mut output = None;            unsafe {              // 由於很多時候JoinHandle不用,會在剛創建的時候直接drop掉,因此針對這種情              // 況作一個特殊化處理。這樣一個原子操作就完成了。              if let Err(mut state) = (*header).state.compare_exchange_weak(                  SCHEDULED | HANDLE | REFERENCE,                  SCHEDULED | REFERENCE,                  Ordering::AcqRel,                  Ordering::Acquire,              ) {                  loop {                      // 如果task完成了,但是還沒有close掉,說明output還沒有被取走,需                      // 要在這裡取出來進行析構。                      if state & COMPLETED != 0 && state & CLOSED == 0 {                          // 標記為closed,這樣就可以安全地讀取output的數據。                          match (*header).state.compare_exchange_weak(                              state,                              state | CLOSED,                              Ordering::AcqRel,                              Ordering::Acquire,                          ) {                              Ok(_) => {                                  output =                                      Some((((*header).vtable.get_output)(ptr) as *mut R)                                      .read());                                    // 更新狀態重新循環                                  state |= CLOSED;                              }                              Err(s) => state = s,                          }                      } else {                          // 進到這裡說明task要麼沒完成,要麼已經closed了。                          let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {                              // Task和Waker都已經沒了,並且沒closed,根據進else的條                              // 件可知task沒完成,Future還在,重新schedule一次,讓                              // executor把Future析構掉。                              SCHEDULED | CLOSED | REFERENCE                          } else {                              // 移除HANDLE flag                              state & !HANDLE                          };                            match (*header).state.compare_exchange_weak(                              state,                              new,                              Ordering::AcqRel,                              Ordering::Acquire,                          ) {                              Ok(_) => {                                  // 如果這是最後一個引用                                  if state & !(REFERENCE - 1) == 0 {                                      if state & CLOSED == 0 {                                          //並且沒closed,根據進else的條件可知task沒                                          // 完成,重新schedule一次,析構Future                                          ((*header).vtable.schedule)(ptr);                                      } else {                                          // task已經完成了,output也已經在上面讀出                                          // 來了,同時也是最後一個引用,需要把task自                                          // 身析構掉。                                          ((*header).vtable.destroy)(ptr);                                      }                                  }                                    // 還有其他引用在,資源的釋放由他們負責。                                  break;                              }                              Err(s) => state = s,                          }                      }                  }              }          }            // 析構讀取出來的output          drop(output);      }  }

JoinHandle::poll

檢查Output是否已經可以拿,沒有的話註冊cx.waker()等通知。

impl<R, T> Future for JoinHandle<R, T> {      type Output = Option<R>;        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {          let ptr = self.raw_task.as_ptr();          let header = ptr as *const Header;            unsafe {              let mut state = (*header).state.load(Ordering::Acquire);                loop {                  // task已經closed了,沒output可拿。                  if state & CLOSED != 0 {                      // 大部分可情況下,header里的awaiter就是cx.waker,也有例外,因                      // 此一併進行通知。                      (*header).notify_unless(cx.waker());                      return Poll::Ready(None);                  }                    // 如果task還沒完成                  if state & COMPLETED == 0 {                      // 那麼註冊當前的cx.waker到Header::awaiter里,這樣完成了可以收                      // 到通知。                      abort_on_panic(|| {                          (*header).swap_awaiter(Some(cx.waker().clone()));                      });                        // 要是在上面註冊前正好task完成了,那麼就收不到通知了,因此註冊後                      // 需要重新讀取下狀態看看。                      state = (*header).state.load(Ordering::Acquire);                        // task已經closed了,沒output可拿,返回None。                      if state & CLOSED != 0 {                          // 這裡我分析下來是不需要再通知了,提了個pr等作者回應。                          (*header).notify_unless(cx.waker());                          return Poll::Ready(None);                      }                        // task還沒完成,上面已經註冊了waker,可以直接返回Pending。                      if state & COMPLETED == 0 {                          return Poll::Pending;                      }                  }                    // 到這裡說明task已經完成了。把它設置為closed狀態,就可以拿output了。                  match (*header).state.compare_exchange(                      state,                      state | CLOSED,                      Ordering::AcqRel,                      Ordering::Acquire,                  ) {                      Ok(_) => {                          // 設置closed成功,通知其他的awaiter。由於上面是原子的swap操                          // 作,且一旦設置為closed,awaiter就不會再變更了,因此可以                          // 用AWAITER這個flag進行快速判斷。                          if state & AWAITER != 0 {                              (*header).notify_unless(cx.waker());                          }                            // 讀取出Output並返回。                          let output = ((*header).vtable.get_output)(ptr) as *mut R;                          return Poll::Ready(Some(output.read()));                      }                      Err(s) => state = s,                  }              }          }      }  }

Task的實現分析

Task::schedule

這個函數先通過Task內部保存的指針指向Header,並從Header的vtable欄位中拿到schedule函數指針,這個函數最終調用的是用戶調用spawn時傳入的schedule閉包。因此本身很直接。

Task::run

這個函數先通過Task內部保存的指針指向Header,並從Header的vtable欄位中拿到run函數指針,其指向RawTask::run,實現如下:

首先根據指針參數強轉為RawTask,並根據Header的vtable拿到RawWakerVTable,構造好Waker和Context,為調用Future::poll做準備。

unsafe fn run(ptr: *const ()) {      let raw = Self::from_ptr(ptr);        let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(          ptr,          &(*raw.header).vtable.raw_waker,      )));      let cx = &mut Context::from_waker(&waker);        //...  }

然後獲取當前的state,循環直到更新state的RUNING成功為止。

    let mut state = (*raw.header).state.load(Ordering::Acquire);      loop {          // 如果task已經closed,那麼Future可以直接析構掉,並返回。          if state & CLOSED != 0 {              if state & AWAITER != 0 {                  (*raw.header).notify();              }                Self::drop_future(ptr);                // 扣掉當前task的引用計數,因為run函數的參數是self。              Self::decrement(ptr);              return;          }            // 移除SCHEDULED狀態,並標記RUNING          match (*raw.header).state.compare_exchange_weak(              state,              (state & !SCHEDULED) | RUNNING,              Ordering::AcqRel,              Ordering::Acquire,          ) {              Ok(_) => {              	// 更新state到新的狀態,後面的程式碼還要復用state。                  state = (state & !SCHEDULED) | RUNNING;                  break;              }              Err(s) => state = s,          }      }

標記為RUNING狀態後,就可以開始正式調用Future::poll了,不過在調用前設置Guard,以便poll函數panic時,可以調用Guard的drop函數保證狀態一致。

    let guard = Guard(raw);      let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);      mem::forget(guard); // 沒panic,移除掉guard.drop的調用。        match poll {          Poll::Ready(out) => {          	/// ...          }          Poll::Pending => {              // ...          }      }

如果Future完成了,那麼先把Future析構掉,騰出記憶體把output寫進去。並循環嘗試將RUNING狀態去掉。

match poll {      Poll::Ready(out) => {          Self::drop_future(ptr);          raw.output.write(out);            let mut output = None;            loop {              // JoinHandle已經沒了,那麼output沒人取,我們需要析構掉output,並設置為              // closed狀態。              let new = if state & HANDLE == 0 {                  (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED              } else {                  (state & !RUNNING & !SCHEDULED) | COMPLETED              };                match (*raw.header).state.compare_exchange_weak(                  state,                  new,                  Ordering::AcqRel,                  Ordering::Acquire,              ) {                  Ok(_) => {                      // 如果handle沒了,或者跑的時候closed了,那麼需要把output再讀取                      // 出來析構掉。                      if state & HANDLE == 0 || state & CLOSED != 0 {                          output = Some(raw.output.read());                      }                        // 通知JoinHandle來取數據。                      if state & AWAITER != 0 {                          (*raw.header).notify();                      }                        Self::decrement(ptr);                      break;                  }                  Err(s) => state = s,              }          }          drop(output);      }      Poll::Pending => {      	// ...      }

如果沒完成的話,循環嘗試移除RUNING,同時在poll的時候其他執行緒不能調用shedule函數,而是設置SCHEDULED,所以需要檢查這個flag,如果設置了,則需要代勞。

match poll {      Poll::Ready(out) => {      	/// handle ready case ...      }      Poll::Pending => {          loop {              // poll的時候closed了,這裡為啥要移除SCHEDULED狀態,暫時不清楚,需要問問              // 作者。              let new = if state & CLOSED != 0 {                  state & !RUNNING & !SCHEDULED              } else {                  state & !RUNNING              };                match (*raw.header).state.compare_exchange_weak(                  state,                  new,                  Ordering::AcqRel,                  Ordering::Acquire,              ) {                  Ok(state) => {                      if state & CLOSED != 0 {                          // 設置closed狀態的那個執行緒是不能碰Future的,否則和當前執行緒                          // 產生記憶體並發訪問衝突。因此代勞析構操作。                          Self::drop_future(ptr);                            Self::decrement(ptr);                      } else if state & SCHEDULED != 0 {                          // poll的時候其他執行緒想schedule這個task,但是不能調用,因此                          // 當前執行緒代勞。chedule函數接收self,類似move語義,因此這裡                          // 不需要decrement。                          Self::schedule(ptr);                      } else {                          Self::decrement(ptr);                      }                      break;                  }                  Err(s) => state = s,              }          }      }  }

在poll時如果發生panic,則Guard負責收拾殘局。

fn drop(&mut self) {      let raw = self.0;      let ptr = raw.header as *const ();        unsafe {          let mut state = (*raw.header).state.load(Ordering::Acquire);            loop {              // poll的時候被其他執行緒closed了,              if state & CLOSED != 0 {                  // 看程式碼state一旦處於CLOSED後,schedule不會再運行。這裡為啥要移除                  // SCHEDULED狀態,暫時不清楚,需要問問作者。                  (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);                    // 析構Future                  RawTask::<F, R, S, T>::drop_future(ptr);                  RawTask::<F, R, S, T>::decrement(ptr);                  break;              }                match (*raw.header).state.compare_exchange_weak(                  state,                  (state & !RUNNING & !SCHEDULED) | CLOSED,                  Ordering::AcqRel,                  Ordering::Acquire,              ) {                  Ok(state) => {                      // 析構Future                      RawTask::<F, R, S, T>::drop_future(ptr);                        // 通知awaitertask已經close了.                      if state & AWAITER != 0 {                          (*raw.header).notify();                      }                        RawTask::<F, R, S, T>::decrement(ptr);                      break;                  }                  Err(s) => state = s,              }          }      }  }

Waker相關函數的實現

wake函數

wake函數主要功能是設置SCHEDULE狀態,並嘗試調用schedule函數,有兩個重要的細節需要注意:

  1. task正在執行時不能調用schedule函數;
  2. 當task已經被schedule過了時,也需要額外做一次原子操作,施加Release語義。
unsafe fn wake(ptr: *const ()) {      let raw = Self::from_ptr(ptr);        let mut state = (*raw.header).state.load(Ordering::Acquire);        loop {          if state & (COMPLETED | CLOSED) != 0 {              // 如果task完成或者close了,直接drop掉自己,wake的參數是self語義              Self::decrement(ptr);              break;          }            if state & SCHEDULED != 0 {              // 這段程式碼極為關鍵,如果task已經schedule過了,則重新把讀出來的state              // 設置回去,雖然看起來好像是無用的,其實是為了施加Release同步語義,              // 把當前執行緒的記憶體視圖同步到其他執行緒去。即便是rust標準庫,之前也因為              // 沒處理好類似這個情況出過bug。              match (*raw.header).state.compare_exchange_weak(                  state,                  state,                  Ordering::AcqRel,                  Ordering::Acquire,              ) {                  Ok(_) => {                      Self::decrement(ptr);                      break;                  }                  Err(s) => state = s,              }          } else {              // task沒schedule過,則設置狀態。              match (*raw.header).state.compare_exchange_weak(                  state,                  state | SCHEDULED,                  Ordering::AcqRel,                  Ordering::Acquire,              ) {                  Ok(_) => {                      // 如果task當前沒有運行,那麼可以調用schedule函數。                      if state & (SCHEDULED | RUNNING) == 0 {                          // Schedule the task.                          let task = Task {                              raw_task: NonNull::new_unchecked(ptr as *mut ()),                              _marker: PhantomData,                          };                          (*raw.schedule)(task);                      } else {                          // task正在運行,不需要調用schedule,等運行結束後對應的                          // 執行緒會代勞。                          Self::decrement(ptr);                      }                        break;                  }                  Err(s) => state = s,              }          }      }  }

wake_by_ref

這個函數的功能和wake類似,唯一的區別就是wake的參數是self,有move語義,wakebyref是&self。實現差異不大,就不做具體分析了。

clone_waker

waker的clone實現也比較簡單,直接將Header里的state的引用計數加一即可。

unsafe fn clone_waker(ptr: *const ()) -> RawWaker {      let raw = Self::from_ptr(ptr);      let raw_waker = &(*raw.header).vtable.raw_waker;        let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);        if state > isize::max_value() as usize {          std::process::abort();      }        RawWaker::new(ptr, raw_waker)  }

總結

整個task的設計非常精細,api也非常直觀,難怪一發布就直接上1.0版本。