【譯】理解Rust中的Futures(二)

  • 2020 年 12 月 23 日
  • 筆記

原文標題:Understanding Futures in Rust — Part 2

原文鏈接://www.viget.com/articles/understanding-futures-is-rust-part-2/
公眾號: Rust 碎碎念

翻譯 by: Praying

背景

如果你還沒有看前面的內容,可以在這裡[1]查看(譯註:已有譯文,可在公眾號查看)。

在第一部分,我們介紹了 Future trait,了解了 future 是如何被創建和運行的,並且開始知道它們如何能被鏈接到一起。

上次內容的程式碼可以在這個 playground 鏈接[2]查看,並且本文中所有示例程式碼將會以這段程式碼為基礎。

注意:所有的程式碼示例都有對應的 playground 鏈接,其中一些用於解釋說明但無法編譯的程式碼會有相應的標記。

目標

如果你熟悉 JavaScript 中的 promise 並且閱讀了最新的部落格,你可能會對先前文章中提到的組合子(thencatchfinally)感到困惑。

你將會在本文章找到與它們對等的東西,並且在最後,下面這段程式碼將能夠編譯。你將會理解使得 future 能夠運作的類型,trait 和底層概念。

// This does not compile, yet

fn main() {
    let my_future = future::ready(1)
        .map(|x| x + 3)
        .map(Ok)
        .map_err(|e: ()| format!("Error: {:?}", e))
        .and_then(|x| future::ready(Ok(x - 3)))
        .then(|res| {
            future::ready(match res {
                Ok(val) => Ok(val + 3),
                err => err,
            })
        });

    let val = block_on(my_future);
    assert_eq!(val, Ok(4));
}

工具函數

首先,我們需要一些工具函數,future::readyblock_on。這些函數能夠讓我們很容易地創建和運行 future 直到它們完成,這些函數雖然有用,但是在生產環境的程式碼中並不常見。

在開始之前,我們先把我們的Future trait 和Context結構體整合到模組里以免和標準庫衝突。

mod task {
    use crate::NOTIFY;

    pub struct Context<'a> {
        waker: &'a Waker,
    }

    impl<'a> Context<'a> {
        pub fn from_waker(waker: &'a Waker) -> Self {
            Context { waker }
        }

        pub fn waker(&self) -> &'a Waker {
            &self.waker
        }
    }

    pub struct Waker;

    impl Waker {
        pub fn wake(&self) {
            NOTIFY.with(|f| *f.borrow_mut() = true)
        }
    }

}
use crate::task::*;

mod future {
    use crate::task::*;

    pub enum Poll<T> {
        Ready(T),
        Pending,
    }

    pub trait Future {
        type Output;

        fn poll(&mut self, cx: &Context) -> Poll<Self::Output>;
    }
}
use crate::future::*;

Playground 鏈接[3]

這裡唯一需要注意的就是,只有將模組,類型和函數公開,才能在程式碼中使用它們。這可以通過pub關鍵字來完成。

工具函數實現

Future::Ready

future::ready創建了一個 future,該 future 帶有傳入值並且是立即就緒(ready)的。當你有一個已經不是 future 的值的時候,這個函數可以用於開啟一個 future 鏈,就像前一個示例那樣。

mod future {
    // ...

    pub struct Ready<T>(Option<T>);

    impl<T> Future for Ready<T> {
        type Output = T;

        fn poll(&mut self, _: &Context) -> Poll<Self::Output> {
            Poll::Ready(self.0.take().unwrap())
        }
    }

    pub fn ready<T>(val: T) -> Ready<T> {
        Ready(Some(val))
    }
}

fn main() {
    let my_future = future::ready(1);
    println!("Output: {}", run(my_future));
}

Playground 鏈接[4]

我們創建了一個類型為Ready<T>的泛型結構體,該結構體包裝了一個Option。這裡我們使用Option枚舉以保證 poll 函數只被調用一次。在 executor 的實現中,在返回一個Poll::Ready之後調用 poll 將會報錯。

BLOCK_ON

為了我們的目標,我們把我們的 run 函數重命名為block_on。在future-preview 這個 crate 中,該函數使用內部的LocalPool來運行一個 future 直到完成,同時會阻塞當前執行緒。我們的函數也做了相似的事情。

fn block_on<F>(mut f: F) -> F::Output
where
    F: Future,
{
    NOTIFY.with(|n| loop {
        if *n.borrow() {
            *n.borrow_mut() = false;
            let ctx = Context::from_waker(&Waker);
            if let Poll::Ready(val) = f.poll(&ctx) {
                return val;
            }
        }
    })
}

fn main() {
    let my_future = future::ready(1);
    println!("Output: {}", block_on(my_future));
}

Playground 鏈接[5]

組合子(Combinators)

首先,讓我們從一些能夠讓你直接作用於另一個 Future 的Output值的一些組合子開始。在本文中,我們使用非正式的但是比較流行的組合子定義,即能夠允許你對某種類型執行操作,並與其他類型結合起來的函數。例如,一個嵌套的 future 可以由一個組合子函數函數創建,它可以有一個複雜的類型Future< Output = Future < Output = i32>>。這可以被稱為一個 future,該 future 的輸出(Output)是另一個 future,新的 future 的輸出是 i32 類型。這樣的組合子中,最簡單的一個就是map

Map

如果你熟悉Result或者Option類型的map函數,那麼對它應該不陌生。map 組合子持有一個函數並將其應用到 future 的Output值上,返回一個新的 future,這個新 future 把函數的結果(Result)作為它的Output。Future 中的 map 組合子甚至比Result或者Option中更簡單,因為不需要考慮 failure 的情況。map 就是簡單的Future->Future

下面是函數簽名:

// does not compile
fn map<U, F>(selfSized, f: F) -> Map<Self, F>
where
    F: FnOnce(Self::Output) -> U,
    SelfSized,

map是一個泛型函數,它接收一個閉包,返回一個實現了 Future 的Map結構體。不是每當我們在值上進行鏈接都需要實現Futuretrait,正如我們在最後一部分做的那樣,我們可以使用這些函數來為我們完成這些工作。

讓我們來分析一下:

  • Map<Self, F>聲明了 map 函數的(返回)類型,包括當前的 future,以及傳入函數的 future。

  • where是一個能夠讓我們添加類型約束的關鍵字。對於F類型參數,我們可以在內部定義約束map<U, F: FnOnce(Self::Output) -> U,但是使用 where 語句可讀性會更好。

  • FnOnce(Self::Output) -> U是一個函數的類型定義,該函數接收當前類型的Output並返回任意類型UFnOnce是函數 trait 中的一個,其他還包括FnMutFnFnOnce是用起來最簡單的,因為編譯器可以保證這個函數只被調用一次。它使用環境中用到的值並獲取其所有權。FnFnMut分別以不可變和可變的方式借用環境中值的引用。所有的閉包都實現了FnOncetrait,並且其中一些沒有移動值的閉包還實現了FnMutFntrait。這是 Rust 做的最酷的事情之一,允許對閉包和第一類函數參數進行真正有表達力的使用。Rust book 中的相關內容[6]值得一讀。

  • Self: Sized是一個約束,允許map只能被Sized的 trait 實現者調用。你不必考慮這個問題,但是確實有些類型不是Sized。例如,[i32]是一個不確定大小的數組。因為我們不知道它多長。如果我們想要為它實現我們的Future trait,那麼我們就不能對它調用map

大多數組合子都遵循這個模式,因此接下來的文章我們就不需要分析的這麼仔細了。

下面是一個map的完整實現,它的Map類型以及它對Future的實現

mod future {
    trait Future {
        // ...

        fn map<U, F>(self, f: F) -> Map<Self, F>
        where
            F: FnOnce(Self::Output) -> U,
            SelfSized,
        {
            Map {
                future: self,
                f: Some(f),
            }
        }
    }

    // ...

    pub struct Map<Fut, F> {
        future: Fut,
        f: Option<F>,
    }

    impl<Fut, F, T> Future for Map<Fut, F>
    where
        Fut: Future,
        F: FnOnce(Fut::Output) -> T,
    {
        type Output = T;

        fn poll(&mut self, cx: &Context) -> Poll<T> {
            match self.future.poll(cx) {
                Poll::Ready(val) => {
                    let f = self.f.take().unwrap();
                    Poll::Ready(f(val))
                }
                Poll::Pending => Poll::Pending,
            }
        }
    }
}

fn main() {
    let my_future = future::ready(1).map(|val| val + 1);
    println!("Output: {}", block_on(my_future));
}

Playground 鏈接[7]

從高層次來講,當我們調用一個 future 上的map時,我們構造了一個Map類型,該類型持有當前 future 的引用以及我們傳入的閉包。Map對象自身也是一個 Future。當它被輪詢時,它依次輪詢底層的 future。當底層的 future 就緒後,它獲取那個 future 的Output的值並且把它傳入閉包,對Poll::Ready中的閉包返回的值進行包裝(wrapping)並且把新值向上傳遞。

如果你閱讀了最新的部落格,你對在這裡看到的東西應該感到很熟悉,但是在我們繼續之前,我會快速地講解作為一個複習。

  • pub struct Map<Fut, F>是一個關於 future——Fut和函數F的泛型。

  • f: Option<F>是一個包裝了閉包了Option類型。這裡是個小技巧,以保證閉包只被調用一次。當你獲取一個Option的值,它會用None替換內部的值並且返回裡面包含的值。如果在返回一個Poll::Ready之後被輪詢,這個函數會 panic。在實際中,future 的 executor 不會允許這種情況發生。

  • type Output = T;定義了 map future 的輸出和我們的閉包的返回值是將會是相同的。

  • Poll::Read(f(val))返回帶有閉包返回結果的就緒(ready)狀態。

  • Poll::Pending => Poll::Pending 如果底層的 future 返回 pending,繼續傳遞。

  • future::ready(1).map(|val| val + 1); 這對就緒(ready)future 的輸出進行了 map,並對其加 1。它返回了一個 map future,其中帶有對原先的 future 的一個引用。map future 在運行期間輪詢原先的 future 是否就緒(ready)。這和我們的AddOneFuture做的是相同的事情。

這真的很酷,主要有以下幾個原因。首先,你不必對每一個你想要進行的計算都實現一個新的 future,它們可以被包裝(wrap)進組合子。事實上,除非你正在實現你自己的非同步操作,否則你可能從來都不需要自己去實現Future trait。

Then

現在我們有了map,我們可以把任何我們想要的計算鏈接起來,對么?答案是對的,但是對此還有一個相當大的警告。

想像一下,當你有一些函數,這些函數返回你想要鏈接起來的 future。對於這個例子,我們可以想像,它們是下面的 api 調用,這些調用返回包裝(wrap)在 future 中的結果,get_userget_files_for_user

// does not compile
fn main() {
    let files_future = get_user(1).map(|user| get_files_for_user(user));
    println!("User Files: {}", block_on(files_future));
}

這段程式碼無法編譯,但是你可以想像你在這裡構建的類型,看起來應該像這樣:Future<Output = Future<Output= FileList>>。這在使用ResultOption類型的時候也是一個常見問題。使用map函數經常會導致嵌套的輸出和對這些嵌套的繁瑣處理。在這種情況下,你不得不去跟蹤到底嵌套了多少層並且對每一個嵌套的 future 都調用block_on

幸運地是,ResultOption有一個被稱為and_then的解決方案。Optionand_then通過對T應用一個函數來映射(map)Some(T) -> Some(U),並且返回閉包所返回的Option。對於 future,它是通過一個稱為then的函數來實現的,該函數看起來很像映射(map),但是這個閉包應該它自己的 future。在一些語言中,這被稱為flatmap。這裡值得注意的是,傳遞給then的閉包返回的值必須是實現了Future,否則你將會得到一個編譯器錯誤。

這裡是我們的對於thenThen結構體和它的對Future trait 的實現。其中的大部分內容和我們在 map 中做的很像。

mod future {
    trait Future {
        // ...
        fn then<Fut, F>(self, f: F) -> Then<Self, F>
        where
            F: FnOnce(Self::Output) -> Fut,
            Fut: Future,
            SelfSized,
        {
            Then {
                future: self,
                f: Some(f),
            }
        }
    }

    // ...

    pub struct Then<Fut, F> {
        future: Fut,
        f: Option<F>,
    }

    impl<Fut, NextFut, F> Future for Then<Fut, F>
    where
        Fut: Future,
        NextFut: Future,
        F: FnOnce(Fut::Output) -> NextFut,
    {
        type Output = NextFut::Output;

        fn poll(&mut self, cx: &Context) -> Poll<Self::Output> {
            match self.future.poll(cx) {
                Poll::Ready(val) => {
                    let f = self.f.take().unwrap();
                    f(val).poll(cx)
                }
                Poll::Pending => Poll::Pending,
            }
        }
    }
}

fn main() {
    let my_future = future::ready(1)
        .map(|val| val + 1)
        .then(|val| future::ready(val + 1));
    println!("Output: {}", block_on(my_future));
}

Playground 鏈接[8]

這裡面沒見過的程式碼可能是f(val).poll(cx)。它調用了帶有先前 future 的閉包並且直接返回給你poll的值。

聰明的你可能會意識到,我們的Then::poll函數可能會 panic。如果第一個 future 返回就緒(ready)但是第二個 future 返回Poll::Pending,接著let f = self.f.take().unwrap();這行程式碼就會在下次被輪詢(poll)的時候 panic 並退出程式。在future-preview中,這種情況會通過一個稱為Chain[9]的類型來處理。Chain 通過 unsafe 程式碼塊來實現,並且使用了新類型——Pin。這些內容超出了本文的範圍。目前來講,我們可以假定任何通過then閉包返回的 future 都絕不會返回Poll::Pending。總體來講,這不是個安全的假設。

Result 組合子

在 futures-rs 庫的 0.1 版本中,Future trait 和Result類型緊密關聯。Future trait 的定義如下:

// does not compile
trait Future {
    type Item;
    type Error;

    fn poll(self) -> Poll<Self::Item, Self::Error>;
}

Poll類型里定義了成功狀態、失敗狀態和未就緒狀態。這意味著像map這種函數只有當 Poll 是就緒並且不是錯誤的情況下才能執行。儘管這會產生一些困擾,但是它在鏈接組合子並且根據成功或失敗狀態做決定的時候,會產生一些非常好的人體工程學(ergonomics )。

這與std::future的實現方式有所不同。現在 future 要麼是就緒或者是未就緒,對於成功或失敗語義是不可知的。它們可以包含任何值,包括一個Result。為了得到便利的組合子,比如像map_err能夠讓你只改變一個嵌套的 Result 中的錯誤類型,或者想and_then這樣,允許你只改變嵌套 Result 中的值類型,我們需要實現一個新的 trait。下面是TryFuture的定義:

mod future {
    //...
    pub trait TryFuture {
        type Ok;
        type Error;

        fn try_poll(self, cx: &mut Context) -> Poll<Result<Self::Ok, Self::Error>>;
    }

    impl<F, T, E> TryFuture for F
    where
        F: Future<Output = Result<T, E>>,
    {
        type Ok = T;
        type Error = E;

        fn try_poll(&mut self, cx: &Context) -> Poll<F::Output> {
            self.poll(cx)
        }
    }
}

Playground 鏈接[10]

TryFuture是一個 trait,我們可以為任意的類型<F, T, E>實現這個 trait,其中F實現了Future trait,它的Output類型是Result<T,E>。它只有一個實現者。那個實現者定義了一個try_poll函數,該函數與Future trait 上的poll有相同的簽名,它只是簡單地調用了poll方法。

這意味著任何一個擁有 Result 的Output類型的 future 也能夠訪問它的成功/錯誤(success/error)狀態。這也使得我們能夠定義一些非常方便的組合子來處理這些內部 Result 類型,而不必在一個mapand_then組合子內顯示地匹配OkErr類型。下面是一些能夠闡述這個概念的實現。

AndThen

讓我們回顧之前想像到的 API 函數。假定它們現在處於會發生網路分區和伺服器中斷的現實世界中,不會總是能返回一個值。這些 API 方法實際上會返回一個嵌有 result 的 future 以表明它已經完成,並且是要麼是成功完成,要麼是帶有錯誤的完成。我們需要去處理這些結果,下面是我們可能是根據現有工具處理它的方式。

// does not compile
fn main() {
    let files_future = get_user(1).then(|result| {
        match result {
            Ok(user) => get_files_for_user(user),
            Err(err) => future::ready(Err(err)),
        }
    });

    match block_on(files_future) {
        Ok(files) => println!("User Files: {}", files),
        Err(err) => println!("There was an error: {}", err),:w
    };
}

情況還不算太壞,但是假定你想要鏈接更多的 future,事情很快就會變得一團糟。幸運的是,我們可以定義一個組合子——and_then,該組合子將會把類型Future<Output = Result<T, E>>映射到Future<Output = Result<U, E>>,其中我們把T變為了U

下面是我們定義它的方式:

mod future {
    pub trait TryFuture {
        // ...

        fn and_then<Fut, F>(self, f: F) -> AndThen<Self, F>
        where
            F: FnOnce(Self::Ok) -> Fut,
            Fut: Future,
            SelfSized,
        {
            AndThen {
                future: self,
                f: Some(f),
            }
        }
    }

    // ...
    pub struct AndThen<Fut, F> {
        future: Fut,
        f: Option<F>,
    }

    impl<Fut, NextFut, F> Future for AndThen<Fut, F>
    where
        Fut: TryFuture,
        NextFut: TryFuture<Error = Fut::Error>,
        F: FnOnce(Fut::Ok) -> NextFut,
    {
        type Output = Result<NextFut::Ok, Fut::Error>;

        fn poll(&mut self, cx: &Context) -> Poll<Self::Output> {
            match self.future.try_poll(cx) {
                Poll::Ready(Ok(val)) => {
                    let f = self.f.take().unwrap();
                    f(val).try_poll(cx)
                }
                Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
                Poll::Pending => Poll::Pending,
            }
        }
    }
}

fn main() {
    let my_future = future::ready(1)
        .map(|val| val + 1)
        .then(|val| future::ready(val + 1))
        .map(Ok::<i32, ()>)
        .and_then(|val| future::ready(Ok(val + 1)));

    println!("Output: {:?}", block_on(my_future));
}

Playground 鏈接[11]

你對此應該較為熟悉。事實上,這和then組合子的實現基本一致。只有一些關鍵的區別需要注意:

  • 函數定義在 TryFuture trait 中

  • type Output = Result<NextFut::Ok, Fut::Error>;表明 AndThen future 的輸出擁有新的 future 的值類型,以及在它之前的 future 的錯誤類型。換句話說,如果先前的 future 的輸出包含一個錯誤類型,那麼這個閉包將不會被執行。

  • 我們調用的是try_poll而不是poll

值得注意的是,當你像這樣來鏈接組合子的時候,它們的類型前面可能會變得很長且在編譯錯誤資訊中難以閱讀。and_then函數要求 future 調用時的錯誤類型和由閉包返回的類型必須是相同的。

MapErr

回到我們的想像的 api 調用。假定調用的 api 都返回帶有同一類錯誤的 future,但是你需要在調用之間進行額外的步驟。假定你必須解析第一個 api 結果然後把它傳遞給第二個。

// 無法編譯
fn main() {
    let files_future = get_user(1)
        .and_then(|user_string| parse::<User>())
        .and_then(|user| get_files_for_user(user));

    match block_on(files_future) {
        Ok(files) => println!("User Files: {}", files),
        Err(err) => println!("There was an error: {}", err),:w
    };
}

這看起來很好,但是無法編譯,並且會有個晦澀的錯誤資訊說它期望得到像ApiError的東西但是卻找到了一個ParseError。你可以在解析返回的Result上使用過map_err組合子,但是對於 future 應該如何處理呢?如果我們為 TryFuture 實現一個map_err,那麼我們可以重寫成下面這樣:

// 無法編譯
fn main() {
    let files_future = get_user(1)
        .map_err(|e| format!("Api Error: {}", e))
        .and_then(|user_string| parse::<User>())
        .map_err(|e| format!("Parse Error: {}", e))
        .and_then(|user| get_files_for_user(user))
        .map_err(|e| format!("Api Error: {}", e));

    match block_on(files_future) {
        Ok(files) => println!("User Files: {}", files),
        Err(err) => println!("There was an error: {}", err),:w
    };
}

如果這讓你看著比較混亂,請繼續關注本系列的第三部分,我將談談如何處理這個問題和你可能會在使用 future 時遇到的其他問題。

下面是我們實現map_err的方式

mod future {
    pub trait TryFuture {
        // ...

        fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
        where
            F: FnOnce(Self::Error) -> E,
            SelfSized,
        {
            MapErr {
                future: self,
                f: Some(f),
            }
        }
    }

    // ...
    pub struct MapErr<Fut, F> {
        future: Fut,
        f: Option<F>,
    }

    impl<Fut, F, E> Future for MapErr<Fut, F>
    where
        Fut: TryFuture,
        F: FnOnce(Fut::Error) -> E,
    {
        type Output = Result<Fut::Ok, E>;

        fn poll(&mut self, cx: &Context) -> Poll<Self::Output> {
            match self.future.try_poll(cx) {
                Poll::Ready(result) => {
                    let f = self.f.take().unwrap();
                    Poll::Ready(result.map_err(f))
                }
                Poll::Pending => Poll::Pending,
            }
        }
    }
}

fn main() {
    let my_future = future::ready(1)
        .map(|val| val + 1)
        .then(|val| future::ready(val + 1))
        .map(Ok)
        .and_then(|val| future::ready(Ok(val + 1)))
        .map_err(|_: ()| 5);

    println!("Output: {:?}", block_on(my_future));
}

Playground 鏈接[12]

唯一比較陌生的地方是Poll::Ready(result.map_err(f))。在這段程式碼里,我們傳遞我們的閉包到Result類型的map_err函數里。

包裝 (Wrap Up)

現在,文章開頭的程式碼可以運行了!比較酷的是這些全都是我們自己實現的。還有很多其他用途的組合子,但是它們幾乎都是相同的方式構建的。讀者可以自己練習一下,試試實現一個map_ok組合子,行為類似於TryFuture上的map_err但是適用於成功的結果。

Playground 鏈接[13]

概要重述(Recap)

  • Rust 中的 Future 之所以如此強大,是因為有一套可以用於鏈接計算和非同步調用的組合子。

  • 我們也學習了 Rust 強大的函數指針 trait,FnOnceFnMutFn

  • 我們已經了解了如何使用嵌入在 future 中的 Result 類型。

接下來

在第三部分中,我們將會介紹使錯誤處理沒有那麼痛苦的方式,當你有很多分支時,如何處理返回的 future,以及我們將深入到 async/await 這個令人激動的世界。

參考資料

[1]

這裡: //www.viget.com/articles/understanding-futures-in-rust-part-1/


[2]

這個playground鏈接: //play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=c354bc3ffaf4cbb5502e839f96459023


[3]

Playground 鏈接: //play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=6cebb88919bd65411178ce8019a3aa06


[4]

Playground 鏈接: //www.viget.com/articles/understanding-futures-is-rust-part-2/


[5]

Playground 鏈接: //play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=9e7fca1f3c6f2f5f91b25622db71635f


[6]

Rust book中的相關內容: //doc.rust-lang.org/book/ch13-01-closures.html


[7]

Playground 鏈接: //play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=9c427527c64b4dd5238c508de1d4151a


[8]

Playground 鏈接: //play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=d86c1223ed4318dcbfa3539ca9a021f2


[9]

Chain: //docs.rs/futures-preview/0.3.0-alpha.17/futures/stream/trait.StreamExt.html#method.chain


[10]

Playground 鏈接: //play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=78daa6a5e60df17d8334199c43fe1e36


[11]

Playground 鏈接: //play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=71fe0962974657f6b9be25510a652b3d


[12]

Playground 鏈接: //play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=f9a6cc9cddaac1a43a85bc24db436964


[13]

Playground 鏈接: //play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=92a88fffb74ad350a4db1970b646c41f