Rust Async: async-task源碼分析
- 2019 年 11 月 24 日
- 筆記
本文轉載自知乎地址:https://zhuanlan.zhihu.com/p/92679351?utm_source=wechat_session&utm_medium=social&utm_oi=51691969839104
async-std是rust非同步生態中的基礎運行時庫之一,核心理念是合理的性能 + 用戶友好的api體驗。經過幾個月密集的開發,前些天已經發布1.0穩定版本。因此是時候來一次深入的底層源碼分析。async-std的核心是一個帶工作竊取的多執行緒Executor,而其本身的實現又依賴於async-task這個關鍵庫,因此本文主要對async-task的源碼進行分析。
當Future提交給Executor執行時,Executor需要在堆上為這個Future分配空間,同時需要給它分配一些狀態資訊,比如Future是否可以執行(poll),是否在等待被喚醒,是否已經執行完成等等。我們一般把提交給Executor執行的Future和其連帶的狀態稱為 task
。async-task這個庫就是對task進行抽象封裝,以便於Executor的實現,其有幾個創新的特性:
- 整個task只需要一次記憶體分配;
- 完全隱藏了RawWaker,以避免實現Executor時處理unsafe程式碼的麻煩;
- 提供了
JoinHandle
,這樣spawn函數對Future沒有Output=()
的限制,極大方便用戶使用;
使用方式
async-task只對外暴露了一個函數介面以及對應了兩個返回值類型:
pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)where F: Future<Output = R> + Send + 'static, R: Send + 'static, S: Fn(Task<T>) + Send + Sync + 'static, T: Send + Sync + 'static,
其中,參數future表示要執行的Future,schedule是一個閉包,當task變為可執行狀態時會調用這個函數以調度該task重新執行,tag是附帶在該task上的額外上下文資訊,比如task的名字,id等。返回值Task就是構造好的task對象,JoinHandle實現了Future,用於接收最終執行的結果。
值得注意的是spawn這個函數並不會做類似在後台進行計算的操作,而僅僅是分配記憶體,創建一個task出來,因此其實叫create_task反而更為恰當且好理解。
Task提供了如下幾個方法:
// 對該task進行調度 pub fn schedule(self); // poll一次內部的Future,如果Future完成了,則會通知JoinHandle取結果。否則task進 // 入等待,直到被被下一次喚醒進行重新調度執行。 pub fn run(self); // 取消task的執行 pub fn cancel(&self); // 返回創建時傳入的tag資訊 pub fn tag(&self) -> &T;
JoinHandle實現了Future trait,同時也提供了如下幾個方法:
// 取消task的執行 pub fn cancel(&self); // 返回創建時傳入的tag資訊 pub fn tag(&self) -> &T;
同時,Task和JoinHandle都實現了Send+Sync,所以他們可以出現在不同的執行緒,並通過tag方法可以同時持有 &T
,因此spawn函數對T有Sync的約束。
藉助於async_task的抽象,下面的幾十行程式碼就實現了一個共享全局任務隊列的多執行緒Executor:
use std::future::Future; use std::thread; use crossbeam::channel::{unbounded, Sender}; use futures::executor; use once_cell::sync::Lazy; static QUEUE: Lazy<Sender<async_task::Task<()>>> = Lazy::new(|| { let (sender, receiver) = unbounded::<async_task::Task<()>>(); for _ in 0..4 { let recv = receiver.clone(); thread::spawn(|| { for task in recv { task.run(); } }); } sender }); fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, ()> where F: Future<Output = R> + Send + 'static, R: Send + 'static, { let schedule = |task| QUEUE.send(task).unwrap(); let (task, handle) = async_task::spawn(future, schedule, ()); task.schedule(); handle } fn main() { let handles: Vec<_> = (0..10).map(|i| { spawn(async move { println!("Hello from task {}", i); }) }).collect(); // Wait for the tasks to finish. for handle in handles { executor::block_on(handle); } }
Task的結構圖
通常rust里的並發數據結構會包含底層的實現,一般叫Inner或者RawXXX,包含大量裸指針等unsafe操作,然後再其基礎上進行類型安全包裝,提供上層語義。比如channel,上層暴露出 Sender
和 Receiver
,其行為不一樣,但內部表示是完全一樣的。async-task也類似,JoinHandle, Task以及調用Future::poll時傳遞的Waker類型內部都共享同一個RawTask結構。由於JoinHandle本身是一個Future,整個並髮結構還有第四個角色-在JoinHandle上調用poll的task傳遞的Waker,為避免引起混淆就稱它為Awaiter吧。整個的結構圖大致如下:

整個task在堆上一次分配,記憶體布局按Header,Tag, Schedule,Future/Output排列。由於Future和Output不同時存在,因此他們共用同一塊記憶體。
- JoinHandle:只有一個,不訪問Future,可以訪問Output,一旦銷毀就不再生成;
- Task:主要訪問Future,銷毀後可以繼續生成,不過同一時間最多只有一個,這樣可以避免潛在的多個Task對Future進行並發訪問的bug;
- Waker:可以存在多份,主要訪問schedule數據,由於spawn函數的參數要求schedule必須是Send+Sync,因此多個waker並發調用是安全的。
- Header:本身包含三個部分,state是一個原子變數,包含引用計數,task的執行狀態,awaiter鎖等資訊;awaiter保存的是JoinHandle所在的task執行時傳遞的Waker,用於當Output生成後通知JoinHandle來取;vtable是一個指向靜態變數的虛表指針。
task中的狀態
所有的並發操作都是通過Header中的state這個原子變數來進行同步協調的。主要有以下幾種flag:
constSCHEDULED:usize=1<<0;
task已經調度準備下一次執行,這個flag可以和RUNGING同時存在。constRUNNING:usize=1<<1;
這個task正在執行中,這個flag可以和SCHEDULED同時存在。constCOMPLETED:usize=1<<2;
這個task的future已經執行完成。constCLOSED:usize=1<<3;
表示這個task要麼被cancel掉了,要麼output被JoinHandle取走了,是一個終結狀態。constHANDLE:usize=1<<4;
表示JoinHandle存在。constAWAITER:usize=1<<5;
表示JoinHandle正在等待Output,用於快速判斷Header里的awaiter不為None,避免獲取鎖的操作。constLOCKED:usize=1<<6;
讀寫Header里的awaiter時,需要設置這個欄位,標識是否處於locked狀態。constREFERENCE:usize=1<<7;
從第7bit開始到最高位當作引用計數用,代表Task和Waker的總數,主要JoinHandle在HANDLE的flag里跟蹤。
JoinHandle的實現分析
JoinHandle::cancel
為避免並發問題,JoinHandle不接觸Future數據,而由於取消task的執行需要析構Future數據,因此cancel操作通過重新schedule一次,把操作傳遞給Task執行。
impl<R, T> JoinHandle<R, T> { pub fn cancel(&self) { let ptr = self.raw_task.as_ptr(); let header = ptr as *const Header; unsafe { let mut state = (*header).state.load(Ordering::Acquire); loop { // 如果task已經結束或者closed,什麼也不做。 if state & (COMPLETED | CLOSED) != 0 { break; } let new = if state & (SCHEDULED | RUNNING) == 0 { // 如果不處於scheduled或running狀態,那麼下面就需要調用schedule // 函數通知Task,因此要加上SCHEDULED 和增加引用計數 (state | SCHEDULED | CLOSED) + REFERENCE } else { // 否則要麼task已經schedue過了,過段時間會重新執行,要麼當前正在 // 運行,因此只需要設置closed狀態,task執行完後會收到close狀態並 // 進行處理。 state | CLOSED }; match (*header).state.compare_exchange_weak( state, new, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => { // 重新schedule以便executor將Future銷毀 if state & (SCHEDULED | RUNNING) == 0 { ((*header).vtable.schedule)(ptr); } // 如果有awaiter的話,通知相應的的task。 if state & AWAITER != 0 { (*header).notify(); } break; } Err(s) => state = s,// 失敗重試 } } } } }
JoinHandle::drop
由於整個task的所有權是由JoinHandle,Task和Waker共享的,因此都需要手動實現drop。Output只會由JoinHandle訪問,因此如果有的話也要一同銷毀。
impl<R, T> Drop for JoinHandle<R, T> { fn drop(&mut self) { let ptr = self.raw_task.as_ptr(); let header = ptr as *const Header; let mut output = None; unsafe { // 由於很多時候JoinHandle不用,會在剛創建的時候直接drop掉,因此針對這種情 // 況作一個特殊化處理。這樣一個原子操作就完成了。 if let Err(mut state) = (*header).state.compare_exchange_weak( SCHEDULED | HANDLE | REFERENCE, SCHEDULED | REFERENCE, Ordering::AcqRel, Ordering::Acquire, ) { loop { // 如果task完成了,但是還沒有close掉,說明output還沒有被取走,需 // 要在這裡取出來進行析構。 if state & COMPLETED != 0 && state & CLOSED == 0 { // 標記為closed,這樣就可以安全地讀取output的數據。 match (*header).state.compare_exchange_weak( state, state | CLOSED, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => { output = Some((((*header).vtable.get_output)(ptr) as *mut R) .read()); // 更新狀態重新循環 state |= CLOSED; } Err(s) => state = s, } } else { // 進到這裡說明task要麼沒完成,要麼已經closed了。 let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 { // Task和Waker都已經沒了,並且沒closed,根據進else的條 // 件可知task沒完成,Future還在,重新schedule一次,讓 // executor把Future析構掉。 SCHEDULED | CLOSED | REFERENCE } else { // 移除HANDLE flag state & !HANDLE }; match (*header).state.compare_exchange_weak( state, new, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => { // 如果這是最後一個引用 if state & !(REFERENCE - 1) == 0 { if state & CLOSED == 0 { //並且沒closed,根據進else的條件可知task沒 // 完成,重新schedule一次,析構Future ((*header).vtable.schedule)(ptr); } else { // task已經完成了,output也已經在上面讀出 // 來了,同時也是最後一個引用,需要把task自 // 身析構掉。 ((*header).vtable.destroy)(ptr); } } // 還有其他引用在,資源的釋放由他們負責。 break; } Err(s) => state = s, } } } } } // 析構讀取出來的output drop(output); } }
JoinHandle::poll
檢查Output是否已經可以拿,沒有的話註冊cx.waker()等通知。
impl<R, T> Future for JoinHandle<R, T> { type Output = Option<R>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let ptr = self.raw_task.as_ptr(); let header = ptr as *const Header; unsafe { let mut state = (*header).state.load(Ordering::Acquire); loop { // task已經closed了,沒output可拿。 if state & CLOSED != 0 { // 大部分可情況下,header里的awaiter就是cx.waker,也有例外,因 // 此一併進行通知。 (*header).notify_unless(cx.waker()); return Poll::Ready(None); } // 如果task還沒完成 if state & COMPLETED == 0 { // 那麼註冊當前的cx.waker到Header::awaiter里,這樣完成了可以收 // 到通知。 abort_on_panic(|| { (*header).swap_awaiter(Some(cx.waker().clone())); }); // 要是在上面註冊前正好task完成了,那麼就收不到通知了,因此註冊後 // 需要重新讀取下狀態看看。 state = (*header).state.load(Ordering::Acquire); // task已經closed了,沒output可拿,返回None。 if state & CLOSED != 0 { // 這裡我分析下來是不需要再通知了,提了個pr等作者回應。 (*header).notify_unless(cx.waker()); return Poll::Ready(None); } // task還沒完成,上面已經註冊了waker,可以直接返回Pending。 if state & COMPLETED == 0 { return Poll::Pending; } } // 到這裡說明task已經完成了。把它設置為closed狀態,就可以拿output了。 match (*header).state.compare_exchange( state, state | CLOSED, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => { // 設置closed成功,通知其他的awaiter。由於上面是原子的swap操 // 作,且一旦設置為closed,awaiter就不會再變更了,因此可以 // 用AWAITER這個flag進行快速判斷。 if state & AWAITER != 0 { (*header).notify_unless(cx.waker()); } // 讀取出Output並返回。 let output = ((*header).vtable.get_output)(ptr) as *mut R; return Poll::Ready(Some(output.read())); } Err(s) => state = s, } } } } }
Task的實現分析
Task::schedule
這個函數先通過Task內部保存的指針指向Header,並從Header的vtable欄位中拿到schedule函數指針,這個函數最終調用的是用戶調用spawn時傳入的schedule閉包。因此本身很直接。
Task::run
這個函數先通過Task內部保存的指針指向Header,並從Header的vtable欄位中拿到run函數指針,其指向RawTask::run,實現如下:
首先根據指針參數強轉為RawTask,並根據Header的vtable拿到RawWakerVTable,構造好Waker和Context,為調用Future::poll做準備。
unsafe fn run(ptr: *const ()) { let raw = Self::from_ptr(ptr); let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new( ptr, &(*raw.header).vtable.raw_waker, ))); let cx = &mut Context::from_waker(&waker); //... }
然後獲取當前的state,循環直到更新state的RUNING成功為止。
let mut state = (*raw.header).state.load(Ordering::Acquire); loop { // 如果task已經closed,那麼Future可以直接析構掉,並返回。 if state & CLOSED != 0 { if state & AWAITER != 0 { (*raw.header).notify(); } Self::drop_future(ptr); // 扣掉當前task的引用計數,因為run函數的參數是self。 Self::decrement(ptr); return; } // 移除SCHEDULED狀態,並標記RUNING match (*raw.header).state.compare_exchange_weak( state, (state & !SCHEDULED) | RUNNING, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => { // 更新state到新的狀態,後面的程式碼還要復用state。 state = (state & !SCHEDULED) | RUNNING; break; } Err(s) => state = s, } }
標記為RUNING狀態後,就可以開始正式調用Future::poll了,不過在調用前設置Guard,以便poll函數panic時,可以調用Guard的drop函數保證狀態一致。
let guard = Guard(raw); let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx); mem::forget(guard); // 沒panic,移除掉guard.drop的調用。 match poll { Poll::Ready(out) => { /// ... } Poll::Pending => { // ... } }
如果Future完成了,那麼先把Future析構掉,騰出記憶體把output寫進去。並循環嘗試將RUNING狀態去掉。
match poll { Poll::Ready(out) => { Self::drop_future(ptr); raw.output.write(out); let mut output = None; loop { // JoinHandle已經沒了,那麼output沒人取,我們需要析構掉output,並設置為 // closed狀態。 let new = if state & HANDLE == 0 { (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED } else { (state & !RUNNING & !SCHEDULED) | COMPLETED }; match (*raw.header).state.compare_exchange_weak( state, new, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => { // 如果handle沒了,或者跑的時候closed了,那麼需要把output再讀取 // 出來析構掉。 if state & HANDLE == 0 || state & CLOSED != 0 { output = Some(raw.output.read()); } // 通知JoinHandle來取數據。 if state & AWAITER != 0 { (*raw.header).notify(); } Self::decrement(ptr); break; } Err(s) => state = s, } } drop(output); } Poll::Pending => { // ... }
如果沒完成的話,循環嘗試移除RUNING,同時在poll的時候其他執行緒不能調用shedule函數,而是設置SCHEDULED,所以需要檢查這個flag,如果設置了,則需要代勞。
match poll { Poll::Ready(out) => { /// handle ready case ... } Poll::Pending => { loop { // poll的時候closed了,這裡為啥要移除SCHEDULED狀態,暫時不清楚,需要問問 // 作者。 let new = if state & CLOSED != 0 { state & !RUNNING & !SCHEDULED } else { state & !RUNNING }; match (*raw.header).state.compare_exchange_weak( state, new, Ordering::AcqRel, Ordering::Acquire, ) { Ok(state) => { if state & CLOSED != 0 { // 設置closed狀態的那個執行緒是不能碰Future的,否則和當前執行緒 // 產生記憶體並發訪問衝突。因此代勞析構操作。 Self::drop_future(ptr); Self::decrement(ptr); } else if state & SCHEDULED != 0 { // poll的時候其他執行緒想schedule這個task,但是不能調用,因此 // 當前執行緒代勞。chedule函數接收self,類似move語義,因此這裡 // 不需要decrement。 Self::schedule(ptr); } else { Self::decrement(ptr); } break; } Err(s) => state = s, } } } }
在poll時如果發生panic,則Guard負責收拾殘局。
fn drop(&mut self) { let raw = self.0; let ptr = raw.header as *const (); unsafe { let mut state = (*raw.header).state.load(Ordering::Acquire); loop { // poll的時候被其他執行緒closed了, if state & CLOSED != 0 { // 看程式碼state一旦處於CLOSED後,schedule不會再運行。這裡為啥要移除 // SCHEDULED狀態,暫時不清楚,需要問問作者。 (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); // 析構Future RawTask::<F, R, S, T>::drop_future(ptr); RawTask::<F, R, S, T>::decrement(ptr); break; } match (*raw.header).state.compare_exchange_weak( state, (state & !RUNNING & !SCHEDULED) | CLOSED, Ordering::AcqRel, Ordering::Acquire, ) { Ok(state) => { // 析構Future RawTask::<F, R, S, T>::drop_future(ptr); // 通知awaitertask已經close了. if state & AWAITER != 0 { (*raw.header).notify(); } RawTask::<F, R, S, T>::decrement(ptr); break; } Err(s) => state = s, } } } }
Waker相關函數的實現
wake函數
wake函數主要功能是設置SCHEDULE狀態,並嘗試調用schedule函數,有兩個重要的細節需要注意:
- task正在執行時不能調用schedule函數;
- 當task已經被schedule過了時,也需要額外做一次原子操作,施加Release語義。
unsafe fn wake(ptr: *const ()) { let raw = Self::from_ptr(ptr); let mut state = (*raw.header).state.load(Ordering::Acquire); loop { if state & (COMPLETED | CLOSED) != 0 { // 如果task完成或者close了,直接drop掉自己,wake的參數是self語義 Self::decrement(ptr); break; } if state & SCHEDULED != 0 { // 這段程式碼極為關鍵,如果task已經schedule過了,則重新把讀出來的state // 設置回去,雖然看起來好像是無用的,其實是為了施加Release同步語義, // 把當前執行緒的記憶體視圖同步到其他執行緒去。即便是rust標準庫,之前也因為 // 沒處理好類似這個情況出過bug。 match (*raw.header).state.compare_exchange_weak( state, state, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => { Self::decrement(ptr); break; } Err(s) => state = s, } } else { // task沒schedule過,則設置狀態。 match (*raw.header).state.compare_exchange_weak( state, state | SCHEDULED, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => { // 如果task當前沒有運行,那麼可以調用schedule函數。 if state & (SCHEDULED | RUNNING) == 0 { // Schedule the task. let task = Task { raw_task: NonNull::new_unchecked(ptr as *mut ()), _marker: PhantomData, }; (*raw.schedule)(task); } else { // task正在運行,不需要調用schedule,等運行結束後對應的 // 執行緒會代勞。 Self::decrement(ptr); } break; } Err(s) => state = s, } } } }
wake_by_ref
這個函數的功能和wake類似,唯一的區別就是wake的參數是self,有move語義,wakebyref是&self。實現差異不大,就不做具體分析了。
clone_waker
waker的clone實現也比較簡單,直接將Header里的state的引用計數加一即可。
unsafe fn clone_waker(ptr: *const ()) -> RawWaker { let raw = Self::from_ptr(ptr); let raw_waker = &(*raw.header).vtable.raw_waker; let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed); if state > isize::max_value() as usize { std::process::abort(); } RawWaker::new(ptr, raw_waker) }
總結
整個task的設計非常精細,api也非常直觀,難怪一發布就直接上1.0版本。