Rust非同步淺談

  • 2020 年 3 月 12 日
  • 筆記

前提

  這篇文章主要描述了Rust中非同步的原理,Rust非同步也是在最近的版本中(1.39)中才穩定下來。希望可以通過這邊文章在提高自己認知的情況下,也可以給讀者帶來一些解惑。(來自於本人被Rust非同步毒打的一些經驗之談).

  閱讀這篇文章需要對作業系統,IO多路復用,以及一些數據結構有一定的概念。

Future

Future 字面的意思就是未來發生的事情,在程式中則代表了一系列暫時沒有結果的運運算元,Future需要程式主動去poll(輪詢)才能獲取到最終的結果,每一次輪詢的結果可能是Ready或者Pending

  當Ready的時候,證明當前Future已完成,程式碼邏輯可以向下執行;當Pending的時候,代表當前Future並未執行完成,程式碼不能向下執行,看到這裡就要問了,那什麼時候才能向下執行呢,這裡的關鍵在於Runtime中的Executor需要不停的去執行Futurepoll操作,直至Future返回Ready可以向下執行為止。等等,熟悉Linux的同學可能要說了,怎麼感覺和Epoll模型是非常的相似呢,沒錯,這確實非常相像(但是依然有些許不同,Future可以避免空的輪詢),看樣子優秀的設計在哪裡都可以看到類似的身影。為了實現Rust聲稱的高性能與零開銷抽象,這裡做了一些優化,下面一一講述。

Future結構

pub enum Poll<T> {      Ready(T),      Pending,  }    pub trait Future {      type Output;        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;  }

Future的定義非常簡單,Output代表了Future返回的值的類型,而poll方法是執行Future的關鍵,poll方法可以返回一個Poll類型,Poll類型是一個Enum,包裝了ReadyPending兩種狀態。

Runtime

Runtime 由兩部分組成,ExecutorReactor

Executor為執行器,沒有任何阻塞的等待,循環執行一系列就緒的Future,當Future返回pending的時候,會將Future轉移到Reactor上等待進一步的喚醒。

Reactor為反應器(喚醒器),輪詢並喚醒掛在的事件,並執行對應的wake方法,通常來說,wake會將Future的狀態變更為就緒,同時將Future放到Executor的隊列中等待執行。

執行流程

下面的序列圖大概簡單的描繪了FutureExecutorReactor之間來迴轉移的流程與狀態變化。

sequenceDiagram      participant Executor      participant Reactor      activate Executor      Executor->>Reactor: Pending Future      deactivate Executor      Note left of Executor: Execute other Future      activate Reactor      Reactor->>Executor: Ready Future      deactivate Reactor      activate Executor      deactivate Executor

上面說明了一個簡單的Future的執行,如果是一個比較複雜的Future的話,比如中間會有多次IO操作的話,那麼流程時怎麼樣的呢?看下面一段程式碼:(僅僅作為demo,不代表可以直接使用)

async fn read_and_write(s: TcpStream) {    let (mut r, mut w) = s.split();    let mut buffer = r.read().await.unwrap();    buffer.append("Hello,world");    w.write_all(buffer.as_bytes()).await.unwrap();  }

對應的執行流程為:

sequenceDiagram      participant Executor      participant Reactor      activate Executor      deactivate Executor      Executor->>Reactor: Pending on r.read()      Note left of Executor: Execute other Future      activate Reactor      Reactor->>Executor: r.read() is ready      Note left of Executor: Execute current Future      deactivate Reactor      Executor->>Reactor: Pending on w.write_all()      Note left of Executor: Execute other Future      activate Reactor      deactivate Reactor      Reactor->>Executor: w.write_all() is ready

  上面的這些例子系統中只展示了一個Future的執行情況,真實的生產環境中,可能有數十萬的Future同時在執行,ExecutorReactor的調度模型要更複雜一些。

總結

一句話概括RuntimeFuture不能馬上返回值的時候,會被交給ReactorFuture的值準備就緒後,調用wake傳遞給Executor執行,反覆執行,直至整個Future返回Ready

Executor

  通常來說,Executor的實現可以是單執行緒與執行緒池兩個版本,兩種實現間各有優劣,單執行緒少了數據的競爭,但是吞吐量卻容易達到瓶頸,執行緒池的實現可以提高吞吐量,但是卻要處理數據的競爭衝突。下面我們以async-std來分析基於執行緒池的實現:

fn main_loop() {      loop {          match find_runnable() {              Some(task) => task.run();              None => {                  // 實際上,這裡根據空循環的次數,會陷入睡眠狀態或出讓CPU資源,直到新的task來喚醒。              }          }      }  }    fn find_runnable() -> Option<Task> {      let task = get_local();      if task.is_some() {          return task;      }      let task = get_local();      if task.is_some() {          return task;      }      steal_other()  }

這裡做了大量的簡化,整個Executor是一個執行緒池,每個執行緒都在不斷的尋找可執行的task,然後執行,然後再找下一個task,再執行,永遠重複。

從上面的main_loop中可以看到,cpu並不是一直毫無意義的空轉,中間會有一些策略來優化cpu的使用。

Reactor

Reactor作為反應器,上面同時掛在了成千上萬個待喚醒的事件, 這裡使用了mio統一封裝了作業系統的多路復用API。在Linux中使用的是Epoll,在Mac中使用的則是Kqueue,具體的實現在此不多說。

  在Future的基礎上,出現了AsyncRead/AsyncWrite/AsyncSeek等抽象來描述IO操作,在執行對應的Read/Write/Seek操作時,如果底層的數據尚未準備好,會把所在的Future註冊至Reactor。Reactor的流程如下:

loop {      poll.poll(&events, timeout);      for event in events.iter() {          if (event.is_readable()) {              for waker in event.readers.wakers {                  waker.wake();              }          }          if (event.is_writeable()) {              for waker in event.writers.wakers {                  waker.wake();              }          }      }  }

Reactor會不斷的poll就緒的事件,然後依次喚醒綁定在事件上的wakerwaker喚醒的時候會把對應的task移動到Executor的就緒隊列上安排執行。

結合Executor的運作原理不難發現,Executor肯定不會poll到未就緒的task,因為只有就緒的任務才會被放到Executor的執行隊列中,Executor的資源利用率再一次被提高,這就是整個非同步體系的高明之處。

Stream

Future是非同步開發中最基礎的概念了,如果說Future代表了一次性的非同步的值,那麼Stream則代表了一系列的非同步的值。Future是1,Stream是0,1或者N。簽名如下:

pub trait Stream {      type Item;        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;  }

Stream對應了同步原語中的Iterator的概念,想一下,是不是連簽名都是如此的相像呢。

pub trait Iterator {      type Item;        fn next(&mut self) -> Option<Self::Item>;  }

Stream用來抽象源源不斷的數據源,當然也可以斷。可以用來抽象Websocket Connection,在Websokcet中,服務源源不斷的接受客戶端的值並處理,直至客戶端斷開連接。更進一步的抽象,MQ中的Consumer, Tcp中的業務數據包,都可以看作是一個Stream, 因此Stream的抽象對非同步編程意義非凡。

Sink

有了代表一次性的非同步值Future, 也有了代表可重複的非同步值的Stream, 因此,需要有一個代表一次或多次的非同步值,也就是接下來的Sink

pub trait Sink<Item> {      type Error;        fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;      fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;      fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;      fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;  }

Timer

  不同於Tcp/Udp/Uds,mio沒有提供對Timer的封裝。

  通常來說,對定時器的處理要麼是時間輪,要麼堆,要麼紅黑樹(時間複雜度更為平均O(logN))。時間輪比較典型的案例就是在Kafka中的使用了,Go runtime用的則是堆,紅黑樹和堆的實現大致相同。

  1. 時間輪演算法可以想像做鐘錶,每一格存儲了到期的定時器,因此時間輪的最小精度為每一格所代表的時間。如果定時器的時間超過時間輪所能表示的時間怎麼辦呢,也簡單,可以通過兩種方式來優化。
  • 多級時間輪來優化,可以想像,在鐘錶上,秒針每走一圈,分針走一格,同理分針走一圈,時針走一格,因此多級時間輪中,第一級的時間最為精確,第二級次之,第三級再次之…, 超過某一級時間輪所能表示的事件後,將定時器放到下一級時間輪中。
  • 超過時間輪所能表示的時間範圍後,對時間取余,插入到餘數所在的格子中,這樣一來,每個格子中存放的定時器需要加入輪數的記錄,用來表明還差多少輪才能執行。每個格子中在插入新的定時器時,可以使用堆來堆定時器進行排序。
  1. 堆定時器(紅黑樹定時器)

  使用最小堆來維護所有的定時器。一個工作執行緒不斷的從堆裡面尋找最近的定時器,如果定時器的時間比當前時間小,則喚醒該定時器對應的task,如果未達到設定的時間,則進行Thread::park(deadline-now)操作,讓出當前cpu一段時間。

目前futures-timer的實現為全劇唯一的一個堆。存在可優化空間…

組合子

上面定義了實現非同步的最基本概念,Future, Stream以及Sink

但是很多情況下,我們直接使用它們來構建我們的應用是非常困難的,例如:多個互為競爭關係的Future,我們只需其中任意一個Future返回即可,能想到的做法是,我們不斷的遍歷所有的Future,直到某一個返回Ready

loop {      for f in futures {          if f.is_ready() {              return f.output();          }      }  }

我們可以把上面的邏輯給包裝一下,提供一個名為select!(futures...)的宏,select便可作為一個組合子而存在。類似的組合子還有很多,比如join(futures...),等待所有Future完成。

更多的可以參考futures-util.

Async/Await

上面所有的概念共同組成了Rust的非同步生態,那麼現在想像一下,如何獲取一個Future運行的結果呢。一個可能的做法如下:

loop {      match f::poll(cx) {          Poll::Ready(x) => return x;          Poll::Pending => {}      }  }

如果每次都要用戶這麼做的話,將會是多麼痛苦的一件事兒呀,還不如用註冊回調函數來實現非同步呢!

有沒有更精鍊的方式來獲取Future的值呢,這就是async/await出現的原因了。本質上來說,async/await就是上面程式碼段的一個語法糖,是用戶使用起來更加的自然。上面的程式碼可以替換成:

let x = f.await;

是不是有非常大的簡化呢!

總結

雖然上面提到了各種各樣的概念,但是仔細捋一下,便會發現整個非同步可以分為三層:

  1. Future/Stream/Sink,Reactor/Executor直接作用於前面的三種類型。此層是為底層,一般用戶很少接觸,庫的開發者接觸較多。
  2. 組合子層,為了提供更為複雜的操作,誕生了一系列的非同步組合子,使得非同步變得更利於使用,用戶會使用這些組合子來完成各種各樣的邏輯。
  3. async/await,準確的說,這層遠沒有上面兩層來的重要,但是依然不可或缺,這層使得非同步的開發變得輕而易舉。