Rust非同步淺談
- 2020 年 3 月 12 日
- 筆記
前提
這篇文章主要描述了Rust中非同步的原理,Rust非同步也是在最近的版本中(1.39)中才穩定下來。希望可以通過這邊文章在提高自己認知的情況下,也可以給讀者帶來一些解惑。(來自於本人被Rust非同步毒打的一些經驗之談).
閱讀這篇文章需要對作業系統,IO多路復用,以及一些數據結構有一定的概念。
Future
Future
字面的意思就是未來發生的事情,在程式中則代表了一系列暫時沒有結果的運運算元,Future
需要程式主動去poll
(輪詢)才能獲取到最終的結果,每一次輪詢的結果可能是Ready
或者Pending
。
當Ready
的時候,證明當前Future
已完成,程式碼邏輯可以向下執行;當Pending
的時候,代表當前Future
並未執行完成,程式碼不能向下執行,看到這裡就要問了,那什麼時候才能向下執行呢,這裡的關鍵在於Runtime
中的Executor
需要不停的去執行Future
的poll
操作,直至Future
返回Ready
可以向下執行為止。等等,熟悉Linux
的同學可能要說了,怎麼感覺和Epoll
模型是非常的相似呢,沒錯,這確實非常相像(但是依然有些許不同,Future
可以避免空的輪詢),看樣子優秀的設計在哪裡都可以看到類似的身影。為了實現Rust聲稱的高性能與零開銷抽象,這裡做了一些優化,下面一一講述。
Future結構
pub enum Poll<T> { Ready(T), Pending, } pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; }
Future
的定義非常簡單,Output
代表了Future
返回的值的類型,而poll
方法是執行Future
的關鍵,poll
方法可以返回一個Poll
類型,Poll
類型是一個Enum
,包裝了Ready
和Pending
兩種狀態。
Runtime
Runtime
由兩部分組成,Executor
和Reactor
。
Executor
為執行器,沒有任何阻塞的等待,循環執行一系列就緒的Future
,當Future
返回pending
的時候,會將Future
轉移到Reactor
上等待進一步的喚醒。
Reactor
為反應器(喚醒器),輪詢並喚醒掛在的事件,並執行對應的wake
方法,通常來說,wake
會將Future
的狀態變更為就緒,同時將Future
放到Executor
的隊列中等待執行。
執行流程
下面的序列圖大概簡單的描繪了Future
在Executor
和Reactor
之間來迴轉移的流程與狀態變化。
sequenceDiagram participant Executor participant Reactor activate Executor Executor->>Reactor: Pending Future deactivate Executor Note left of Executor: Execute other Future activate Reactor Reactor->>Executor: Ready Future deactivate Reactor activate Executor deactivate Executor

上面說明了一個簡單的Future
的執行,如果是一個比較複雜的Future
的話,比如中間會有多次IO
操作的話,那麼流程時怎麼樣的呢?看下面一段程式碼:(僅僅作為demo,不代表可以直接使用)
async fn read_and_write(s: TcpStream) { let (mut r, mut w) = s.split(); let mut buffer = r.read().await.unwrap(); buffer.append("Hello,world"); w.write_all(buffer.as_bytes()).await.unwrap(); }
對應的執行流程為:
sequenceDiagram participant Executor participant Reactor activate Executor deactivate Executor Executor->>Reactor: Pending on r.read() Note left of Executor: Execute other Future activate Reactor Reactor->>Executor: r.read() is ready Note left of Executor: Execute current Future deactivate Reactor Executor->>Reactor: Pending on w.write_all() Note left of Executor: Execute other Future activate Reactor deactivate Reactor Reactor->>Executor: w.write_all() is ready

上面的這些例子系統中只展示了一個
Future
的執行情況,真實的生產環境中,可能有數十萬的Future
同時在執行,Executor
和Reactor
的調度模型要更複雜一些。
總結
一句話概括Runtime
,Future
不能馬上返回值的時候,會被交給Reactor
,Future
的值準備就緒後,調用wake
傳遞給Executor
執行,反覆執行,直至整個Future
返回Ready
。
Executor
通常來說,Executor
的實現可以是單執行緒與執行緒池兩個版本,兩種實現間各有優劣,單執行緒少了數據的競爭,但是吞吐量卻容易達到瓶頸,執行緒池的實現可以提高吞吐量,但是卻要處理數據的競爭衝突。下面我們以async-std
來分析基於執行緒池的實現:
fn main_loop() { loop { match find_runnable() { Some(task) => task.run(); None => { // 實際上,這裡根據空循環的次數,會陷入睡眠狀態或出讓CPU資源,直到新的task來喚醒。 } } } } fn find_runnable() -> Option<Task> { let task = get_local(); if task.is_some() { return task; } let task = get_local(); if task.is_some() { return task; } steal_other() }
這裡做了大量的簡化,整個Executor是一個執行緒池,每個執行緒都在不斷的尋找可執行的task,然後執行,然後再找下一個task,再執行,永遠重複。
從上面的main_loop中可以看到,cpu並不是一直毫無意義的空轉,中間會有一些策略來優化cpu的使用。
Reactor
Reactor
作為反應器,上面同時掛在了成千上萬個待喚醒的事件, 這裡使用了mio
統一封裝了作業系統的多路復用API
。在Linux
中使用的是Epoll
,在Mac
中使用的則是Kqueue
,具體的實現在此不多說。
在Future的基礎上,出現了AsyncRead/AsyncWrite/AsyncSeek
等抽象來描述IO操作,在執行對應的Read/Write/Seek
操作時,如果底層的數據尚未準備好,會把所在的Future註冊至Reactor。Reactor的流程如下:
loop { poll.poll(&events, timeout); for event in events.iter() { if (event.is_readable()) { for waker in event.readers.wakers { waker.wake(); } } if (event.is_writeable()) { for waker in event.writers.wakers { waker.wake(); } } } }
Reactor
會不斷的poll
就緒的事件,然後依次喚醒綁定在事件上的waker
,waker
喚醒的時候會把對應的task
移動到Executor
的就緒隊列上安排執行。
結合
Executor
的運作原理不難發現,Executor
肯定不會poll
到未就緒的task
,因為只有就緒的任務才會被放到Executor
的執行隊列中,Executor
的資源利用率再一次被提高,這就是整個非同步體系的高明之處。
Stream
Future
是非同步開發中最基礎的概念了,如果說Future
代表了一次性的非同步的值,那麼Stream
則代表了一系列的非同步的值。Future
是1,Stream
是0,1或者N。簽名如下:
pub trait Stream { type Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; }
Stream
對應了同步原語中的Iterator
的概念,想一下,是不是連簽名都是如此的相像呢。
pub trait Iterator { type Item; fn next(&mut self) -> Option<Self::Item>; }
Stream
用來抽象源源不斷的數據源,當然也可以斷。可以用來抽象Websocket Connection
,在Websokcet
中,服務源源不斷的接受客戶端的值並處理,直至客戶端斷開連接。更進一步的抽象,MQ
中的Consumer
, Tcp
中的業務數據包,都可以看作是一個Stream
, 因此Stream
的抽象對非同步編程意義非凡。
Sink
有了代表一次性的非同步值Future
, 也有了代表可重複的非同步值的Stream
, 因此,需要有一個代表一次或多次的非同步值,也就是接下來的Sink
。
pub trait Sink<Item> { type Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>; fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; }
Timer
不同於Tcp/Udp/Uds,mio沒有提供對Timer的封裝。
通常來說,對定時器的處理要麼是時間輪,要麼堆,要麼紅黑樹(時間複雜度更為平均O(logN))。時間輪比較典型的案例就是在Kafka
中的使用了,Go runtime
用的則是堆,紅黑樹和堆的實現大致相同。
- 時間輪演算法可以想像做鐘錶,每一格存儲了到期的定時器,因此時間輪的最小精度為每一格所代表的時間。如果定時器的時間超過時間輪所能表示的時間怎麼辦呢,也簡單,可以通過兩種方式來優化。
- 多級時間輪來優化,可以想像,在鐘錶上,秒針每走一圈,分針走一格,同理分針走一圈,時針走一格,因此多級時間輪中,第一級的時間最為精確,第二級次之,第三級再次之…, 超過某一級時間輪所能表示的事件後,將定時器放到下一級時間輪中。
- 超過時間輪所能表示的時間範圍後,對時間取余,插入到餘數所在的格子中,這樣一來,每個格子中存放的定時器需要加入輪數的記錄,用來表明還差多少輪才能執行。每個格子中在插入新的定時器時,可以使用堆來堆定時器進行排序。
- 堆定時器(紅黑樹定時器)
使用最小堆來維護所有的定時器。一個工作執行緒不斷的從堆裡面尋找最近的定時器,如果定時器的時間比當前時間小,則喚醒該定時器對應的task,如果未達到設定的時間,則進行Thread::park(deadline-now)
操作,讓出當前cpu一段時間。
目前futures-timer的實現為全劇唯一的一個堆。存在可優化空間…
組合子
上面定義了實現非同步的最基本概念,Future
, Stream
以及Sink
。
但是很多情況下,我們直接使用它們來構建我們的應用是非常困難的,例如:多個互為競爭關係的Future
,我們只需其中任意一個Future
返回即可,能想到的做法是,我們不斷的遍歷所有的Future
,直到某一個返回Ready
:
loop { for f in futures { if f.is_ready() { return f.output(); } } }
我們可以把上面的邏輯給包裝一下,提供一個名為select!(futures...)
的宏,select
便可作為一個組合子而存在。類似的組合子還有很多,比如join(futures...)
,等待所有Future
完成。
更多的可以參考futures-util
.
Async/Await
上面所有的概念共同組成了Rust
的非同步生態,那麼現在想像一下,如何獲取一個Future
運行的結果呢。一個可能的做法如下:
loop { match f::poll(cx) { Poll::Ready(x) => return x; Poll::Pending => {} } }
如果每次都要用戶這麼做的話,將會是多麼痛苦的一件事兒呀,還不如用註冊回調函數來實現非同步呢!
有沒有更精鍊的方式來獲取Future
的值呢,這就是async/await
出現的原因了。本質上來說,async/await
就是上面程式碼段的一個語法糖,是用戶使用起來更加的自然。上面的程式碼可以替換成:
let x = f.await;
是不是有非常大的簡化呢!
總結
雖然上面提到了各種各樣的概念,但是仔細捋一下,便會發現整個非同步可以分為三層:
- Future/Stream/Sink,Reactor/Executor直接作用於前面的三種類型。此層是為底層,一般用戶很少接觸,庫的開發者接觸較多。
- 組合子層,為了提供更為複雜的操作,誕生了一系列的非同步組合子,使得非同步變得更利於使用,用戶會使用這些組合子來完成各種各樣的邏輯。
- async/await,準確的說,這層遠沒有上面兩層來的重要,但是依然不可或缺,這層使得非同步的開發變得輕而易舉。