原文链接
在 2019 年,我和 Stjepan Glavina 一起开发了 async-std
运行时 。它是 runtime
项目 的一个分支,其本身旨在简化在不同异步运行时之间进行抽象的过程。在 async-std
的开发工作中,我最自豪的是实现了核心 IO 抽象,后来 Stjepan 将其引入 smol
项目 ,分解为 polling
和 async-io
库。他将它们发展成了可以独立工作的强大构建模块,超越了我最初的原型代码。
无论如何,回忆这段往事有我的目的:我刚刚完成了另一个异步运行时的构建 ,不是要大家去使用这个运行时,而是希望它成为一个可行的、最小的、正确的 WASI 0.2 异步运行时实现。本文详细介绍我是如何构建它的,以便您在需要时可以构建自己的运行时( 如果您有这个打算的话 )。我是第一个编写这些代码的人之一,甚至可能是第一个编写专用运行时的人。这意味着,如果 Smol
、Monoio
、Glommio
或 Tokio
想要添加对 WASI 0.2 的支持,也必须实现我已经实现的内容。因此,我想我可以帮助大家省去一些麻烦,同时记录一下我刚刚完成的工作。
WASI 0.2 轮询模型 WASI 0.2 是基于 就绪状态 而非完成状态的。这意味着我们需要等待主机系统告诉我们可以采取某种行动,而不是告诉我们某个操作已成功完成(基于完成状态)。WASI 0.3 可能会切换到基于完成状态的系统,因为 Linux 的 io_uring 和 Windows 的 ioringapi 都是基于完成状态的,并且性能表现非常出色,但我们还没有这样的实现。
WASI 的核心轮询系统由两个组件组成:可轮询类型( Pollable ) 和 轮询方法( poll ) 。可轮询类型表示对某事件的关注。如果我们有某种读取调用,就会有一个与该调用相关联的可轮询类型,我们可以将它提交给主机系统,表示:“请在我可以调用该读取操作时通知我”,这就是系统基于“就绪状态”的含义 —— 将对某操作的关注提交给主机系统,后者会定期产生一个列表,列出哪些操作可以调用,主机系统通过轮询函数来调度这种关注。
WASI 0.2 的轮询模型和 epoll 之间的关键区别在于:在 WASI 中,我们调度的不是对资源(例如文件描述符)的关注,而是对具体操作的关注。在 epoll 下,我们会告诉内核类似于:“嘿,这里有一个代表某个套接字的文件描述符 - 任何时候只要它可以读取或写入新数据我都感兴趣。” 在 WASI 中,我们更加精确;我们生成一个特殊的方法,它返回一个类型,该类型不仅有获取底层数据的能力,还有订阅方法可以返回一个可轮询类型。然后等轮询方法告诉我们可轮询对象已准备就绪时,我们可以调用该方法来获取底层数据,而不会返回错误。说了这么多,我们来看一个基本示例:
use wasi:: http:: outgoing_handler:: { handle, OutgoingRequest} ;
use wasi:: http:: types:: { Fields, Method, Scheme} ;
use wasi:: io:: poll;
fn main ( ) {
let fields = Fields:: new( ) ;
let req = OutgoingRequest:: new( fields) ;
req. set_method ( & Method:: Get) . unwrap ( ) ;
req. set_scheme ( Some ( & Scheme:: Https) ) . unwrap ( ) ;
req. set_path_with_query ( Some ( " /" ) ) . unwrap ( ) ;
req. set_authority ( Some ( " example.com" ) ) . unwrap ( ) ;
let res = handle ( req, None ) . unwrap ( ) ; let pollable = res. subscribe ( ) ; poll:: poll( & [ & pollable] ) ;
let res = res. get ( ) . unwrap ( ) . unwrap ( ) . unwrap ( ) ;
for ( name, _ ) in res. headers ( ) . entries ( ) {
println! ( " header: {name} " ) ;
}
}
当执行 cargo-component ( 传入 -- -S http 标记 )时,结果如下:
header: age
header: cache-control
header: content-type
header: date
header: etag
header: expires
header: last-modified
header: server
header: vary
header: x-cache
header: content-length
轮询器设计 HTTP 请求示例中,我们展示了如何发起一个 HTTP 请求并将该操作注册到轮询方法中。异步计算的一个主要优势是能够执行临时并发 :我们不仅仅希望能够等待某一个操作完成,还希望能够同时等待任意数量的操作完成。这就是为什么轮询方法接受一个可轮询类型的列表作为参数,并且返回的也是一个已准备好继续执行操作的列表。
为了实现这一点,我们需要创建一些类型来保存可轮询对象,它要高效,并允许我们将“就绪”事件列表与某种标识关联起来。这很重要,因为我们正在构建 Rust 异步运行时,需要将“就绪事件”与“唤醒特定的唤醒器 ”关联起来。我们可以从定义一个包含 Slab 的结构体开始。slab 是一种高效的键值数据结构,它在插入值时会返回一个访问值的键,并且是无序且分配可重用的 —— 这非常好,因为我们将在不同的可轮询类型中不断注册和注销”关注“。
pub ( crate ) struct Poller {
pub ( crate ) targets : Slab< Pollable> ,
}
插入一个类型到 Slab 时,它返回一个 usize 类型的键。为了简便,也为了能更容易桥接到 WASI,我们将定义 EventKey 类型来包装这个键,
# [ repr ( transparent ) ]
# [ derive ( Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy ) ]
pub ( crate ) struct EventKey ( pub( crate) u32 );
现在,我们已准备好在轮询器上实现基本 CRUD 方法:new、insert、remove 和 get。除了对 EventKey 的转换之外,这里没有什么有趣的事情发生。
impl Poller {
pub ( crate ) fn new ( ) -> Self {
Self {
targets: Slab:: new( )
}
}
pub ( crate ) fn insert ( & mut self , target : Pollable) -> EventKey {
let key = self . targets. insert ( target) ;
EventKey( key as u32 )
}
pub ( crate ) fn get ( & self , key : & EventKey) -> Option < & Pollable> {
self . targets. get ( key. 0 as usize )
}
pub ( crate ) fn remove ( & mut self , key : EventKey) -> Option < Pollable> {
self . targets. try_remove ( key. 0 as usize )
}
}
最后,我们实现最有趣的部分:在 Poller 中等待事件的方法,并将其映射到它们各自的 EventKey 上。当我们向 poll 调用提交 Pollable 列表时,会得到一个索引列表,它指向我们提交的 pollable 列表的索引。这些值与我们拥有的EventKey不同,因此我们必须构建一个查找表来将 Pollable 列表的索引映射回事件键。
impl Poller {
pub ( crate ) fn block_until ( & mut self ) -> Vec < EventKey> {
let mut indexes = Vec :: with_capacity( self . targets. len ( ) ) ;
let mut targets = Vec :: with_capacity( self . targets. len ( ) ) ;
for ( index, target) in self . targets. iter ( ) {
indexes. push ( index) ;
targets. push ( target) ;
}
let ready_indexes = poll ( & targets) ;
ready_indexes
. into_iter ( )
. map ( | index | EventKey( indexes[ index as usize ] as u32 ) )
. collect ( )
}
}
我们的 Pollable 抽象就算完成了!
反应器设计 现在,我们可以跟踪可轮询列表,也能订阅它们的方法了,接下来就是把唤醒器(wakers)纳入其中,并创建一个类型供 Future 用于注册对事件的关注。WASI 异步运行时本质上是单线程的,因此我们将在这里使用 Rc 和 RefCell 类型。现在可以构建我们资金的数据类型了,还是从核心结构开始:
use super :: polling:: { EventKey, Poller} ;
use std:: collections:: HashMap;
use std:: task:: Poll;
use std:: task:: Waker;
use std:: { cell:: RefCell, rc:: Rc} ;
use wasi:: io:: poll:: Pollable;
# [ derive ( Debug, Clone ) ]
pub struct Reactor {
inner : Rc< RefCell< InnerReactor> > ,
}
# [ derive ( Debug ) ]
struct InnerReactor {
poller : Poller,
wakers : HashMap< EventKey, Waker> ,
}
impl Reactor {
pub ( crate ) fn new ( ) -> Self {
Self {
inner: Rc:: new( RefCell:: new( InnerReactor {
poller: Poller:: new( ) ,
wakers: HashMap:: new( ) ,
} ) ) ,
}
}
}
我们可以使用 Rc<RefCell> 的原因,是我们控制了方法所有的访问。我们保证它们总是短暂的,而且永远不会同时发生两次或以上。这反过来为我们提供了一种方便的方式,让我们能够在 &self 而不是 &mut self 上调用方法。说到这里,是时候实现第一个方法了 —— 在可轮询的对象上注册关注的方法。 在 Pollable 中注册关注 这为我们提供了一个异步方法 wait_for,允许我们的反应器异步等待,直到与 Pollable 相关的调用准备好执行。
impl Reactor {
pub async fn wait_for ( & self , pollable : Pollable) {
let mut pollable = Some ( pollable) ;
let mut key = None ;
future:: poll_fn( | cx | {
let mut reactor = self . inner. borrow_mut ( ) ;
let key = key. get_or_insert_with ( | | reactor. poller. insert ( pollable. take ( ) . unwrap ( ) ) ) ;
reactor. wakers. insert ( * key, cx. waker ( ) . clone ( ) ) ;
if reactor. poller. get ( key) . unwrap ( ) . ready ( ) {
reactor. poller. remove ( * key) ;
reactor. wakers. remove ( key) ;
Poll:: Ready( ( ) )
} else {
Poll:: Pending
}
} )
. await
}
}
比其他运行时优秀的地方在于,我们的关注被注册到对象,而这些关注的目标仅仅是某个操作。这意味着“等待此操作准备好”仅需调用 reactor.wait_for(pollable).await
。与其他轮询模型中需要 syscall
操作相比有很大的不同,而且意味着可以将反应器所需的所有不变量,直接编码为它的一部分。
在 Rust WG Async 的最近对话中,我们一直在讨论将某种类型的反应器钩子作为 future 中 Context
的一部分是否合适,是否有其他更好的机制。从这个例子中,我认为可以得出以下结论:虽然可以通过 future
的 Context
共享反应器 —— 但在高层次上,它总是以相同的方式映射到底层函数调用,这至少让我对这是否是最自然的映射产生了怀疑。
阻塞直到事件就绪 处理完这些,我们就可以实现 Poller::block_until 调用的包装器了。根据我们的实现,它将等待所有已注册的轮询者(pollers),并为那些准备好被唤醒的轮询者返回一个 EventKeys 列表。我们要做的就是遍历这个列表,调用每个相关的唤醒程序:
pub ( crate ) fn block_until ( & self ) {
let mut reactor = self . inner. borrow_mut ( ) ;
for key in reactor. poller. block_until ( ) {
match reactor. wakers. get ( & key) {
Some ( waker) => waker. wake_by_ref ( ) ,
None => panic! ( " tried to wake the waker for non-existent `{key:?}`" ) ,
}
}
}
乍一看,通过调用唤醒器来做到这些显得有点笨,因为 WASI 0.2 目前是单线程的,默认情况下它总是只唤醒一个唤醒器,所以使用唤醒器来区分事件是常见且被推荐的做法。并发原语可以构建自己的唤醒器以便跟踪标识实现更精确地唤醒。我们没有控制库构建的唤醒器,原因是我们要唤醒所有的唤醒器,即使默认情况下它们什么都不做。
阻塞(block_on)逻辑设计 现在我们已经完成了反应器,我们需要驱动它。为此,我们将定义一个名为 block_on 的函数,该函数接受一个返回 future 的闭包,并赋予它访问反应器的权限。只要闭包返回的 Future 处于活动状态,任务就会不断取得进展 —— 每次 Future 返回 Pending 时,等待反应器。
在这里,我们将手动驱动 Future,所以首先要定义我们自己的 Waker 实例。我们不支持任何形式的 spawn 函数,而是依赖用户利用库实现结构化的并发,因此可以直接将其设置为空操作(noop)。
fn noop_waker ( ) -> Waker {
const VTABLE : RawWakerVTable = RawWakerVTable:: new(
| _ | RAW ,
| _ | { } ,
| _ | { } ,
| _ | { } ,
) ;
const RAW : RawWaker = RawWaker:: new( ptr:: null( ) , & VTABLE ) ;
unsafe { Waker:: from_raw( RAW ) }
}
接下来是实际的 block_on 实现。核心逻辑基本上是一个循环 { match {} } 语句,它只是在每次迭代时调用 reactor.block_until,直到 Future 完成。
pub fn block_on < F, Fut> ( f : F) -> Fut :: Output
where
F: FnOnce( Reactor) -> Fut,
Fut: Future,
{
let reactor = Reactor:: new( ) ;
let fut = ( f) ( reactor. clone ( ) ) ;
let mut fut = pin! ( fut) ;
let waker = noop_waker ( ) ;
let mut cx = Context:: from_waker( & waker) ;
loop {
match fut. as_mut ( ) . poll ( & mut cx) {
Poll:: Ready( res) => return res,
Poll:: Pending => reactor. block_until ( ) ,
}
}
}
至此,我们的异步运行时就完成了!
异步运行时案例 2024年3月7日添加的这部分
有了运行时,我们就将 wasi 库的调用封装成异步的了。比如,我们有一个非阻塞的、需要等待一定时长的 sleep 功能,可以使用 subscribe_duration 功能来写,并将它连接到反应器。我们要做的仅仅是封装它,并在 pollable 上等待它完成。使用之前我们的成果会让这变得足够简单:
use std:: time:: Duration;
use wasi:: time:: monotonic_clock:: subscribe_duration;
pub async fn sleep ( duration : Duration, reactor : & Reactor) {
let duration = duration. as_nanos ( ) as u64 ; let pollable = subscribe_duration ( duration) ; reactor. wait_for ( pollable) . await; }
从 wasi crate 中取出 API,并将它们与对 reactor.wait_for(…).await 的调用配对,这应该足够简单了。这就是我们异步化整个 wasi API 的基本模式。
本地执行器逻辑 WASI 0.2 是单线程的,本质上不需要访问 task::spawn 抽象。在同步 Rust 中,我们使用线程来结合并行性和并发性;但在异步 Rust 中,我们可以分离它们。futures-concurrency 库提供了您需要的访问任何并发模式的能力;这意味着在没有并行性的情况下不需要执行器。
相反,API(如Vec::join 和StreamGroup )甚至提供了访问无界并发原语的方法,人们通常在“本地执行器”中使用 —— 但没有任何抽象的作用域生命周期问题。以下是一个示例,展示了如何在不依赖“本地任务”的情况下并发地发出两个单独的HTTP请求:
block_on ( | reactor | async {
let client = Client:: new( reactor) ;
let a = async {
let url = " https://example.com" . parse ( ) . unwrap ( ) ;
let req = Request:: new( Method:: Get, url) ;
let res = client. send ( req) . await;
let body = read_to_end ( res) . await;
let body = String :: from_utf8( body) . unwrap ( ) ;
println! ( " {body} " ) ;
} ;
let b = async {
let url = " https://example.com" . parse ( ) . unwrap ( ) ;
let req = Request:: new( Method:: Get, url) ;
let res = client. send ( req) . await;
let body = read_to_end ( res) . await;
let body = String :: from_utf8( body) . unwrap ( ) ;
println! ( " {body} " ) ;
} ;
( a, b) . join ( ) . await; } )
}
结语 在这篇文章中,我解释了 WASI 的轮询模型,并逐步展示了如何使用它来构建自己的异步运行时。我希望这对异步 Rust 运行时的维护者以及想要自己编写异步运行时的业余爱好者有用。如果你只想使用我今天在这里分享的代码,你可以通过安装 wasi-async-runtime crate 来实现。
参考: https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#the-wasi-0-2-polling-model https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#designing-the-poller-abstraction https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#designing-the-reactor-abstraction https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#registering-interest-in-a-pollable https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#blocking-until-events-are-ready https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#designing-the-block-on-abstraction https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#on-the-absence-of-a-local-executor https://blog.yoshuawuyts.com/building-an-async-runtime-for-wasi/#conclusion https://docs.rs/async-std https://docs.rs/runtime https://docs.rs/polling https://github.com/smol-rs/async-io https://docs.rs/smol https://docs.rs/wasi-async-runtime/latest/wasi_async_runtime/ https://bytecodealliance.org/articles/WASI-0.2 https://docs.rs/smol/latest/smol/ https://docs.rs/monoio/latest/monoio/ https://docs.rs/glommio/latest/glommio/ https://docs.rs/tokio/latest/tokio/ https://en.wikipedia.org/wiki/Io_uring https://learn.microsoft.com/en-us/windows/win32/api/ioringapi/ https://docs.rs/wasi/latest/wasi/io/poll/struct.Pollable.html https://docs.rs/wasi/latest/wasi/io/poll/fn.poll.html https://github.com/bytecodealliance/cargo-component https://blog.yoshuawuyts.com/why-async-rust/#ad-hoc-concurrency https://doc.rust-lang.org/std/task/struct.Waker.html https://docs.rs/slab/latest/slab/ https://docs.rs/futures-concurrency/latest/futures_concurrency/ https://docs.rs/futures-concurrency/latest/futures_concurrency/future/trait.Join.html#impl-Join-for-Vec%3CFut%3E https://docs.rs/futures-concurrency/latest/futures_concurrency/stream/struct.StreamGroup.html https://docs.rs/wasi-async-runtime/latest/wasi_async_runtime/
>>> → Linux下的透明代理( systemd & nftables )
已复制! 复制代码到剪贴板