[推薦] 最近學寫 async/await 被 Rust 毒打的經驗

  • 2019 年 11 月 28 日
  • 筆記

不要自作主張地返回 Poll::Pending。

只有在你調用的方法返回了 Poll::Pending 時,才能夠返回 Poll::Pending。 不然,也許程序會通過編譯,但運行結果不會是你想要的。如果你的 Future 只是對現有 Future 的簡單包裝,一般不會犯這種錯誤,但情況複雜了,可能會忘。

我們日常使用的 Future,如 TcpStream/channel/timer 之類,其實是與底層的 reactor 相關聯的,它們在返回 Poll::Pending 的時候,同時會在 reactor 註冊事件,所以才會收到通知,最終 wakeup 相應的 Future。我們自己隨便返回 Poll::Pending 的時候,顯然不會收到通知的。

可能是由於 Rust 選擇的抽象方式的原因,我們一般只關注 Future 本身,忽略與 reactor 的聯繫。其他語言的話,貌似 reactor 或者叫 event loop 才是最核心的概念。

async/await 是高層 primitive,而 Poll 是低層 primitive。高層使用低層,很方便,反過來,比較麻煩,得避免。

有個感覺,async/await 普及開來的話,Poll 只有較底層的庫才關心,日常只會與 async/await 打交道。 (另外,我們就算與 Poll 打交道時,其實幾乎也不關心 Context 或者 Waker,感覺它們更底層。)

await 與 tokio::sync::Mutex 一起使用,需要注意死鎖問題。

從某種意義上說 await 也是有「鎖」的語義,至少都需要「等待」。只是 Mutex 等待別人 unlock,而 await 在等待事件發生。(tokio::sync::Mutext 的 lock() 方法返回的就是 Future,需要你 await 的。)

舉個例子,lock 之後,然後 await。如果被 await 的 Future,也需要 lock,然後才能返回 Poll::Ready。這時實際上就死鎖了,兩個 Future 會無限等待下去。

所以呢,不要認為,我反正只用了一把鎖,所以死鎖是不可能的。(推廣一下,即使不用鎖,但相互 await 的關係搞亂了,同樣可能死鎖的)。

有個感覺,在持有鎖時去 await 可能就是代碼有問題的一個標誌,bad smell。即使不死鎖,await 的事件常常是 IO 之類,耗時很長或不可控,相當於鎖的臨界區太大了。搞不好即使沒有死鎖,你的程序也從 concurrent 的變成 stop-and-wait 的了。

「Future 組合」的問題。

這裡不僅僅指 Future,還有 Stream/Sink,以及 tokio 提供的 AsyncRead/AsyncWrite 等 trait,它們共同點是,都會返回 Poll。

情況可能是這樣,你有一個 struct XXX,它擁有多個 Future/Stream/Sink/AsyncRead/AsyncWrite 對象,你需要把它們組合在一起,並且需要把 struct XXX 實現為一個 Future,然後交給 tokio::spawn() 來運行。 這裡一般兩種辦法,一是手動實現 Future,也就是 impl Future for XXX {} 然後實現 poll 方法;另一種是為 struct XXX 定義一個 async fn foo(),然後 tokio::spawn(xxx.foo())。

對於手動 impl Future for XXX {}。相當於選擇了底層 primitive 這條路。由於 Future::poll 不是 async 方法,當然你不能使用 await,只能用 poll/poll_next/poll_ready/start_send/poll_flush/poll_close/poll_read/poll_write/poll_shutdown 這些方法了,它們是 Future/Stream/Sink/AsyncRead/AsyncWrite 的方法。 這裡需要注意的點,除了上面提到的「永遠不要自作主張去返回 Poll::Pending」外,還需要知道,一個 Future 在 poll() 返回 Poll::Pending 後,以後如果對應事件發生,會再次 wakeup 這個 Future,從而再次調用 poll() 方法。需要注意,你的 poll() 方法用到的局部變量,可能需要存下來,作為你的 struct XXX 的字段。打個比方,你的 struc XXX 需要實現的是類 proxy 的功能,它從一個 Stream 取數據,然後發到另一個 Sink。首先要調用 Stream::poll_next() 取數據,取到之後,Sink 的發送是分三步的,依次是 poll_ready/start_send/poll_flush,其中 poll_ready 和 poll_flush 返回的類型是 Poll。取完數據,然後調用 poll_ready,只有返回 Poll::Ready,才能繼續前進。如果返回 Poll::Pending,當然不能直接返回,只能先把取到的數據存下來,基本上意味着得給 struct XXX 加個 Option 字段,調用 poll() 時,需要先檢查 Option 是不是 Some(T)。反正呢,手動 impl Future,得把 poll() 方法寫成無狀態的,局部變量需要跨越多次 poll() 調用而存活的話,得存在外部(比如作為 struct 的字段)。這一點跟 web 應用挺像的,應用是無狀態的,所有信息都在外部的存儲裏面。另外,為什麼 async/await 為什麼很受期待,因為編譯器會把這些狀態自動給你放到生成的 Generator struct 裏面,寫代碼時直接用局部變量就行,不需要給 struct 定義這些額外的字段。async/await 另一好處,如果你自己實現的 poll() 比較複雜,調用多個 poll()/poll_xxx() 系函數的話,因為有多個可能返回 Poll::Pending 的地方,當等待的事件發生,那些地方從 Pending 變成 Ready 時,你的 poll() 函數會被調用多次,實際上可能會有重複的代碼執行。比如,如果你的 poll() 要調用 Stream 的 poll_ready/start_send/poll_flush 方法,實現的時候你可能直接就依次調用了,那麼第一次進入你的 poll() 時, poll_ready() 會返回 Poll::Pending,第二次進入你的 poll() 時,poll_flush() 會返回 Poll::Pending(注意 start_send() 返回的是 Result 不是 Poll)。但這樣的話,其實 poll_ready() 其實被額外調用了一次。當然你可以自己定義一些狀態來避免,但代碼就沒那麼直觀了。但是呢,如果用 await,編譯器貌似生成代碼的時候,就會根據 await 點生成多個狀態,每次檢查狀態來決定怎麼調用,不會造成額外的函數調用的,這也體現了 zero-cost abstraction 了。

上面還提到使用底層 primitive(poll) 時,避免使用高層 primitive(async/await)。但也許被 rustc 虐得不行了,難保會有些詭異的想法,又或者 struct XXX 剛好有個字段,它有個你很想用的方法,只是它是 async 的。畢竟, async func() -> T 函數實際的類型,其實是 impl Future<Output=T> 的不知名類型。你這麼來一下, let a = func(); a 就是一個 Future,你就可以調用 poll() 了。這樣又能回到手動 impl Future 這條路了。這看起來挺不錯,不過這裡的局部變量 a 作為一個臨時狀態,其實是需要作為 struct XXX 的字段存起來的,痛苦的點在於給字段聲明什麼類型。具體類型是不知道的,只能是 Box<dyn Future<Output=xxx>>,但接下來類型系統或者 borrow checker 就會發飆,也許會報錯說不滿足 'static, Send 或者 UnPin,又或生命周期有問題,或者借用有問題,或者 Pin 抱住你腿不讓走,各種亂七八糟的都出來了,直面 Rust 黑暗內幕挺痛苦的。。。

對於為 struct XXX 定義一個 async fn foo(),然後 tokio::spawn(xxx.foo()) 這種情況。如果在 async fn foo() 裏面調用其他 async 函數時都是順序的,那一切都挺簡單,反正就 await 就可以了。而如果要協調 async 函數之間的行為,可能會遇到問題。比如,使用 select! 宏的場景,你的 struct 需要處理兩個數據源,一個是 channel,一個是 TcpStream,誰收到數據就處理誰,不能一個阻塞另一個。可能一開始本能就寫了兩個 async 方法,handle_channel 和 handle_stream,然後放到 select! 處理,結果就杯具了,因為這兩個方法都需要 &mut self,rustc 編譯不過。我一開始的解法是,把這兩個方法都改成 struct 的靜態方法(不使用任何 self 系參數),以前需要用到的字段,都改為參數傳進去;需要共享的字段,就傳 Arc<Mutex>,總算編譯通過了。結果一運行就發現不對,排查下來,原因就是上面的 await + Mutex 導致的死鎖,真是欲哭無淚。折騰很久,最後的辦法是,完全不使用 struct 直接三個 async 函數,最外層的 async 充當了 struct 的作用,它調的兩個 async 函數職責變化不大。當然不這麼做,而是仍然在原來基礎上改其實也是可以的。但最關鍵的點在於,需要把 TcpStream,split 成 ReadHalf 和 WriteHalf,這樣你的借用會簡單很多。前面死鎖,就是因為沒有 split,而是直接 Arc<Mutex> 傳過去,才引發的。有個感覺,實際中可能經常需要把 TcpStream split() 成 ReadHalf/WriteHalf,這樣的話用起來其實和 channel 有點像。拆分之後能簡化借用關係,但 ReadHalf/WriteHalf 持有的是 TcpStream 的引用,也許生命周期方面會有些問題,但還沒遇到過。不過這裡提到的問題可能與 async/await 關係不大,主要還是怎樣與 borrow checker 和諧相處的問題。&self 和 &mut self 是粒度比較粗的借用。有時候方法裏面只用到了對象的幾個字段而已,但 &self 和 &mut self 卻借走了整個對象,算是「借用擴大化」,多個方法一起被調用時,就會有干擾。相反如果極端一點,完全不用 struct,struct 的字段在代碼里以單獨的變量存在,那麼就可以「用多少借多少」,最大程度上避免了「借用衝突」,我的解法算是這種極端的一種嘗試。應該說完全不用 struct 也是做不到的。但什麼時候分,什麼時候合,這個還挺值得注意的。Rust 的 struct 不像其他語言的 class,不能隨便啥都往裏面扔,否則會被 rustc 教做人。設計 struct 的最佳實踐是什麼,還沒看到好的總結,不過呢提前知道坑可能在哪裡,等你掉進去時就不會那麼震驚了。知乎有篇文章講到了這個問題,感覺講得挺好的: https://zhuanlan.zhihu.com/p/26393679