WASI 0.2 异步运行时的设计思路

  • 2024年 3月1日
  • 读完约需 25 分钟
  • • 
  • 标签: 
  • rust
  • async
  • 最后更新于 2024年 6月6日

原文链接

在 2019 年,我和 Stjepan Glavina 一起开发了 async-std 运行时。它是 runtime 项目的一个分支,其本身旨在简化在不同异步运行时之间进行抽象的过程。在 async-std 的开发工作中,我最自豪的是实现了核心 IO 抽象,后来 Stjepan 将其引入 smol 项目,分解为 pollingasync-io 库。他将它们发展成了可以独立工作的强大构建模块,超越了我最初的原型代码。

无论如何,回忆这段往事有我的目的:我刚刚完成了另一个异步运行时的构建,不是要大家去使用这个运行时,而是希望它成为一个可行的、最小的、正确的 WASI 0.2 异步运行时实现。本文详细介绍我是如何构建它的,以便您在需要时可以构建自己的运行时( 如果您有这个打算的话 )。我是第一个编写这些代码的人之一,甚至可能是第一个编写专用运行时的人。这意味着,如果 SmolMonoioGlommioTokio 想要添加对 WASI 0.2 的支持,也必须实现我已经实现的内容。因此,我想我可以帮助大家省去一些麻烦,同时记录一下我刚刚完成的工作。

WASI 0.2 轮询模型

WASI 0.2 是基于 就绪状态 而非完成状态的。这意味着我们需要等待主机系统告诉我们可以采取某种行动,而不是告诉我们某个操作已成功完成(基于完成状态)。WASI 0.3 可能会切换到基于完成状态的系统,因为 Linux 的 io_uringWindows 的 ioringapi 都是基于完成状态的,并且性能表现非常出色,但我们还没有这样的实现。

WASI 的核心轮询系统由两个组件组成:可轮询类型( Pollable )轮询方法( poll )。可轮询类型表示对某事件的关注。如果我们有某种读取调用,就会有一个与该调用相关联的可轮询类型,我们可以将它提交给主机系统,表示:“请在我可以调用该读取操作时通知我”,这就是系统基于“就绪状态”的含义 —— 将对某操作的关注提交给主机系统,后者会定期产生一个列表,列出哪些操作可以调用,主机系统通过轮询函数来调度这种关注。

WASI 0.2 的轮询模型和 epoll 之间的关键区别在于:在 WASI 中,我们调度的不是对资源(例如文件描述符)的关注,而是对具体操作的关注。在 epoll 下,我们会告诉内核类似于:“嘿,这里有一个代表某个套接字的文件描述符 - 任何时候只要它可以读取或写入新数据我都感兴趣。” 在 WASI 中,我们更加精确;我们生成一个特殊的方法,它返回一个类型,该类型不仅有获取底层数据的能力,还有订阅方法可以返回一个可轮询类型。然后等轮询方法告诉我们可轮询对象已准备就绪时,我们可以调用该方法来获取底层数据,而不会返回错误。说了这么多,我们来看一个基本示例:

use wasi::http::outgoing_handler::{handle, OutgoingRequest}; use wasi::http::types::{Fields, Method, Scheme}; use wasi::io::poll; fn main() { // 构造 HTTP 请求 let fields = Fields::new(); let req = OutgoingRequest::new(fields); req.set_method(&Method::Get).unwrap(); req.set_scheme(Some(&Scheme::Https)).unwrap(); req.set_path_with_query(Some("/")).unwrap(); req.set_authority(Some("example.com")).unwrap(); // 发送请求并等待它完成 let res = handle(req, None).unwrap(); // 1. 我们准备好通过网络发送请求了 let pollable = res.subscribe(); // 2. 从响应的 future 中获取可轮询对象 poll::poll(&[&pollable]); // 3. 阻塞直到我们已准备好查看响应 // 我们已准备完毕可以读取响应头了,如果之前的请求不成功,仍然可能得到 // 反馈的错误信息,但是不会得到“试图在操作执行完之前就读取数据”的错误 let res = res.get().unwrap().unwrap().unwrap(); for (name, _) in res.headers().entries() { println!("header: {name}"); } }

当执行 cargo-component ( 传入 -- -S http 标记 )时,结果如下:

header: age header: cache-control header: content-type header: date header: etag header: expires header: last-modified header: server header: vary header: x-cache header: content-length

轮询器设计

HTTP 请求示例中,我们展示了如何发起一个 HTTP 请求并将该操作注册到轮询方法中。异步计算的一个主要优势是能够执行临时并发:我们不仅仅希望能够等待某一个操作完成,还希望能够同时等待任意数量的操作完成。这就是为什么轮询方法接受一个可轮询类型的列表作为参数,并且返回的也是一个已准备好继续执行操作的列表。

为了实现这一点,我们需要创建一些类型来保存可轮询对象,它要高效,并允许我们将“就绪”事件列表与某种标识关联起来。这很重要,因为我们正在构建 Rust 异步运行时,需要将“就绪事件”与“唤醒特定的唤醒器”关联起来。我们可以从定义一个包含 Slab 的结构体开始。slab 是一种高效的键值数据结构,它在插入值时会返回一个访问值的键,并且是无序且分配可重用的 —— 这非常好,因为我们将在不同的可轮询类型中不断注册和注销”关注“。

pub(crate) struct Poller { pub(crate) targets: Slab<Pollable>, }

插入一个类型到 Slab 时,它返回一个 usize 类型的键。为了简便,也为了能更容易桥接到 WASI,我们将定义 EventKey 类型来包装这个键,

// 一个键值代表一个 poller 中的对象 #[repr(transparent)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] pub(crate) struct EventKey(pub(crate) u32);

现在,我们已准备好在轮询器上实现基本 CRUD 方法:new、insert、remove 和 get。除了对 EventKey 的转换之外,这里没有什么有趣的事情发生。

impl Poller { /// 创建一个新的 `Poller` 实例 pub(crate) fn new() -> Self { Self { targets: Slab::new() } } /// 在 `Poller` 中插入一个可轮询对象 pub(crate) fn insert(&mut self, target: Pollable) -> EventKey { let key = self.targets.insert(target); EventKey(key as u32) } /// 返回 key 对应的可轮询对象,如果没找到则返回None pub(crate) fn get(&self, key: &EventKey) -> Option<&Pollable> { self.targets.get(key.0 as usize) } /// 从 `Poller` 中移除 key 对应的可轮询对象 /// /// 如果没找到则返回None pub(crate) fn remove(&mut self, key: EventKey) -> Option<Pollable> { self.targets.try_remove(key.0 as usize) } }

最后,我们实现最有趣的部分:在 Poller 中等待事件的方法,并将其映射到它们各自的 EventKey 上。当我们向 poll 调用提交 Pollable 列表时,会得到一个索引列表,它指向我们提交的 pollable 列表的索引。这些值与我们拥有的EventKey不同,因此我们必须构建一个查找表来将 Pollable 列表的索引映射回事件键。

impl Poller { /// 阻塞当前线程直到有新的事件发生 /// /// `ready_list` 将会被清空 pub(crate) fn block_until(&mut self) -> Vec<EventKey> { // 我们等待多个可轮询对象,当它们被唤醒时,我们会得到一些可轮询 // 对象的索引,此时这些对象的事件已经是可用状态了,接下来就是需 // 要将索引和正确的可轮询对象关联起来。 // 我们的做法是:遍历所有可轮询对象,并记录下它属于哪个索引 let mut indexes = Vec::with_capacity(self.targets.len()); let mut targets = Vec::with_capacity(self.targets.len()); for (index, target) in self.targets.iter() { indexes.push(index); targets.push(target); } // 关联关系有了,我们开始轮询。这将会阻塞,直到有事件发生 let ready_indexes = poll(&targets); // 当获得了就绪的可轮询对象的索引时,需要将它转换成唤醒器的键值 // 之前我们已经建立了索引到唤醒器键值的映射,所以可以直接进行查找 ready_indexes .into_iter() .map(|index| EventKey(indexes[index as usize] as u32)) .collect() } }

我们的 Pollable 抽象就算完成了!

反应器设计

现在,我们可以跟踪可轮询列表,也能订阅它们的方法了,接下来就是把唤醒器(wakers)纳入其中,并创建一个类型供 Future 用于注册对事件的关注。WASI 异步运行时本质上是单线程的,因此我们将在这里使用 Rc 和 RefCell 类型。现在可以构建我们资金的数据类型了,还是从核心结构开始:

use super::polling::{EventKey, Poller}; use std::collections::HashMap; use std::task::Poll; use std::task::Waker; use std::{cell::RefCell, rc::Rc}; use wasi::io::poll::Pollable; /// 管理 WASI 0.1 的异步系统资源 #[derive(Debug, Clone)] pub struct Reactor { inner: Rc<RefCell<InnerReactor>>, } /// 私有的、内部的反应器实现 - 独立出来以便于锁定 #[derive(Debug)] struct InnerReactor { poller: Poller, wakers: HashMap<EventKey, Waker>, } impl Reactor { /// 创建反应器实例 pub(crate) fn new() -> Self { Self { inner: Rc::new(RefCell::new(InnerReactor { poller: Poller::new(), wakers: HashMap::new(), })), } } }

我们可以使用 Rc<RefCell> 的原因,是我们控制了方法所有的访问。我们保证它们总是短暂的,而且永远不会同时发生两次或以上。这反过来为我们提供了一种方便的方式,让我们能够在 &self 而不是 &mut self 上调用方法。说到这里,是时候实现第一个方法了 —— 在可轮询的对象上注册关注的方法。

在 Pollable 中注册关注

这为我们提供了一个异步方法 wait_for,允许我们的反应器异步等待,直到与 Pollable 相关的调用准备好执行。

impl Reactor { /// Wait for the pollable to resolve. pub async fn wait_for(&self, pollable: Pollable) { let mut pollable = Some(pollable); let mut key = None; // 这个方法是核心的循环部分,在 future 被轮询时会执行多次 future::poll_fn(|cx| { // 首先是对反应器加锁,这是单线程并且生命周期非常短暂,所以不存在竞态 // (具体就是不会存在多个线程竞争可变引用)。 let mut reactor = self.inner.borrow_mut(); // pollable 参数中的可轮询对象,会在第一次轮询时插入到反应器 // 的 polller 中。唤醒器则每次轮询都插入到反应器的 wakers里 // 面, 根据 HashMap 的特性,已插入的waker,每次都会被新的替换 let key = key.get_or_insert_with(|| reactor.poller.insert(pollable.take().unwrap())); reactor.wakers.insert(*key, cx.waker().clone()); // 检测 key 对应的可轮询对象是否已就绪,如果已就绪,则清除它并返回 Poll:Ready if reactor.poller.get(key).unwrap().ready() { reactor.poller.remove(*key); reactor.wakers.remove(key); Poll::Ready(()) } else { Poll::Pending } }) .await } }

比其他运行时优秀的地方在于,我们的关注被注册到对象,而这些关注的目标仅仅是某个操作。这意味着“等待此操作准备好”仅需调用 reactor.wait_for(pollable).await。与其他轮询模型中需要 syscall 操作相比有很大的不同,而且意味着可以将反应器所需的所有不变量,直接编码为它的一部分。

在 Rust WG Async 的最近对话中,我们一直在讨论将某种类型的反应器钩子作为 future 中 Context 的一部分是否合适,是否有其他更好的机制。从这个例子中,我认为可以得出以下结论:虽然可以通过 futureContext 共享反应器 —— 但在高层次上,它总是以相同的方式映射到底层函数调用,这至少让我对这是否是最自然的映射产生了怀疑。

阻塞直到事件就绪

处理完这些,我们就可以实现 Poller::block_until 调用的包装器了。根据我们的实现,它将等待所有已注册的轮询者(pollers),并为那些准备好被唤醒的轮询者返回一个 EventKeys 列表。我们要做的就是遍历这个列表,调用每个相关的唤醒程序:

// 阻塞,直到有新的事件就绪。一旦新事件就绪,就调用它的 waker pub(crate) fn block_until(&self) { let mut reactor = self.inner.borrow_mut(); for key in reactor.poller.block_until() { match reactor.wakers.get(&key) { Some(waker) => waker.wake_by_ref(), None => panic!("tried to wake the waker for non-existent `{key:?}`"), } } }

乍一看,通过调用唤醒器来做到这些显得有点笨,因为 WASI 0.2 目前是单线程的,默认情况下它总是只唤醒一个唤醒器,所以使用唤醒器来区分事件是常见且被推荐的做法。并发原语可以构建自己的唤醒器以便跟踪标识实现更精确地唤醒。我们没有控制库构建的唤醒器,原因是我们要唤醒所有的唤醒器,即使默认情况下它们什么都不做。

阻塞(block_on)逻辑设计

现在我们已经完成了反应器,我们需要驱动它。为此,我们将定义一个名为 block_on 的函数,该函数接受一个返回 future 的闭包,并赋予它访问反应器的权限。只要闭包返回的 Future 处于活动状态,任务就会不断取得进展 —— 每次 Future 返回 Pending 时,等待反应器。

在这里,我们将手动驱动 Future,所以首先要定义我们自己的 Waker 实例。我们不支持任何形式的 spawn 函数,而是依赖用户利用库实现结构化的并发,因此可以直接将其设置为空操作(noop)。

/// 构造一个空操作的唤醒器 // 注意: 一旦 <https://github.com/rust-lang/rust/issues/98286> 稳定,我们就可以移除它了 fn noop_waker() -> Waker { const VTABLE: RawWakerVTable = RawWakerVTable::new( // `clone` 返回一个原始的唤醒器 |_| RAW, // `wake` 什么都不做 |_| {}, // `wake_by_ref` 什么都不做 |_| {}, // `drop` 我们没有动态分配空间,什么都不需要做 |_| {}, ); const RAW: RawWaker = RawWaker::new(ptr::null(), &VTABLE); // SAFETY: 所有字段都是空操作,所以是安全的 unsafe { Waker::from_raw(RAW) } }

接下来是实际的 block_on 实现。核心逻辑基本上是一个循环 { match {} } 语句,它只是在每次迭代时调用 reactor.block_until,直到 Future 完成。

/// 开始驱动事件循环 pub fn block_on<F, Fut>(f: F) -> Fut::Output where F: FnOnce(Reactor) -> Fut, Fut: Future, { // 创建反应器实例 let reactor = Reactor::new(); // 创建并钉住 future 实例,钉住的目的是让它能被轮询 let fut = (f)(reactor.clone()); let mut fut = pin!(fut); // 创建上下文实例,稍后会传入 future let waker = noop_waker(); let mut cx = Context::from_waker(&waker); // 轮询,要么 future 完成然后返回结果,要么继续等待 loop { match fut.as_mut().poll(&mut cx) { Poll::Ready(res) => return res, Poll::Pending => reactor.block_until(), } } }

至此,我们的异步运行时就完成了!

异步运行时案例

2024年3月7日添加的这部分

有了运行时,我们就将 wasi 库的调用封装成异步的了。比如,我们有一个非阻塞的、需要等待一定时长的 sleep 功能,可以使用 subscribe_duration 功能来写,并将它连接到反应器。我们要做的仅仅是封装它,并在 pollable 上等待它完成。使用之前我们的成果会让这变得足够简单:

use std::time::Duration; use wasi::time::monotonic_clock::subscribe_duration; pub async fn sleep(duration: Duration, reactor: &Reactor) { let duration = duration.as_nanos() as u64; // 1. 将时长换算成纳秒 let pollable = subscribe_duration(duration); // 2. 获取可轮询对象 reactor.wait_for(pollable).await; // 3. 等待它就绪 }

从 wasi crate 中取出 API,并将它们与对 reactor.wait_for(…).await 的调用配对,这应该足够简单了。这就是我们异步化整个 wasi API 的基本模式。

本地执行器逻辑

WASI 0.2 是单线程的,本质上不需要访问 task::spawn 抽象。在同步 Rust 中,我们使用线程来结合并行性和并发性;但在异步 Rust 中,我们可以分离它们。futures-concurrency 库提供了您需要的访问任何并发模式的能力;这意味着在没有并行性的情况下不需要执行器。

相反,API(如Vec::joinStreamGroup)甚至提供了访问无界并发原语的方法,人们通常在“本地执行器”中使用 —— 但没有任何抽象的作用域生命周期问题。以下是一个示例,展示了如何在不依赖“本地任务”的情况下并发地发出两个单独的HTTP请求:

block_on(|reactor| async { let client = Client::new(reactor); // <- 使用了还未发布的 `wasi::http` 的封装 let a = async { let url = "https://example.com".parse().unwrap(); let req = Request::new(Method::Get, url); let res = client.send(req).await; let body = read_to_end(res).await; let body = String::from_utf8(body).unwrap(); println!("{body}"); }; let b = async { let url = "https://example.com".parse().unwrap(); let req = Request::new(Method::Get, url); let res = client.send(req).await; let body = read_to_end(res).await; let body = String::from_utf8(body).unwrap(); println!("{body}"); }; (a, b).join().await; // 并行 `a` 或 `b`. }) }

结语

在这篇文章中,我解释了 WASI 的轮询模型,并逐步展示了如何使用它来构建自己的异步运行时。我希望这对异步 Rust 运行时的维护者以及想要自己编写异步运行时的业余爱好者有用。如果你只想使用我今天在这里分享的代码,你可以通过安装 wasi-async-runtime crate 来实现。

参考:

  1. https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#the-wasi-0-2-polling-model
  2. https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#designing-the-poller-abstraction
  3. https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#designing-the-reactor-abstraction
  4. https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#registering-interest-in-a-pollable
  5. https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#blocking-until-events-are-ready
  6. https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#designing-the-block-on-abstraction
  7. https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#on-the-absence-of-a-local-executor
  8. https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#conclusion
  9. https://docs.rs/async-std
  10. https://docs.rs/runtime
  11. https://docs.rs/polling
  12. https://github.com/smol-rs/async-io
  13. https://docs.rs/smol
  14. https://docs.rs/wasi-async-runtime/latest/wasi_async_runtime/
  15. https://bytecodealliance.org/articles/WASI-0.2
  16. https://docs.rs/smol/latest/smol/
  17. https://docs.rs/monoio/latest/monoio/
  18. https://docs.rs/glommio/latest/glommio/
  19. https://docs.rs/tokio/latest/tokio/
  20. https://en.wikipedia.org/wiki/Io_uring
  21. https://learn.microsoft.com/en-us/windows/win32/api/ioringapi/
  22. https://docs.rs/wasi/latest/wasi/io/poll/struct.Pollable.html
  23. https://docs.rs/wasi/latest/wasi/io/poll/fn.poll.html
  24. https://github.com/bytecodealliance/cargo-component
  25. https://blog.yoshuawuyts.com/why-async-rust/#ad-hoc-concurrency
  26. https://doc.rust-lang.org/std/task/struct.Waker.html
  27. https://docs.rs/slab/latest/slab/
  28. https://docs.rs/futures-concurrency/latest/futures_concurrency/
  29. https://docs.rs/futures-concurrency/latest/futures_concurrency/future/trait.Join.html#impl-Join-for-Vec%3CFut%3E
  30. https://docs.rs/futures-concurrency/latest/futures_concurrency/stream/struct.StreamGroup.html
  31. https://docs.rs/wasi-async-runtime/latest/wasi_async_runtime/