RUA!

异步 Rust

TABLE OF CONTENTS

async 是 Rust 选择的异步模型。

并发在现代社会非常重要,多数主流语言都有对自己并发模型的设计以及取舍。

  • OS 线程, 它最简单,也无需改变任何编程模型(业务/代码逻辑),因此非常适合作为语言的原生并发模型,在 多线程章节也提到过,Rust 就选择了原生支持线程级的并发编程。但是,这种模型也有缺点,例如线程间的同步将变得更加困难,线程间的上下文切换损耗较大。使用线程池在一定程度上可以提升性能,但是对于 IO 密集的场景来说,线程池还是不够。
  • 事件驱动(Event driven), 这个名词你可能比较陌生,如果说事件驱动常常跟回调(Callback)一起使用,相信大家就恍然大悟了。这种模型性能相当的好,但最大的问题就是存在回调地狱的风险:非线性的控制流和结果处理导致了数据流向和错误传播变得难以掌控,还会导致代码可维护性和可读性的大幅降低,大名鼎鼎的 JavaScript 曾经就存在回调地狱。
  • 协程(Coroutines) 可能是目前最火的并发模型,Go 语言的协程设计就非常优秀,这也是 Go 语言能够迅速火遍全球的杀手锏之一。协程跟线程类似,无需改变编程模型,同时,它也跟 async 类似,可以支持大量的任务并发运行。但协程抽象层次过高,导致用户无法接触到底层的细节,这对于系统编程语言和自定义异步运行时是难以接受的
  • actor 模型是 erlang 的杀手锏之一,它将所有并发计算分割成一个一个单元,这些单元被称为 actor , 单元之间通过消息传递的方式进行通信和数据传递,跟分布式系统的设计理念非常相像。由于 actor 模型跟现实很贴近,因此它相对来说更容易实现,但是一旦遇到流控制、失败重试等场景时,就会变得不太好用
  • async/await, 该模型性能高,还能支持底层编程,同时又像线程和协程那样无需过多的改变编程模型,但有得必有失,async 模型的问题就是内部实现机制过于复杂,对于用户来说,理解和使用起来也没有线程和协程简单,好在前者的复杂性开发者们已经帮我们封装好,而理解和使用起来不够简单,正是本章试图解决的问题。

Rust 选择的取舍是,同时支持系统 OS 线程和 async 异步:

  • 系统线程通过标准库实现,应对不是非常的并发,而需要并行计算时。优点是线程内的代码执行效率更高、实现更直观更简单。
  • 异步通过语言特性 + 标准库 + 三方库的方式实现。并发性能会更好,但也需要额外的运行时支持。

Rust 中异步的特点:

  • Future 在 Rust 中是惰性的,只有在被轮询(poll)时才会运行, 因此丢弃一个 future 会阻止它未来再被运行, 你可以将 Future 理解为一个在未来某个时间点被调度执行的任务。
  • Async 在 Rust 中使用开销是零, 意味着只有你能看到的代码(自己的代码)才有性能损耗,你看不到的(async 内部实现)都没有性能损耗,例如,你可以无需分配任何堆内存、也无需任何动态分发来使用 async ,这对于热点路径的性能有非常大的好处,正是得益于此,Rust 的异步编程性能才会这么高。
  • Rust 没有内置异步调用所必需的运行时,但是无需担心,Rust 社区生态中已经提供了非常优异的运行时实现,例如大明星 tokio
  • 运行时同时支持单线程和多线程

异步函数

一个普通函数会在调用时会立即执行,而一个异步函数的行为并非如此。

fn main() {
    normal_fool();  // print right now.
}

fn normal_fool() {
    println!("normal_fool");
}

异步函数的情况:

#![allow(dead_code, unused_variables)]

fn main() {
    normal_fool();

    let foo1 = foo1(); // -> impl Future<Output = ()>
}

fn normal_fool() {
    println!("normal_fool");
}

async fn foo1() {
    println!("foo1");
}

异步函数 foo1() 并不会理解执行,而是返回一个 Future。将一个函数标记为 async fn 时也表明改函数会被转换成一个返回一个实现了 Future 的值。

也就是说 foo1()foo2() 是相同的:

async fn foo1() {
    println!("foo1");
}

fn foo2() -> impl Future<Output = ()> {
    async {
        println!("foo1");
    }
}

与 JavaScript 相比

与老朋友 JavaScript 不同的时,Future 在 Rust 中是惰性的。在 JavaScript 中,创建一个异步的任务,无论是任务队列,还是 Promise,它都会立即执行。而且 JavaScript 自带异步运行时。

  • JavaScript 中的异步任务会立即被添加到任务队列中;
  • JavaScript 自带异步运行时;
  • JavaScript 中 async/await 语法只是 Promise 到语法糖;

在 JavaScript 中,一个异步任务会被立即添加到任务队列中的末尾。如果不做任何处理,则该异步任务会立即脱离当前环境。

const sleep = async (duration) => {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve();
    }, duration);
  });
};

const main = async () => {
  console.log('hello');
  sleep(1000);
  console.log('world');
};

main();

此处的 sleep 函数则会在之前到当前行时立即被添加到任务队列的末尾。等到 main 函数其他同步代码都执行完成后,才会在等待 1000ms 并结束 main 函数。这是 JavaScript 最基本的异步运行方式。

Future

但 Rust 首当其冲不同的就是,Future 是惰性的。一个 async 函数会返回一个 Future,若直接调用该函数,不会发生任何事,因为 Future 还没有被执行。

Future 人如其名,可以理解为它是一个定义在未来被执行的任务,若不触发它则不会执行。使用一个 Future 需要使用一个执行器。

futures 包为我们提供了一个最简单粗暴的执行器:block_on。它会阻塞当前线程以等待任务完成。

use futures::executor::block_on;

fn main() {
    let future = sleep();

    block_on(future);
}

async fn sleep() {
    println!("Try to sleep.")
}

await

既然一个函数被标记为了 async 函数,那肯定少不了它的搭档 await。在一个 async 函数执行另一个 async 函数的方法就是使用 await。不过与 JavaScript 或 C# 等语言不同的是,在 Rust 中使用 await 看上去更像是一个方法的调用:

use futures::executor::block_on;

fn main() {
    let future = sleep();

    block_on(future);
}

async fn sleep() {
    hello_cat().await;
    println!("Try to sleep.")
}

async fn hello_cat() {
    println!("Hello kitty.")
}

如果移除 await 字段,则编译器会给我们 futures do nothing unless you .await or poll them 的提示。出了 await 的方式运行 async 函数,还有个就是使用轮询。

block_on 不同的时,await 不会阻塞当前的线程。而是异步的等待 Future 的完成,在等待的过程中,该线程还可以继续执行其他的任务,从而实现了并发处理的效果。

#![feature(future_join)]
use async_std::task::sleep;
use futures::executor::block_on;
use std::future::join;
use std::time::Duration;

fn main() {
    let f1 = try_sleep();
    let f2 = hello_dog();

    block_on(join!(f1, f2));
}

async fn try_sleep() {
    hello_cat().await;
    sleep(Duration::from_secs(3)).await;
    println!("Try to sleep.")
}

async fn hello_cat() {
    println!("Hello kitty.")
}

async fn hello_dog() {
    println!("Hello puppy");
}

join! 宏可以并发的处理和等待多个 Future,当 try_sleep 被阻塞时,则 hello_dog 可以拿过线程的所有权继续执行。通过这种方式实现了使用同步的代码顺序实现了异步的执行效果

Future 特征

与 JavaScript 将异步任务放到任务队列尾端不同的时,Rust 的 Future 特征恰恰是异步函数的返回值和被执行的关键。

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

#[derive(Debug)]
enum Poll<T> {
    Ready(T),
    Pending,
}

这里模拟真正的 Future 实现一个简易版的 SimpleFuture。一个 Future 需要被执行器 poll 轮询后才能执行,通过调用该方法,可以推进 Future 的进一步执行,直到被阻塞将运行权转移给其他 Future。

若在当前 poll 中,Future 可以被完成,则会返回 Poll::Ready(result),反之则返回 Poll::Pending。并安排一个 wake 函数,当未来 Future 准备好被切回来继续执行时,该函数就会被调用。然后继续刚开始的步骤,该 Future 的执行器会再次调用 poll 方法,此时 Future 就可以继续执行了。

再来模拟一个简单的 Socket,并从一个 socket 中异步读取数据。当该 socket 中有数据时,就直接读取数据并返回 Poll::Ready(data);如果数据还没准备好,Future 会被阻塞且不再继续执行,并注册一个 wake 函数,当 socket 数据准备好的时候,该函数将被调用以通知执行器继续执行。

use std::{thread::sleep, time::Duration};

pub struct Socket {
    pub test: u8,
}

impl Socket {
    pub fn has_data_to_read(&self) -> bool {
        return false;
    }

    pub fn read_buf(&self) -> Vec<u8> {
        return vec![];
    }

    pub fn set_readable_callback(&self, wake: fn()) {
        sleep(Duration::from_secs(3));
        wake();
    }
}

Future 本身是一个 trait,除了 async 函数会自动实现这个 trait,我们还可以手动为结构体去实现它。其中 poll 方法就是执行异步函数的主要部分,它配合惰性的特性,在执行器的驱动下实现了异步的主要功能:待会再执行。

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // socket 有数据,写入 buffer 并返回
            Poll::Ready(self.socket.read_buf())
        } else {
            // socket 中还没数据
            //
            // 注册一个 `wake` 函数,当数据可用时,该函数会被调用,
            // 然后当前 Future 的执行器会再次调用 `poll` 方法,此时就可以读取到数据。
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

构建自己的异步运行时

Future 的实现略微复杂,不是一个 SimpleFuture 就能够解决的。要想完全了解 Rust 异步在背后是如何运行的,就需要手动来构建一个简化版的 Future。

使用 Waker 来唤醒任务

Future 的 poll 方法决定了异步的特性,对于 Future 来说,第一次执行 poll 方法无法执行完成是正常的。当其阻塞时,需要将执行的优先权让给其他 Future。并且需要确保在未来一旦准备好时,可以继续执行 poll 方法。而通知可以继续执行 poll 方法就是通过 Waker 来完成的。

Waker 提供了一个 wake 方法用于告诉执行器(后续创建),相关的任务可以继续执行了,此时执行器就可以对对应的 Future 再次执行 poll 方法。

构建定时器

构建定时器之前我们需要一个 Future,因此需要使用到标准库中的 future

但实现一个真正的异步运行时非常复杂的,不仅仅是 Future 本身,还有其他 API 的异步操作。所以这里使用标准库中的 Future 来创建一个简单的定时 Future:TimeFuture。它的特点就是在第一次 poll 时会创建一个定时器来模拟需要被等待的任务,当定时器结束后,使用 Waker 通知 Executor 来再次执行 poll

非常的简单易懂,通过这个简单的 Future 就能够大致的了解了 Future 是如何执行与调度任务了。

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// 在 Future 和等待的线程间共享状态
struct SharedState {
    /// 是否已经开始过第一次 `poll`
    started: bool,

    /// 定时(睡眠)是否结束
    completed: bool,

    /// 定时时长
    duration: Duration,

    /// 当睡眠结束后,线程可以用 `waker` 通知 `TimerFuture` 来唤醒任务
    waker: Option<Waker>,
}

我们的 Future 非常简单,结构体中只保存了一个用于共享的状态。其中状态中包含一个 waker,当线程睡眠结束后可以使用它来通知我们的 TimeFuture 来唤醒任务再次执行。

为什么需要 started 状态? 这里是为了模拟真正的 Future 的惰性,将实际的工作代码放在 poll 方法内,只有在第一次被 poll 时(也就是 .await 时)才会真正的执行。 后续则根据 completed 来决定返回的 Poll 状态。

当然到这里 TimeFuture 还不是一个真正的 Future,它还需要实现 Future 这个 trait。这里的 poll 方法非常简单,根据共享状态,如果该方法是第一次执行,则创建一个新的线程来执行真正的任务。然后继续向下执行,判断状态中 completed 是否完成,如果未完成则返回 Poll::Pending 表示该异步任务放到后面还要执行。如果完成则返回 Poll::Ready(()) 表示该任务已经执行完成。

而刚刚创建用来执行真正任务的线程,则会休眠指定的时间。当休眠结束后,其线程会继续执行它的代码,也就是标记 completed 为完成,同时调用共享状态中的 waker.wake() 方法来通知执行器可以继续 poll 了。

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 通过检查共享状态,来确定定时器是否已经完成
        let mut shared_state = self.shared_state.lock().unwrap();
        if !shared_state.started {
            let thread_shared_state = self.shared_state.clone();
            thread::spawn(move || {
                let mut shared_state = thread_shared_state.lock().unwrap();
                println!("start first execute");
                shared_state.started = true;
                thread::sleep(shared_state.duration);
                // 通知执行器定时器已经完成,可以继续`poll`对应的`Future`了
                shared_state.completed = true;
                if let Some(waker) = shared_state.waker.take() {
                    waker.wake()
                }
            });
        }
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // 设置 `wkaer`,这样新线程在睡眠结束后可以唤醒当前的任务,接着再次对 `Furture` 进行
            // `poll` 操作
            //
            // 下面的`clone`每次被`poll`时都会发生一次,实际上,应该是只`clone`一次更加合理。
            // 选择每次都`clone`的原因是: `TimerFuture`可以在执行器的不同任务间移动,如果只克隆一次,
            // 那么获取到的`waker`可能已经被篡改并指向了其它任务,最终导致执行器运行了错误的任务
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

而该结构体的创建则非常简单,创建一个可线程间共享的状态即可。重点的 waker 现在还是 None,是因为它会在第一次 poll 时由 Executor 来创建并当作 context 来传递给 poll 方法。也就是 cx: &mut Context<'_>。至于为什么签名是 Context 会在后面的构建 Executor 中看到。

impl TimerFuture {
    /// 创建一个新的`TimerFuture`,在指定的时间结束后,该`Future`可以完成
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            started: false,
            completed: false,
            duration,
            waker: None,
        }));

        TimerFuture { shared_state }
    }
}

执行器 Executor

Rust 的 Future 是惰性的,默认情况下不会去主动的执行。在 async 函数中使用 .await 能调用另一个 async 函数。.await 总是需要 async 的存在,那最外层的 Future 该怎么才能执行呢?没错,就是执行器 Executor。

执行器会管理一批 Future (最外层的 async 函数),然后通过不停地 poll 推动它们直到完成。最开始它会主动调用一次 poll 方法,并传递响应的 Context,后续就不会再主动的执行 poll 方法了,而是等待 Future 的 wake 函数通知它继续执行。这种 wake 通知然后 poll 的方式会不断重复,直到 Future 完成。

构建执行器

终于到了构建执行器这一步。

Rust 的 Future 是惰性的,只有主动进行 poll 之后才会开始执行。其中一个主动 poll 的方式就是在 async 函数中使用 await 来调用另一个 async 函数。但这只能解决在 async 函数中的 Future。再最外层的 async 函数终究需要一个主动运行的办法。这就是执行器 executor 的重要工作。

执行器负责管理最外层的 async 函数,通过不停的 poll 来执行所有的异步函数。不过执行器只会主动 poll 一次 Future,后续则不会在进行主动 poll 操作。而是等待 Future 调用 wake 函数来通知它可以继续执行,它才会继续 poll。这种 wake 然后 poll 的方式会不断重复,直到 Future 完成。

构建一个属于我们自己的迷你执行器,需要用到 futures 包中的 ArcWake 特征,它可以帮助我们去构建 Waker。

cargo add futures

首先我们需要创建一个执行器的结构体,结构体中有一个 ready_queue 字段,用于存放 sync_channel 发送过来的 Task。Task 就是执行任务的关键。

/// 任务执行器,负责从通道中接收任务然后执行
pub struct Executor {
    pub ready_queue: Receiver<Arc<Task>>,
}

随后就需要一个创建异步任务的 Spawner 和执行器的 ready_queue 字段类似,它的 task_sender 就是用于发送 Task 给执行器的。

#[derive(Clone)]
pub struct Spawner {
    pub task_sender: SyncSender<Arc<Task>>,
}

接下来则是最重要的部分 Task,根据上述 ExecutorSpawner 的字段签名就能看出来,他们发送和接受的分别就是 Task 结构体。Task 结构体中有两个重要的部分,分别是 future 本身和与 Spawner 一样的 task_sender 用于将 Future 放回通道中,等待执行器 poll。

/// 一个Future,它可以调度自己(将自己放入任务通道中),然后等待执行器去`poll`
pub struct Task {
    /// 进行中的Future,在未来的某个时间点会被完成
    ///
    /// 按理来说`Mutex`在这里是多余的,因为我们只有一个线程来执行任务。但是由于
    /// Rust并不聪明,它无法知道`Future`只会在一个线程内被修改,并不会被跨线程修改。因此
    /// 我们需要使用`Mutex`来满足这个笨笨的编译器对线程安全的执着。
    ///
    /// 如果是生产级的执行器实现,不会使用`Mutex`,因为会带来性能上的开销,取而代之的是使用`UnsafeCell`
    pub future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// 可以将该任务自身放回到任务通道中,等待执行器的poll
    pub task_sender: SyncSender<Arc<Task>>,
}

其中 pub future: Mutex<Option<BoxFuture<'static, ()>>> 的这个签名才是重要所在。首先第一个 Mutex 的作用主要是告诉编译器我们的 future 是线程安全的,虽然我们只有一个线程来执行任务,但编译器无从得知。

其次就是 BoxFuture<'a, T>,这是个类型别名:

type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

由于 Future 的特性,需要使用 Pin<T> 来保证其在内存中固定的位置,不会移动。后续的特征对象表示我们的 Future 可以在线程中安全的移动。

到此我们的 Task 才完成了一半,还需要将其实现 ArcWake 特征才能在执行器中将其变成 wake,用于后续通知执行器继续 poll

不过在此之前,我们先来实现一下 Spawnerspawn 方法,它负责创建新的 Future,并将其放到任务通道中。

/// `Spawner`负责创建新的`Future`然后将它发送到任务通道中
impl Spawner {
    pub fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = Box::pin(future);
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("任务队列已满");
    }
}

在创建新的 Task 时,我们将 Spawner 自身的 task_sender 保存给了 Task,其作用就是用于将 Task 自身发送到任务通道中。在 spawn 执行时我们将直接 Task 发送到执行器。

随后就是给 Task 实现 ArcWake 了,我们只需要实现一个方法 wake_by_ref,它就是 wake 的核心所在。当调用 wake 时,它将 clone 自身的 Arc 并发送到任务通道。

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let cloned = arc_self.clone();
        arc_self.task_sender.send(cloned).expect("任务队列已满");
    }
}

SpawnerTask 主要是将 Future 发送到任务通道中,而我们最后一个实现的方法就是接受并执行通过任务通道发送过来的任务。在上述 Spawner 第一次 spawn 的时候,就会立即发送到任务通道。这就是 Future 的第一次执行,也就是执行器的主动 poll,后续则根据传递过去的 wake 方法来通知(发送 Task 到任务通道)执行器继续 poll

impl Executor {
    pub fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // 获取一个future,若它还没有完成(仍然是Some,不是None),则对它进行一次poll并尝试完成它
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // 基于任务自身创建一个 `LocalWaker`
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&waker);
                // `BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的类型别名
                // 通过调用`as_mut`方法,可以将上面的类型转换成`Pin<&mut dyn Future + Send + 'static>`
                if future.as_mut().poll(context).is_pending() {
                    // Future还没执行完,因此将它放回任务中,等待下次被poll
                    *future_slot = Some(future)
                }
            }
        }
    }
}

所以我们为 Executor 实现的 run 方法就是循环任务通道,当接受到任务时,首先将取出 Option 中的 Task。并基于任务自身创建一个 waker,这就是上述为 Task 实现 ArcWake 的作用:

let waker = waker_ref(&task);
let context = &mut Context::from_waker(&waker);

通过这两个方法,就能将任务自身创建成一个在 Future 中传递的上下文,也就是 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) 中的 cx,用于通知执行器可以再次 poll 了。

if let Some(waker) = shared_state.waker.take() {
    waker.wake()
}

最后一句即是执行 Future 的 poll 方法,并传递 waker。同时根据其返回值判断是否时 pending 状态。如果是,则将 Future 在放回起初的 Option 中,等待其调用 wake 方法。

if future.as_mut().poll(context).is_pending() {
    // Future还没执行完,因此将它放回任务中,等待下次被poll
    *future_slot = Some(future)
}

到此,属于我们自己的迷你 Executor 就创建完成了。可以通过 Spawner 创建一个真正的 Future 测试了:

pub fn new_executor_and_spawner() -> (Executor, Spawner) {
    // 任务通道允许的最大缓冲数(任务队列的最大长度)
    // 当前的实现仅仅是为了简单,在实际的执行中,并不会这么使用
    const MAX_QUEUED_TAKSS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TAKSS);
    (Executor { ready_queue }, Spawner { task_sender })
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::{executor::new_executor_and_spawner, waker::TimerFuture};

    #[test]
    fn it_works() {
        let (exec, spawner) = new_executor_and_spawner();

        spawner.spawn(async {
            println!("howdy!");
            TimerFuture::new(Duration::new(2, 0)).await;
            println!("done!")
        });

        drop(spawner);

        exec.run();
    }
}

要在 test 中输出到 std,则需要使用:

cargo test -- --nocapture