【Rust每周一知】Rust 异步入门

  • 2020 年 2 月 24 日
  • 笔记

这是一篇博文翻译,略有删减,整理代码方便统一阅读,Github链接:https://github.com/lesterli/rust-practice/tree/master/head-first/async-primer。

原文在2月11号的【Rust日报】中给大家推荐过, 原文链接: https://omarabid.com/async-rust

本文并不全面介绍Rust异步主题。如果对新的async/await关键字Futures感到疑惑,并且对Tokio的用途很感兴趣,那么到最后应该会不再毫无头绪。

Rust异步技术是Rust领域的新热点,它被誉为Rust的重要里程碑,特别适合开发高性能网络应用程序的人们。

让我们从头开始。

什么是异步?

关于Async,我给一个简短的版本:如果有一个处理器,想同时执行(类似)两项任务,将如何做?解决方案是先运行第一个任务,然后切换并运行第二个任务,然后再切换回去,依此类推,直到完成两个任务。

如果想给人以计算机同时运行两个任务的感觉(即多任务处理),则此功能很有用。另一个用例是IO操作。当程序等待网络响应时,CPU处于空闲状态。这是切换到另一个任务的理想时间。

那么我们如何编写异步代码?

首先,让我们从一些同步代码开始。

同步代码

让我们做一个简单的程序,该程序读取两个文件:file1.txtfile2.txt。我们从file1.txt开始,然后移至file2.txt

我们将程序分为两个文件:main.rsfile.rsfile.rs有一个函数:read_file,在main.rs中,用每个文件的路径为参数调用此函数。参见下面代码:

// sync-example/src/file.rs    use std::fs::File;  use std::io::{self, Read};    pub fn read_file(path: &str) -> io::Result<String> {      let mut file = File::open(path)?;      let mut buffer = String::new();      file.read_to_string(&mut buffer)?;      Ok(buffer)  }
// sync-example/src/main.rs    use std::io;    mod file;    fn main() -> io::Result<()> {      println!("program started");        let file1 = file::read_file("src/file1.txt")?;      println!("processed file 1");        let file2 = file::read_file("src/file2.txt")?;      println!("processed file 2");        dbg!(&file1);      dbg!(&file2);        Ok(())  }

使用cargo run编译并运行程序。该程序应该毫无意外地运行,但是请确保已在src文件夹中放置了两个文件(file1.txtfile2.txt)。

program started  processed file 1  processed file 2  [src/main.rs:14] &file1 = "file1"  [src/main.rs:15] &file2 = "file2"

到目前为止,一切都很好。如果需要在处理file2.txt之前先处理file1.txt,那么这是唯一的方法。但是有时不必关心每个文件的处理顺序。理想情况下,希望尽快处理文件。

在这种情况下,我们可以利用多线程。

多线程方法

为此,我们为每个函数调用运行一个单独的线程。由于我们使用的是多线程代码,并且如果要访问线程外部的文件内容,则必须使用Rust提供的同步原语之一。

这将如何影响代码:file.rs将保持不变,因此这已经是一件好事了。在main.rs中,我们需要初始化两个RwLock;这些将稍后在线程中用于存储文件内容。

然后,我们运行一个无限循环,尝试读取这两个变量的内容。如果这些变量不为空,则我们知道文件处理(或读取)已完成。 (这意味着文件不应为空;否则,我们的程序将错误地保持等待状态。另一种方法是使用Option<String>并检查Option是否为None)。

此代码需要crate lazy_static

// multi-example/src/main.rs    use std::io;  use std::sync::RwLock;  use std::thread;    use lazy_static::lazy_static;    mod file;    // A sync primitive that allows to read/write from variables between threads.  // we declare the variables here, this requires the lazy_static crate  lazy_static! {      static ref FILE1: RwLock<String> = RwLock::new(String::from(""));      static ref FILE2: RwLock<String> = RwLock::new(String::from(""));  }    fn main() -> io::Result<()> {      println!("program started");        let thread_1 = thread::spawn(|| {          let mut w1 = FILE1.write().unwrap();          *w1 = file::read_file("src/file1.txt").unwrap();          println!("read file 1");      });        println!("Launched Thread 1");        let thread_2 = thread::spawn(|| {          let mut w2 = FILE2.write().unwrap();          *w2 = file::read_file("src/file2.txt").unwrap();          println!("read file 2");      });        println!("Launched Thread 2");        let mut rf1: bool = false;      let mut rf2: bool = false;        loop {      	// read()          let r1 = FILE1.read().unwrap();          let r2 = FILE2.read().unwrap();            if *r1 != String::from("") && rf1 == false {              println!("completed file 1");              rf1 = true;          }            if *r2 != String::from("") && rf2 == false {              println!("completed file 2");              rf2 = true;          }      }        Ok(())  }

有趣的是,如果我们有一个非常大的file1.txt,我们将得到一个奇怪的输出。首先处理第二个文件(读取文件2);但在我们的循环内部,该程序似乎阻塞并等待第一个文件。

program started  Launched Thread 1  Launched Thread 2  read file 2  read file 1  completed file 1  completed file 2

多线程可能有点棘手,因为我们必须考虑可能阻塞的原子操作。我们使用read函数来解锁我们的变量,并且文档对这种行为发出警告。

使用共享的读取访问权限锁定此rwlock,阻塞当前线程,直到可以获取它为止。

幸运的是,有一个try_read函数,如果无法获取锁,则返回Err

尝试使用共享的读取访问权限获取此rwlock。 如果此时不能授予访问权限,则返回Err。 否则,将返回RAII保护,当该保护被删除时,该保护将释放共享访问。

在第二次尝试中,我们使用try_read并忽略返回的Errs,因为它们应该表示我们的锁正忙。这有助于将程序移至下一个变量,并处理先准备好的变量。

// multi-example/src/main.rs    ...      loop {      	// try_read()          let r1 = FILE1.try_read();          let r2 = FILE2.try_read();            match r1 {              Ok(v) => {                  if *v != String::from("") && rf1 == false {                      println!("completed file 1");                      rf1 = true;                  }              }              // If rwlock can't be acquired, ignore the error              Err(_) => {}          }            match r2 {              Ok(v) => {                  if *v != String::from("") && rf2 == false {                      println!("completed file 2");                      rf2 = true;                  }              }              // If rwlock can't be acquired, ignore the error              Err(_) => {}          }      }  ...

现在执行方式有所不同。如果file1.txtfile2.txt大得多,则应首先处理第二个文件。

program started  Launched Thread 1  Launched Thread 2  read file 2  completed file 2  read file 1  completed file 1

多线程的局限性

如果我们已经有多线程,为什么我们需要异步?有两个主要优点:性能和简单性。产生线程很昂贵;从以上内容可以得出结论,编写多线程代码可能会变得非常复杂。

异步,关键字

Rust的重点是使编写Async代码尽可能简单。只需要在函数声明之前添加async/await关键字即可使代码异步:函数声明前async,解析异步函数await

这听起来很不错。试一试吧。

use std::fs::File;  use std::io::{self, Read};    pub async fn read_file(path: &str) -> io::Result<String> {      let mut file = File::open(path)?;      let mut buffer = String::new();      file.read_to_string(&mut buffer)?;      Ok(buffer)  }
use std::io;    mod file;    fn main() -> io::Result<()> {      let r1 = file::read_file("src/file1.txt");      let r2 = file::read_file("src/file2.txt");        let f1 = r1.await;      let f2 = r2.await;        dbg!(f1);      dbg!(f2);        Ok(())  }

但是这不能通过编译,await仅在异步块或函数中可用。如果我们尝试运行此代码,则编译器将引发此错误。

error[E0728]: `await` is only allowed inside `async` functions and blocks   --> src/main.rs:9:14    |  5 | fn main() -> io::Result<()> {    |    ---- this is not `async`  ...  9 |     let f1 = r1.await;    |              ^^^^^^^^ only allowed inside `async` functions and blocks

我们可以使main函数异步吗?不幸的是,事情并非如此简单。我们得到另一个错误。

error[E0277]: `main` has invalid return type `impl std::future::Future`   --> src/main.rs:5:20    |  5 | async fn main() -> io::Result<()> {    |                    ^^^^^^^^^^^^^^ `main` can only return types that implement `std::process::Termination`    |    = help: consider using `()`, or a `Result`

但是,错误消息有点令人着迷。似乎async关键字使我们的函数返回Future而不是声明的类型。

异步函数的返回类型是Future(确切地说是实现Future特性的闭包)。

await呢?await在整个Future中循环直至完成。但是,还有另外一个谜团:Rust无法自解析Future。我们需要一个执行器来运行此异步代码。

什么是执行器?

如果回顾一下我们的多线程示例,会注意到我们使用循环来检测何时处理文件。这很简单:无限循环直到变量中包含某些内容,然后执行某些操作。如果读取两个文件,我们可以通过跳出循环来改善这一点。

一个异步执行器是循环。默认情况下,Rust没有任何内置的执行程序。有许多异步运行时;async-stdTokio是最受欢迎的。运行时的工作是轮询异步函数(Future),直到它们最终返回一个值。

一个简单的执行器

crate futures有一个非常基本的执行器,并且具有将两个Future连接的函数。让我们试一试。

以下代码使用crate futures版本0.3.4。

// async-example/src/main.rs    use futures::executor::block_on;  use futures::join;  use std::io;    mod file;    fn main() -> io::Result<()> {        println!("Program started");        // Block on the final future      block_on(load_files());        Ok(())  }    async fn load_files() {      // Join the two futures together      join!(load_file_1(), load_file_2());  }    async fn load_file_1() {      let r1 = file::read_file("src/file1.txt").await;      println!("file 1 size: {}", r1.unwrap().len());  }    async fn load_file_2() {      let r2 = file::read_file("src/file2.txt").await;      println!("file 2 size: {}", r2.unwrap().len());  }

为了验证异步性,将一堆数据转储到file1.txt中。

Program started  file 1 size: 5399  file 2 size: 5

不幸的是,这看起来(确实)第一个文件函数再次阻塞了。

那么异步到底是什么?

与多线程类似,异步编程中也有一些陷阱和问题。事实是,async关键字不会神奇地使代码异步;它只是使函数返回Future。仍然必须繁重地安排代码执行时间。

这意味着函数必须迅速返回尚未准备就绪的状态,而不是被困在进行计算的过程中。在我们的情况下,阻塞是特定在File::Openfile.read_to_string处发生的。这两个函数不是异步的,因此会阻止执行。

我们需要创建这两个函数的异步版本。幸运的是,一些使用async-std的人做了工作,将Rust中的std库重写为异步版本。

使用async-std的文件IO

我们唯一要做的更改是将我们的std导入替换为async_std

对于以下示例,我们使用crate async-std版本1.5.0。

// async-example/src/file.rs    // We use async_std instead of std, it's that simple.  use async_std::io;  use async_std::fs::File;  use async_std::prelude::*;    pub async fn read_file(path: &str) -> io::Result<String> {      let mut file = File::open(path).await?;      let mut buffer = String::new();      file.read_to_string(&mut buffer).await?;      Ok(buffer)  }

main.rs中的代码保持不变;该程序仍使用crate futures中的block_on执行程序。

编译并运行程序。(确保有一个大的file1.txt

Program started  file 2 size: 5  file 1 size: 5399

最后!程序首先快速处理file2.txt,然后移至file1.txt

让我们回顾一下到目前为止所学到的东西:

  • async使我们的函数返回Future
  • 运行我们的Future需要一个运行时。
  • 运行时检查Future是否准备就绪;并在就绪时返回其值。

总结

在这篇文章中,我们介绍了同步代码,多线程代码,Rust中的一些异步术语,async-std库和简单的Future实现。实际上,这是一个"轻量级"的介绍,为简洁起见,省略了许多细节。