WASI 0.2 异步运行时的设计思路

  • 2024年 3月1日
  • 读完约需 25 分钟
  • • 
  • 标签: 
  • rust
  • async
  • 最后更新于 2024年 6月6日

原文链接

在 2019 年,我和 Stjepan Glavina 一起开发了 async-std 运行时。它是 runtime 项目的一个分支,其本身旨在简化在不同异步运行时之间进行抽象的过程。在 async-std 的开发工作中,我最自豪的是实现了核心 IO 抽象,后来 Stjepan 将其引入 smol 项目,分解为 pollingasync-io 库。他将它们发展成了可以独立工作的强大构建模块,超越了我最初的原型代码。

无论如何,回忆这段往事有我的目的:我刚刚完成了另一个异步运行时的构建,不是要大家去使用这个运行时,而是希望它成为一个可行的、最小的、正确的 WASI 0.2 异步运行时实现。本文详细介绍我是如何构建它的,以便您在需要时可以构建自己的运行时( 如果您有这个打算的话 )。我是第一个编写这些代码的人之一,甚至可能是第一个编写专用运行时的人。这意味着,如果 SmolMonoioGlommioTokio 想要添加对 WASI 0.2 的支持,也必须实现我已经实现的内容。因此,我想我可以帮助大家省去一些麻烦,同时记录一下我刚刚完成的工作。

WASI 0.2 轮询模型

WASI 0.2 是基于 就绪状态 而非完成状态的。这意味着我们需要等待主机系统告诉我们可以采取某种行动,而不是告诉我们某个操作已成功完成(基于完成状态)。WASI 0.3 可能会切换到基于完成状态的系统,因为 Linux 的 io_uringWindows 的 ioringapi 都是基于完成状态的,并且性能表现非常出色,但我们还没有这样的实现。

WASI 的核心轮询系统由两个组件组成:可轮询类型( Pollable )轮询方法( poll )。可轮询类型表示对某事件的关注。如果我们有某种读取调用,就会有一个与该调用相关联的可轮询类型,我们可以将它提交给主机系统,表示:“请在我可以调用该读取操作时通知我”,这就是系统基于“就绪状态”的含义 —— 将对某操作的关注提交给主机系统,后者会定期产生一个列表,列出哪些操作可以调用,主机系统通过轮询函数来调度这种关注。

WASI 0.2 的轮询模型和 epoll 之间的关键区别在于:在 WASI 中,我们调度的不是对资源(例如文件描述符)的关注,而是对具体操作的关注。在 epoll 下,我们会告诉内核类似于:“嘿,这里有一个代表某个套接字的文件描述符 - 任何时候只要它可以读取或写入新数据我都感兴趣。” 在 WASI 中,我们更加精确;我们生成一个特殊的方法,它返回一个类型,该类型不仅有获取底层数据的能力,还有订阅方法可以返回一个可轮询类型。然后等轮询方法告诉我们可轮询对象已准备就绪时,我们可以调用该方法来获取底层数据,而不会返回错误。说了这么多,我们来看一个基本示例:

use wasi::http::outgoing_handler::{handle, OutgoingRequest};
use wasi::http::types::{Fields, Method, Scheme};
use wasi::io::poll;

fn main() {
    // 构造 HTTP 请求
    let fields = Fields::new();
    let req = OutgoingRequest::new(fields);
    req.set_method(&Method::Get).unwrap();
    req.set_scheme(Some(&Scheme::Https)).unwrap();
    req.set_path_with_query(Some("/")).unwrap();
    req.set_authority(Some("example.com")).unwrap();

    // 发送请求并等待它完成
    let res = handle(req, None).unwrap();  // 1. 我们准备好通过网络发送请求了
    let pollable = res.subscribe();        // 2. 从响应的 future 中获取可轮询对象
    poll::poll(&[&pollable]);              // 3. 阻塞直到我们已准备好查看响应

    // 我们已准备完毕可以读取响应头了,如果之前的请求不成功,仍然可能得到
    // 反馈的错误信息,但是不会得到“试图在操作执行完之前就读取数据”的错误
    let res = res.get().unwrap().unwrap().unwrap();
    for (name, _) in res.headers().entries() {
        println!("header: {name}");
    }
}

当执行 cargo-component ( 传入 -- -S http 标记 )时,结果如下:

header: age
header: cache-control
header: content-type
header: date
header: etag
header: expires
header: last-modified
header: server
header: vary
header: x-cache
header: content-length

轮询器设计

HTTP 请求示例中,我们展示了如何发起一个 HTTP 请求并将该操作注册到轮询方法中。异步计算的一个主要优势是能够执行临时并发:我们不仅仅希望能够等待某一个操作完成,还希望能够同时等待任意数量的操作完成。这就是为什么轮询方法接受一个可轮询类型的列表作为参数,并且返回的也是一个已准备好继续执行操作的列表。

为了实现这一点,我们需要创建一些类型来保存可轮询对象,它要高效,并允许我们将“就绪”事件列表与某种标识关联起来。这很重要,因为我们正在构建 Rust 异步运行时,需要将“就绪事件”与“唤醒特定的唤醒器”关联起来。我们可以从定义一个包含 Slab 的结构体开始。slab 是一种高效的键值数据结构,它在插入值时会返回一个访问值的键,并且是无序且分配可重用的 —— 这非常好,因为我们将在不同的可轮询类型中不断注册和注销”关注“。

pub(crate) struct Poller {
    pub(crate) targets: Slab<Pollable>,
}

插入一个类型到 Slab 时,它返回一个 usize 类型的键。为了简便,也为了能更容易桥接到 WASI,我们将定义 EventKey 类型来包装这个键,

// 一个键值代表一个 poller 中的对象
#[repr(transparent)]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub(crate) struct EventKey(pub(crate) u32);

现在,我们已准备好在轮询器上实现基本 CRUD 方法:new、insert、remove 和 get。除了对 EventKey 的转换之外,这里没有什么有趣的事情发生。

impl Poller {
    /// 创建一个新的 `Poller` 实例
    pub(crate) fn new() -> Self {
        Self {
            targets: Slab::new()
        }
    }

    /// 在 `Poller` 中插入一个可轮询对象
    pub(crate) fn insert(&mut self, target: Pollable) -> EventKey {
        let key = self.targets.insert(target);
        EventKey(key as u32)
    }

    /// 返回 key 对应的可轮询对象,如果没找到则返回None
    pub(crate) fn get(&self, key: &EventKey) -> Option<&Pollable> {
        self.targets.get(key.0 as usize)
    }

    /// 从 `Poller` 中移除 key 对应的可轮询对象
    ///
    /// 如果没找到则返回None
    pub(crate) fn remove(&mut self, key: EventKey) -> Option<Pollable> {
        self.targets.try_remove(key.0 as usize)
    }
}

最后,我们实现最有趣的部分:在 Poller 中等待事件的方法,并将其映射到它们各自的 EventKey 上。当我们向 poll 调用提交 Pollable 列表时,会得到一个索引列表,它指向我们提交的 pollable 列表的索引。这些值与我们拥有的EventKey不同,因此我们必须构建一个查找表来将 Pollable 列表的索引映射回事件键。

impl Poller {
    /// 阻塞当前线程直到有新的事件发生
    ///
    /// `ready_list` 将会被清空
    pub(crate) fn block_until(&mut self) -> Vec<EventKey> {
        // 我们等待多个可轮询对象,当它们被唤醒时,我们会得到一些可轮询
        // 对象的索引,此时这些对象的事件已经是可用状态了,接下来就是需
        // 要将索引和正确的可轮询对象关联起来。

        // 我们的做法是:遍历所有可轮询对象,并记录下它属于哪个索引
        let mut indexes = Vec::with_capacity(self.targets.len());
        let mut targets = Vec::with_capacity(self.targets.len());
        for (index, target) in self.targets.iter() {
            indexes.push(index);
            targets.push(target);
        }

        // 关联关系有了,我们开始轮询。这将会阻塞,直到有事件发生
        let ready_indexes = poll(&targets);

        // 当获得了就绪的可轮询对象的索引时,需要将它转换成唤醒器的键值
        // 之前我们已经建立了索引到唤醒器键值的映射,所以可以直接进行查找
        ready_indexes
            .into_iter()
            .map(|index| EventKey(indexes[index as usize] as u32))
            .collect()
    }
}

我们的 Pollable 抽象就算完成了!

反应器设计

现在,我们可以跟踪可轮询列表,也能订阅它们的方法了,接下来就是把唤醒器(wakers)纳入其中,并创建一个类型供 Future 用于注册对事件的关注。WASI 异步运行时本质上是单线程的,因此我们将在这里使用 Rc 和 RefCell 类型。现在可以构建我们资金的数据类型了,还是从核心结构开始:

use super::polling::{EventKey, Poller};

use std::collections::HashMap;
use std::task::Poll;
use std::task::Waker;
use std::{cell::RefCell, rc::Rc};
use wasi::io::poll::Pollable;

/// 管理 WASI 0.1 的异步系统资源
#[derive(Debug, Clone)]
pub struct Reactor {
    inner: Rc<RefCell<InnerReactor>>,
}

/// 私有的、内部的反应器实现 - 独立出来以便于锁定
#[derive(Debug)]
struct InnerReactor {
    poller: Poller,
    wakers: HashMap<EventKey, Waker>,
}

impl Reactor {
    /// 创建反应器实例
    pub(crate) fn new() -> Self {
        Self {
            inner: Rc::new(RefCell::new(InnerReactor {
                poller: Poller::new(),
                wakers: HashMap::new(),
            })),
        }
    }
}

我们可以使用 Rc<RefCell> 的原因,是我们控制了方法所有的访问。我们保证它们总是短暂的,而且永远不会同时发生两次或以上。这反过来为我们提供了一种方便的方式,让我们能够在 &self 而不是 &mut self 上调用方法。说到这里,是时候实现第一个方法了 —— 在可轮询的对象上注册关注的方法。

在 Pollable 中注册关注

这为我们提供了一个异步方法 wait_for,允许我们的反应器异步等待,直到与 Pollable 相关的调用准备好执行。

impl Reactor {
    /// Wait for the pollable to resolve.
    pub async fn wait_for(&self, pollable: Pollable) {
        let mut pollable = Some(pollable);
        let mut key = None;

        // 这个方法是核心的循环部分,在 future 被轮询时会执行多次
        future::poll_fn(|cx| {
            // 首先是对反应器加锁,这是单线程并且生命周期非常短暂,所以不存在竞态
            // (具体就是不会存在多个线程竞争可变引用)。
            let mut reactor = self.inner.borrow_mut();

            // pollable 参数中的可轮询对象,会在第一次轮询时插入到反应器
            // 的 polller 中。唤醒器则每次轮询都插入到反应器的 wakers里
            // 面, 根据 HashMap 的特性,已插入的waker,每次都会被新的替换
            let key = key.get_or_insert_with(|| reactor.poller.insert(pollable.take().unwrap()));
            reactor.wakers.insert(*key, cx.waker().clone());

            // 检测 key 对应的可轮询对象是否已就绪,如果已就绪,则清除它并返回 Poll:Ready 
            if reactor.poller.get(key).unwrap().ready() {
                reactor.poller.remove(*key);
                reactor.wakers.remove(key);
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        })
        .await
    }
}

比其他运行时优秀的地方在于,我们的关注被注册到对象,而这些关注的目标仅仅是某个操作。这意味着“等待此操作准备好”仅需调用 reactor.wait_for(pollable).await。与其他轮询模型中需要 syscall 操作相比有很大的不同,而且意味着可以将反应器所需的所有不变量,直接编码为它的一部分。

在 Rust WG Async 的最近对话中,我们一直在讨论将某种类型的反应器钩子作为 future 中 Context 的一部分是否合适,是否有其他更好的机制。从这个例子中,我认为可以得出以下结论:虽然可以通过 futureContext 共享反应器 —— 但在高层次上,它总是以相同的方式映射到底层函数调用,这至少让我对这是否是最自然的映射产生了怀疑。

阻塞直到事件就绪

处理完这些,我们就可以实现 Poller::block_until 调用的包装器了。根据我们的实现,它将等待所有已注册的轮询者(pollers),并为那些准备好被唤醒的轮询者返回一个 EventKeys 列表。我们要做的就是遍历这个列表,调用每个相关的唤醒程序:

// 阻塞,直到有新的事件就绪。一旦新事件就绪,就调用它的 waker
pub(crate) fn block_until(&self) {
    let mut reactor = self.inner.borrow_mut();
    for key in reactor.poller.block_until() {
        match reactor.wakers.get(&key) {
            Some(waker) => waker.wake_by_ref(),
            None => panic!("tried to wake the waker for non-existent `{key:?}`"),
        }
    }
}

乍一看,通过调用唤醒器来做到这些显得有点笨,因为 WASI 0.2 目前是单线程的,默认情况下它总是只唤醒一个唤醒器,所以使用唤醒器来区分事件是常见且被推荐的做法。并发原语可以构建自己的唤醒器以便跟踪标识实现更精确地唤醒。我们没有控制库构建的唤醒器,原因是我们要唤醒所有的唤醒器,即使默认情况下它们什么都不做。

阻塞(block_on)逻辑设计

现在我们已经完成了反应器,我们需要驱动它。为此,我们将定义一个名为 block_on 的函数,该函数接受一个返回 future 的闭包,并赋予它访问反应器的权限。只要闭包返回的 Future 处于活动状态,任务就会不断取得进展 —— 每次 Future 返回 Pending 时,等待反应器。

在这里,我们将手动驱动 Future,所以首先要定义我们自己的 Waker 实例。我们不支持任何形式的 spawn 函数,而是依赖用户利用库实现结构化的并发,因此可以直接将其设置为空操作(noop)。

/// 构造一个空操作的唤醒器
// 注意: 一旦 <https://github.com/rust-lang/rust/issues/98286> 稳定,我们就可以移除它了
fn noop_waker() -> Waker {
    const VTABLE: RawWakerVTable = RawWakerVTable::new(
        // `clone` 返回一个原始的唤醒器
        |_| RAW,
        // `wake` 什么都不做
        |_| {},
        // `wake_by_ref` 什么都不做
        |_| {},
        // `drop` 我们没有动态分配空间,什么都不需要做
        |_| {},
    );
    const RAW: RawWaker = RawWaker::new(ptr::null(), &VTABLE);

    // SAFETY: 所有字段都是空操作,所以是安全的
    unsafe { Waker::from_raw(RAW) }
}

接下来是实际的 block_on 实现。核心逻辑基本上是一个循环 { match {} } 语句,它只是在每次迭代时调用 reactor.block_until,直到 Future 完成。


/// 开始驱动事件循环
pub fn block_on<F, Fut>(f: F) -> Fut::Output
where
    F: FnOnce(Reactor) -> Fut,
    Fut: Future,
{
    // 创建反应器实例
    let reactor = Reactor::new();

    // 创建并钉住 future 实例,钉住的目的是让它能被轮询
    let fut = (f)(reactor.clone());
    let mut fut = pin!(fut);

    // 创建上下文实例,稍后会传入 future
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    // 轮询,要么 future 完成然后返回结果,要么继续等待
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(res) => return res,
            Poll::Pending => reactor.block_until(),
        }
    }
}

至此,我们的异步运行时就完成了!

异步运行时案例

2024年3月7日添加的这部分

有了运行时,我们就将 wasi 库的调用封装成异步的了。比如,我们有一个非阻塞的、需要等待一定时长的 sleep 功能,可以使用 subscribe_duration 功能来写,并将它连接到反应器。我们要做的仅仅是封装它,并在 pollable 上等待它完成。使用之前我们的成果会让这变得足够简单:

use std::time::Duration;
use wasi::time::monotonic_clock::subscribe_duration;

pub async fn sleep(duration: Duration, reactor: &Reactor) {
    let duration = duration.as_nanos() as u64;     // 1. 将时长换算成纳秒
    let pollable = subscribe_duration(duration);   // 2. 获取可轮询对象
    reactor.wait_for(pollable).await;              // 3. 等待它就绪
}

从 wasi crate 中取出 API,并将它们与对 reactor.wait_for(…).await 的调用配对,这应该足够简单了。这就是我们异步化整个 wasi API 的基本模式。

本地执行器逻辑

WASI 0.2 是单线程的,本质上不需要访问 task::spawn 抽象。在同步 Rust 中,我们使用线程来结合并行性和并发性;但在异步 Rust 中,我们可以分离它们。futures-concurrency 库提供了您需要的访问任何并发模式的能力;这意味着在没有并行性的情况下不需要执行器。

相反,API(如Vec::joinStreamGroup)甚至提供了访问无界并发原语的方法,人们通常在“本地执行器”中使用 —— 但没有任何抽象的作用域生命周期问题。以下是一个示例,展示了如何在不依赖“本地任务”的情况下并发地发出两个单独的HTTP请求:

   block_on(|reactor| async {
        let client = Client::new(reactor); // <- 使用了还未发布的 `wasi::http` 的封装

        let a = async {
            let url = "https://example.com".parse().unwrap();
            let req = Request::new(Method::Get, url);
            let res = client.send(req).await;

            let body = read_to_end(res).await;
            let body = String::from_utf8(body).unwrap();
            println!("{body}");
        };

        let b = async {
            let url = "https://example.com".parse().unwrap();
            let req = Request::new(Method::Get, url);
            let res = client.send(req).await;

            let body = read_to_end(res).await;
            let body = String::from_utf8(body).unwrap();
            println!("{body}");
        };

        (a, b).join().await; // 并行 `a` 或 `b`.
    })
}

结语

在这篇文章中,我解释了 WASI 的轮询模型,并逐步展示了如何使用它来构建自己的异步运行时。我希望这对异步 Rust 运行时的维护者以及想要自己编写异步运行时的业余爱好者有用。如果你只想使用我今天在这里分享的代码,你可以通过安装 wasi-async-runtime crate 来实现。

参考:

  1. https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#the-wasi-0-2-polling-model
  2. https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#designing-the-poller-abstraction
  3. https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#designing-the-reactor-abstraction
  4. https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#registering-interest-in-a-pollable
  5. https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#blocking-until-events-are-ready
  6. https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#designing-the-block-on-abstraction
  7. https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#on-the-absence-of-a-local-executor
  8. https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#conclusion
  9. https://docs.rs/async-std
  10. https://docs.rs/runtime
  11. https://docs.rs/polling
  12. https://github.com/smol-rs/async-io
  13. https://docs.rs/smol
  14. https://docs.rs/wasi-async-runtime/latest/wasi_async_runtime/
  15. https://bytecodealliance.org/articles/WASI-0.2
  16. https://docs.rs/smol/latest/smol/
  17. https://docs.rs/monoio/latest/monoio/
  18. https://docs.rs/glommio/latest/glommio/
  19. https://docs.rs/tokio/latest/tokio/
  20. https://en.wikipedia.org/wiki/Io_uring
  21. https://learn.microsoft.com/en-us/windows/win32/api/ioringapi/
  22. https://docs.rs/wasi/latest/wasi/io/poll/struct.Pollable.html
  23. https://docs.rs/wasi/latest/wasi/io/poll/fn.poll.html
  24. https://github.com/bytecodealliance/cargo-component
  25. https://blog.yoshuawuyts.com/why-async-rust/#ad-hoc-concurrency
  26. https://doc.rust-lang.org/std/task/struct.Waker.html
  27. https://docs.rs/slab/latest/slab/
  28. https://docs.rs/futures-concurrency/latest/futures_concurrency/
  29. https://docs.rs/futures-concurrency/latest/futures_concurrency/future/trait.Join.html#impl-Join-for-Vec%3CFut%3E
  30. https://docs.rs/futures-concurrency/latest/futures_concurrency/stream/struct.StreamGroup.html
  31. https://docs.rs/wasi-async-runtime/latest/wasi_async_runtime/