原文链接
我发现,理解一个新概念的最好方法之一是从头开始:从零起步,一点点地创建它,不仅要学习它是如何工作的,还要学习它为什么会这样设计。
这不是一篇实用的异步指南,希望它涵盖的背景知识能帮助您思考异步问题,或至少满足您的好奇心,而且不会因为太多细节让您觉得厌烦。
但它依然很长……
目录
最简单的 Web 服务
我们从最简单的Web服务开始,进入异步编程之旅。我们的Web服务只使用 std::net crate 中的标准网络类型,只接受 HTTP 请求并用基本响应进行回复。接下来的内容中,我们将尽量忽略 HTTP 规范相关要求、忽略代码实用性,将焦点集中在 Web 服务的基本流程上。
HTTP 是构建于 TCP 之上,基于文本的传输协议。我们的 Web 服务通过建立一个TcpListener 来接受客户端的 TCP 连接请求:
use std::net::TcpListener;
fn main() {
let listener = TcpListener::bind("localhost:3000").unwrap();
}
Web 服务相应来自客户端传入的连接请求,按顺序挨个执行它们。
use std::net::{TcpListener, TcpStream};
use std::io;
fn main() {
loop {
let (connection, _) = listener.accept().unwrap();
if let Err(e) = handle_connection(connection) {
println!("failed to handle connection: {e}")
}
}
}
fn handle_connection(connection: TcpStream) -> io::Result<()> {
}
TCP 连接是 Web 服务与客户端之间的双向数据通道,用 TcpStream 类型表示。它实现了 Read 和 Write 特性( trait ),抽象了 Tcp 内部细节,使我们能够读取或写入普通的字节数据。
Web 服务接受 HTTP 请求,为此,需要创建缓存数组来保存传入的请求。
fn handle_connection(connection: TcpStream) -> io::Result<()> {
let mut request = [0u8; 1024];
Ok(())
}
接下来就是从连接读取数据,写入缓存。读取动作每次读到的字节数不定,所以需要跟踪已读取字节数。读取动作在循环中不断重复,依次将读到的内容写入缓存。
use std::io::Read;
fn handle_connection(mut connection: TcpStream) -> io::Result<()> {
let mut read = 0;
let mut request = [0u8; 1024];
loop {
let num_bytes = connection.read(&mut request[read..])?;
read += num_bytes;
}
Ok(())
}
直到读到一个特殊的字符序列 \r\n\r\n
,我们约定用它来作为读取的结束标志。
fn handle_connection(mut connection: TcpStream) -> io::Result<()> {
let mut read = 0;
let mut request = [0u8; 1024];
loop {
let num_bytes = connection.read(&mut request[read..])?;
read += num_bytes;
if request.get(read - 4..read) == Some(b"\r\n\r\n") {
break;
}
}
Ok(())
}
还有一种情况:如果连接已断开,读到的字节数将是0,此时如果客户端的请求未发送完毕,直接返回转入处理下一个连接。
再次声明,不用担心 HTTP 规范限制,我们的目的是让 Web 服务能工作就行。
fn handle_connection(mut connection: TcpStream) -> io::Result<()> {
let mut read = 0;
let mut request = [0u8; 1024];
loop {
let num_bytes = connection.read(&mut request[read..])?;
if num_bytes == 0 { println!("client disconnected unexpectedly");
return Ok(());
}
read += num_bytes;
if request.get(read - 4..read) == Some(b"\r\n\r\n") {
break;
}
}
Ok(())
}
一旦读取完成,就可以将读到的结果转换为字符串,并以日志形式输出到控制台。
fn handle_connection(stream: TcpStream) -> io::Result<()> {
let mut read = 0;
let mut request = [0u8; 1024];
loop {
}
let request = String::from_utf8_lossy(&request[..read]);
println!("{request}");
Ok(())
}
接下来实现响应部分。
跟读操作类似,写操作也可能不会一次完成。所以同样需要采用循环的方式,不断写入响应数据到流中,每次写入从上一次结束的位置开始,直到写入全部完成。
use std::io::Write;
fn handle_connection(mut connection: TcpStream) -> io::Result<()> {
let response = concat!(
"HTTP/1.1 200 OK\r\n",
"Content-Length: 12\n",
"Connection: close\r\n\r\n",
"Hello world!"
);
let mut written = 0;
loop {
let num_bytes = connection.write(response[written..].as_bytes())?;
if num_bytes == 0 {
println!("client disconnected unexpectedly");
return Ok(());
}
written += num_bytes;
if written == response.len() {
break;
}
}
}
最后,我们执行刷新( flush )操作,确保写入操作已执行完毕。
fn handle_connection(mut connection: TcpStream) -> io::Result<()> {
let mut written = 0;
loop {
}
connection.flush()
}
好了,现在我们有了一个能工作的 Web 服务!
$ curl localhost:3000
多线程的 Web 服务
好吧,它虽然可以工作,但是有一点问题。
看看我们接受请求的循环:
let listener = TcpListener::bind("localhost:3000").unwrap();
loop {
let (connection, _) = listener.accept().unwrap();
if let Err(e) = handle_connection(connection) {
}
}
看到问题了吗?
我们一次只能响应一个请求。
从一个网络连接读写数据不是即时完成的,中间要经过大量的基础设备,比如网关、路由器等等,如果两个用户同时向我们发起请求会发生什么?十个、十万个呢?随用户规模增长,我们的服务会延迟、卡顿,直至不可用,那么如何改进呢?
可能的选择有好几种,但到目前为止,最简单的是线程的方式。为每个请求创建一个线程,就能我们的服务能响应无限的用户增长,对吧?
fn main() {
loop {
let (connection, _) = listener.accept().unwrap();
std::thread::spawn(|| {
if let Err(e) = handle_connection(connection) {
}
});
}
}
实际上,这很有可能!就算不是无限的,但随着每个请求都在单独的线程中处理,我们服务的吞吐量会显著增加。
这到底是怎么回事?
跟大多数现代操作系统一样,在 linux 中,程序都是在一个单独的进程中运行的。虽然看起来每个活动程序都是同时运行的,但在物理上,一个 CPU 内核一次只能执行一个任务,或者通过超线程技术同时执行两个。为了让所有的程序都能执行,操作系统内核会不断切换它们,暂停当前正在运行的程序,切换到另一个并运行它,如此往复。这些上下文切换以毫秒为单位发生,形成了感觉上的“并行”。
内核调度器通过在多个内核之间分配工作负载来利用多核。每个核心管理一部分进程,这意味着某些程序可以真正意义上得到并行运行。
cpu1 cpu2 cpu3 cpu4
|----|----|----|----|
| p1 | p3 | p5 | p7 |
| |____| |____|
| | |____| |
|____| p4 | | p8 |
| | | p6 |____|
| p2 |____| | |
| | p3 | | p7 |
| | | | |
这种调度类型被称为抢占式多任务调度:内核决定进程运行多长时间被抢占,切换到其他进程。
该模式下,内核确保各独立进程不会访问到其他进程的内存,从而保证各种类型的程序都能得到良好地运行。但是,这使得上下文切换更加昂贵,因为内核在执行上下文切换之前必须刷新内存的某些部分,以确保内存被正确隔离。
线程跟进程类似,区别是线程可以与同一父进程下的线程共享内存,从而实现在同一程序的线程之间共享状态。除此之外线程的调度和进程没有任何区别。
对我们的服务而言,1 线程 / 1 请求 模式最关键的问题是我们的服务是 I/O 绑定的。handle_connection
执行过程中中绝大部分时间并不是用于计算,而是用于等待,等待从网络连接中收发数据,等待读、写、刷新等 I/O 阻塞的操作执行完毕。我们希望的是,发送一个 I/O 请求后,让出控制权给内核,等操作完成后内核再将控制权交回。在此期间,内核可以执行其他程序。
通常情况下,处理一个网络请求时绝大部分时间都在等待其他任务完成,比如数据库查询或接收 HTTP 请求。多个工作线程效率高的原因是我们可以利用等待的时间来处理其他请求。
非阻塞的 Web 服务
看起来多线程已经完全满足我们的需求,并且它使用也很简单,那么为什么我们还要继续呢?
您也许听说过线程很“重”、上下文切换非常“昂贵”等说法,但是现在,这并不准确,现代的服务器能毫不费力地处理上万的线程。
问题在于阻塞 I/O 将程序的控制权完全交给了操作系统内核,在程序执行完成之前,我们没有任何的干预手段可用,这让我们实现某些操作变得非常困难,比如取消操作和选择操作。
假设我们要实现优雅的服务关停操作。当我们按下 ctrl+c,程序不会马上退出,而是立刻停止接受新的连接请求,当前已建立连接的任务会继续执行,直到完成,或者是 30 秒后被强行终止,最后服务才退出。
在阻塞 I/O 模式下,这里的问题是:我们的 accept 循环会阻塞,直到下一个连接到来。我们可以在新连接请求被接受之前或之后检查 ctrl+c 信号,如果在处理 accept 时信号进来,我们必须等待下一次连接被接受,这期间只有内核拥有程序完全的控制权。
loop {
if got_ctrl_c() {
break;
}
let (connection, _) = listener.accept().unwrap();
if got_ctrl_c() {
break;
}
std::thread::spawn(|| );
}
我们想要的是像 match 操作一样,针对 I/O,同时侦听连接请求和 ctrl+c 信号:
loop {
match {
ctrl_c() => {
break;
},
Ok((connection, _)) = listener.accept() => {
std::thread::spawn(|| ...);
}
}
}
对于运行时间超过 30 秒的任务,又该怎么处理呢?我们可以设置一个标记让线程停止,那么又该多久检测一次标记呢?我们又回到了老问题:因为 I/O 阻塞导致我们丧失了程序的控制权,除了等它执行完毕,没有好的方式来强制取消一个线程的执行。
这正是线程和阻塞 I/O 令人头疼的地方,因为应用程序的控制权完全交给了内核,导致实现基于事件的逻辑变得非常困难。
某些平台下,可以使用平台特定接口来实现这一点,比如Unix信号处理机制。虽然信号处理机制简单,并且在某些场景下工作得很好,但在场景变得复杂的时候,信号处理机制会变得非常繁琐。在本文末尾,我们描述了另一种表达复杂控制流的方法。您可以根据实际情况来挑选合适的方式。
那么,有没有既能执行 I/O,又不用出让控制权给内核的实现方法呢?
实际上,还有另一种实现 I/O 操作的方法,称为非阻塞 I/O( non-bloking I/O )。顾名思义,非阻塞操作永远不会阻塞调用线程,它会立即返回,如果给定的资源不可用,则返回一个错误。
通过将 TCP 侦听器和连接置于非阻塞模式,我们可以切换到非阻塞 I/O 的实现方式。
let listener = TcpListener::bind("localhost:3000").unwrap();
listener.set_nonblocking(true).unwrap();
loop {
let (connection, _) = listener.accept().unwrap();
connection.set_nonblocking(true).unwrap();
}
非阻塞 I/O 的工作模式有一些不同:如果 I/O 请求不能立即完成,内核将返回一个 WouldBlock 错误代码。尽管被表示为错误代码,但 WouldBlock 并不是真正的错误,它只是意味着当前操作无法立即执行完毕,让我们可以自行决定接下来要做什么。
use std::io;
listener.set_nonblocking(true).unwrap();
loop {
let connection = match listener.accept() {
Ok((connection, _)) => connection,
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
}
Err(e) => panic!("{e}"),
};
connection.set_nonblocking(true).unwrap();
}
假设在调用 accept() 之后没有连接请求进来,在阻塞 I/O 模式下,我们只能一直等待新的连接,但现在,WouldBlock 不是将控制权交给内核,而是交回我们手里。
我们的 I/O 终于不阻塞了!但此时我们能做点什么呢?
WouldBlock 是一个临时的状态,意味着在未来某个时刻,当前套接字会准备好用于读或写。所以从技术上讲,我们应该一直等到(作者用了自旋这个单词-spin until)套接字状态变成可用( ready )。
loop {
let connection = match listener.accept() {
Ok((connection, _)) => connection,
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
continue; }
Err(e) => panic!("{e}"),
};
}
但是自旋还不如阻塞,至少阻塞 I/O 模式下,操作系统还可以给其他线程执行的机会。所以我们真正需要的,是为全部任务创建一个有序的调度器,来完成曾经由操作系统来为我们做的事情。
让我们从头回顾一遍:
首先我们创建了一个 TCP 侦听器:
let listener = TcpListener::bind("localhost:3000").unwrap();
然后设置它为非阻塞模式:
listener.set_nonblocking(true).unwrap();
接下来进入主循环,循环中第一件事情是接受一个新的 TCP 连接请求。
loop {
match listener.accept() {
Ok((connection, _)) => {
connection.set_nonblocking(true).unwrap();
},
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
Err(e) => panic!("{e}"),
}
}
现在,我们不能继续直接为已建立的连接服务,导致其他请求被忽略。我们必须能跟踪所有的活动连接。
let mut connections = Vec::new();
loop {
match listener.accept() {
Ok((connection, _)) => {
connection.set_nonblocking(true).unwrap();
connections.push(connection); },
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
Err(e) => panic!("{e}"),
}
}
但是我们不能无休止地接受连接请求。当没有操作系统调度的便利时,我们需要在主循环的每一次迭代中,将所有的事情都推进一点点。一旦新的连接请求被接受,我们需要处理所有的活跃连接。
对于每一个连接,我们必须执行任何需要的操作来推进请求的处理,无论是读取请求还是写入响应。
loop {
match listener.accept() {
}
for connection in connections.iter_mut() {
}
}
还记得之前的 handle_connection 功能吗?
fn handle_connection(mut connection: TcpStream) -> io::Result<()> {
let mut request = [0u8; 1024];
let mut read = 0;
loop {
let num_bytes = connection.read(&mut request[read..])?; }
let request = String::from_utf8_lossy(&request[..read]);
println!("{request}");
let response = ;
let mut written = 0;
loop {
let num_bytes = connection.write(&response[written..])?;
}
connection.flush().unwrap(); }
我们需要执行不同的 I/O 操作,比如读、写和刷新。阻塞模式下,代码会按我们写的顺序执行。但现在我们必须面对这样一个事实,在执行 I/O 的任何时候都可能面临WouldBlock,导致当前执行无法取得进展。
同时,我们不能简单地丢掉这个结果去处理下一个活动连接,我们需要跟踪当前连接的状态,方便在下次回来时能从正确的地方继续。
我们设计了一个枚举来保存 handle_connetion 的状态,它有三种可能的状态:
enum ConnectionState {
Read,
Write,
Flush
}
请记住,我们需要的不是记录事务单独的状态,例如将请求转换为字符串,我们需要的是在遇到 WouldBlock 时,能记住当时的状态。
读、写操作的状态还包含当前已读写的字节数和本地缓存。之前我们在函数中定义它,现在我们需要它在整个主循环的生命周期中存在。
enum ConnectionState {
Read {
request: [u8; 1024],
read: usize
},
Write {
response: &'static [u8],
written: usize,
},
Flush,
}
我们在每一次 handle_connection 开始执行时初始化连接状态为 Read,request 为 0 值,read 为 0 字节。
let mut connections = Vec::new();
loop {
match listener.accept() {
Ok((connection, _)) => {
connection.set_nonblocking(true).unwrap();
let state = ConnectionState::Read { request: [0u8; 1024],
read: 0,
};
connections.push((connection, state));
},
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
Err(e) => panic!("{e}"),
}
}
现在,我们可以尝试根据其当前状态,将每个连接向前推进了。
loop {
match listener.accept() {
}
for (connection, state) in connections.iter_mut() {
if let ConnectionState::Read { request, read } = state {
}
if let ConnectionState::Write { response, written } = state {
}
if let ConnectionState::Flush = state {
}
}
}
如果当前连接仍然处于 Read 状态,继续做读取操作,唯一不同的是,如果收到WouldBlock, 则继续处理下一个活动连接。
'next: for (connection, state) in connections.iter_mut() {
if let ConnectionState::Read { request, read } = state {
loop {
match connection.read(&mut request[*read..]) {
Ok(n) => {
*read += n
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
continue 'next; }
Err(e) => panic!("{e}"),
}
if request.get(*read - 4..*read) == Some(b"\r\n\r\n") {
break;
}
}
let request = String::from_utf8_lossy(&request[..*read]);
println!("{request}");
}
}
还有读到 0 字节的问题需要处理,之前我们只是从 handle_connection 中退出,state 变量会自动被清空。但是现在,我们必须自己处理当前连接。当前我们正在遍历connections 列表,所以需要一个单独的列表来收集已完成的活动连接,后续再来处理。
let mut completed = Vec::new();
'next: for (i, (connection, state)) in connections.iter_mut().enumerate() {
if let ConnectionState::Read { request, read } = state {
loop {
match connection.read(&mut request[*read..]) {
Ok(0) => {
println!("client disconnected unexpectedly");
completed.push(i); continue 'next;
}
Ok(n) => *read += n,
Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue 'next,
Err(e) => panic!("{e}"),
}
}
}
}
for i in completed.into_iter().rev() {
connections.remove(i); }
读操作完成后,我们必须切换到 Write 状态并尝试写入回应。写操作的逻辑跟读操作非常相似,写操作完成后,需要切换到 Flush 状态。
if let ConnectionState::Read { request, read } = state {
let response = concat!(
"HTTP/1.1 200 OK\r\n",
"Content-Length: 12\n",
"Connection: close\r\n\r\n",
"Hello world!"
);
*state = ConnectionState::Write { response: response.as_bytes(),
written: 0,
};
}
if let ConnectionState::Write { response, written } = state {
loop {
match connection.write(&response[*written..]) {
Ok(0) => {
println!("client disconnected unexpectedly");
completed.push(i);
continue 'next;
}
Ok(n) => {
*written += n;
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
continue 'next;
}
Err(e) => panic!("{e}"),
}
if *written == response.len() {
break;
}
}
*state = ConnectionState::Flush;
}
成功完成刷新操作后,我们标记当前连接为完成,并从 completed 列表中移除。
if let ConnectionState::Flush = state {
match connection.flush() {
Ok(_) => {
completed.push(i); },
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
continue 'next;
}
Err(e) => panic!("{e}"),
}
}
就是这样!以下是新的更高水平的 web 服务流程:
fn main() {
let listener = TcpListener::bind("localhost:3000").unwrap();
listener.set_nonblocking(true).unwrap();
let mut connections = Vec::new();
loop {
match listener.accept() {
Ok((connection, _)) => {
connection.set_nonblocking(true).unwrap();
let state = ConnectionState::Read {
request: Vec::new(),
read: 0,
};
connections.push((connection, state));
},
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
Err(e) => panic!("{e}"),
}
let mut completed = Vec::new();
'next: for (i, (connection, state)) in connections.iter_mut().enumerate() {
if let ConnectionState::Read { request, read } = state {
*state = ConnectionState::Write { response, written };
}
if let ConnectionState::Write { response, written } = state {
*state = ConnectionState::Flush;
}
if let ConnectionState::Flush = state {
}
}
for i in completed.into_iter().rev() {
connections.remove(i);
}
}
}
现在,我们必须自己管理调度,事情变得越来越复杂了……
关键的时候来了……
$ curl localhost:3000
工作正常!
多路复用的 Web 服务
现在我们的 Web 服务能在单线程中执行多个请求,没有任何阻塞。如果某个操作被阻塞,它将记住自己的状态并切换,让其他操作执行,这跟内核调度器的行为一致。但是,新设计带来了两个问题.
首先是所有的工作都在主线程中运行,只利用了一个 CPU 核心。我们尽最大努力高效地利用这一核心,但一次仍然只执行一个任务。如果线程能分布在多个核心上,我们同一时间就可以做更多的工作。
不过有一个更大的问题。
我们的主循环效率并不高。
我们对每一个活跃的连接,每一次循环的迭代,都要向内核发出一个 I/O 请求,来检查它是否准备好了。即使调用 read 或 write 返回了 WouldBlock,实际没有执行任何 I/O,它仍然是一个系统调用。系统调用并不便宜。我们可能有 10k 个活跃的连接,但只有 500 个是准备好的。当只有 500 个连接会真正做些事情的时候,调用 read 或 write 10k 次是对 CPU 周期的巨大浪费。
随着连接数的增加,我们的循环变得越来越低效,浪费了更多的时间做无用的工作。
怎么解决这个问题呢?使用阻塞 I/O 时,内核能够有效地调度任务,因为它知道资源什么时候准备好了。使用非阻塞 I/O 时,我们不检查就不知道,但是检查是很昂贵的。
我们需要的是一种高效的方式来跟踪所有的活跃连接,并且在它们准备好的时候得到通知。
事实证明,我们并不是第一个遇到这个问题的人。每个操作系统都提供了针对这个问题的解决方案。在 Linux 上,它叫做 epoll。
epoll(7) - I/O 事件通知机制
epoll API 执行的任务与 poll(2) 类似:监视多个文件描述符,看看是否有任何一个可以进行 I/O 操作。epoll API 可以作为边缘触发(edge-triggered)或水平触发(level-triggered)的接口使用,并且能够很好地扩展到监视大量的文件描述符。
听上去很完美!我们试试看。
epoll 是一组 Linux 系统调用,让我们可以处理一组非阻塞的套接字。直接使用 epoll 并不是很方便,所以我们将使用 epoll crate,它是一个对 C 接口的轻度封装。
首先,我们使用 create 函数来初始化一个 epoll 实例。
fn main() {
let epoll = epoll::create(false).unwrap(); }
epoll::create 返回一个文件描述符,它代表了新创建的 epoll 实例。你可以把它看作是一个文件描述符的集合,我们可以从中添加或删除文件描述符。
在 Linux/Unix 中,一切都被视为文件。文件系统上的实际文件、TCP 套接字、以及外部设备都是可以读写的文件。文件描述符是一个整数,它表示系统中打开的“文件”。本文接下来的部分,我们将频繁使用它。
我们要添加的第一个文件描述符是 TCP 监听器。可以用 epoll::ctl 命令来修改 epoll 集合,添加文件描述符使用EPOLL_CTL_ADD标志。
use epoll::{Event, Events, ControlOptions::*};
use std::os::fd::AsRawFd;
fn main() {
let listener = TcpListener::bind("localhost:3000").unwrap();
listener.set_nonblocking(true).unwrap();
let event = Event::new(Events::EPOLLIN, listener.as_raw_fd() as _);
epoll::ctl(epoll, EPOLL_CTL_ADD, listener.as_raw_fd(), event).unwrap(); }
我们传入要注册的资源的文件描述符,也就是 TCP 监听器,以及一个事件。一个事件有两个部分,interest flag 和 data field。interest flag 让我们可以告诉 epoll 我们感兴趣的 I/O 事件。在 TCP 监听器中,我们想要在有新连接进来时得到通知,所以传入 EPOLLIN 标志。
data field 让我们可以存储一个能够唯一标识每个资源的 ID。记住,文件描述符是一个给定文件的唯一整数,所以直接使用它。你会在下一步看到为什么这很重要。
现在轮到主循环。这次不用自旋,用 epoll::wait。
epoll_wait(2) - 等待 epoll 文件描述符上的 I/O 事件
对文件描述符 epfd 指向的 epoll(7) 实例而言,epoll_wait() 系统调用会等待其上的事件。interest list 中的文件描述符指向的 ready list 中,如果有一些可用事件的信息,那么这些信息通过 events 指向的缓冲区返回。
调用 epoll_wait() 将阻塞,直到以下任一情况发生:
- 文件描述符提交了一个事件;
- 调用被信号处理器中断;
- 超时;
epoll::wait 是 epoll 的神奇之处:它阻塞直到我们注册的任何事件变得就绪,并告诉我们哪些事件就绪了。此时这仅用于有新连接进来时,但是很快我们将使用它来阻塞读、写和刷新事件,这些事件我们之前是用自旋的方式处理的。
您可能不喜欢 epoll::wait 是“阻塞”的这一事实,但是,它只在没有任何事情可做的时候才阻塞,而之前我们是在自旋并且做无用的系统调用。这种同时阻塞多个操作的方法被称为 I/O 多路复用。
epoll::wait 接受一个事件的列表,当所关注的文件描述符就绪,它会将文件描述符的信息填充到列表,然后返回被添加的事件的数量。
loop {
let mut events = [Event::new(Events::empty(), 0); 1024];
let timeout = -1; let num_events = epoll::wait(epoll, timeout, &mut events).unwrap();
for event in &events[..num_events] {
}
}
每个事件都包含数据字段,该字段与就绪的资源相关联。
for event in &events[..num_events] {
let fd = event.data as i32;
}
还记得我们用文件描述符来标记数据字段吗?我们可以用它来检查事件是否是针对TCP监听器的,如果是,那就意味着有一个传入连接已经准备好被接受了。
for event in &events[..num_events] {
let fd = event.data as i32;
if fd == listener.as_raw_fd() {
match listener.accept() {
Ok((connection, _)) => {
connection.set_nonblocking(true).unwrap();
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
Err(e) => panic!("{e}"),
}
}
}
如果返回 WouldBlock, 则移动到下一个连接,等待下一次事件发生。
现在需要在 epoll 中注册新的连接,跟注册侦听器一样。
for event in &events[..num_events] {
let fd = event.data as i32;
if fd == listener.as_raw_fd() {
match listener.accept() {
Ok((connection, _)) => {
connection.set_nonblocking(true).unwrap();
let fd = connection.as_raw_fd();
let event = Event::new(Events::EPOLLIN | Events::EPOLLOUT, fd as _);
epoll::ctl(epoll, EPOLL_CTL_ADD, fd, event).unwrap(); }
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
Err(e) => panic!("{e}"),
}
}
}
这次我们注册了EPOLLIN 和 EPOLLOUT 事件,因为根据连接状态,我们要关注读或写事件。
注册了连接之后,我们将得到 TCP 侦听器和某个连接的事件。我们需要用某种方式存储连接和它们的状态,并能通过查找文件描述符的方式来访问它们。
这次不用 List,用 HashMap。
let mut connections = HashMap::new();
loop {
'next: for event in &events[..num_events] {
let fd = event.data as i32;
if fd == listener.as_raw_fd() {
match listener.accept() {
Ok((connection, _)) => {
let state = ConnectionState::Read {
request: [0u8; 1024],
read: 0,
};
connections.insert(fd, (connection, state)); }
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
Err(e) => panic!("{e}"),
}
continue 'next;
}
let (connection, state) = connections.get_mut(&fd).unwrap(); }
}
一旦连接和它的状态就绪,我们可以用和之前一样的方法来推进它。从流中读写数据的操作没有任何变化,区别是现在我们仅在接到 epoll 通知时才进行操作。
以前我们必须检查每一个连接,看看是否有什么变得就绪,但现在由 epoll 来处理,避免了任何无用的系统调用。
let (connection, state) = connections.get_mut(&fd).unwrap();
if let ConnectionState::Read { request, read } = state {
*state = ConnectionState::Write { response, written };
}
if let ConnectionState::Write { response, written } = state {
*state = ConnectionState::Flush;
}
if let ConnectionState::Flush = state {
}
所有操作都完成后,我们从 connections 中移除当前连接,它会自动从 epoll 中注销。
for fd in completed {
let (connection, _state) = connections.remove(&fd).unwrap();
drop(connection);
}
就是现在,更高水平的 Web 服务完成了!
fn main() {
let epoll = epoll::create(false).unwrap();
let listener = .
let event = Event::new(Events::EPOLLIN, listener.as_raw_fd() as _);
epoll::ctl(epoll, EPOLL_CTL_ADD, listener.as_raw_fd(), event).unwrap();
let mut connections = HashMap::new();
loop {
let mut events = [Event::new(Events::empty(), 0); 1024];
let num_events = epoll::wait(epoll, 0, &mut events).unwrap();
let mut completed = Vec::new();
'next: for event in &events[..num_events] {
let fd = event.data as i32;
if fd == listener.as_raw_fd() {
match listener.accept() {
Ok((connection, _)) => {
let event = Event::new(Events::EPOLLIN | Events::EPOLLOUT, fd as _);
epoll::ctl(epoll, EPOLL_CTL_ADD, fd, event).unwrap();
let state = ConnectionState::Read {
request: [0u8; 1024],
read: 0,
};
connections.insert(fd, (connection, state));
}
continue 'next;
}
let (connection, state) = connections.get_mut(&fd).unwrap();
if let ConnectionState::Read { request, read } = state {
*state = ConnectionState::Write {
response: response.as_bytes(),
written: 0,
};
}
if let ConnectionState::Write { response, written } = state {
*state = ConnectionState::Flush;
}
if let ConnectionState::Flush = state {
}
}
for fd in completed {
let (connection, _state) = connections.remove(&fd).unwrap();
drop(connection);
}
}
}
现在……
$ curl localhost:3000
工作正常!
Futures
好吧,我们的服务器现在能在单一线程中同时处理多个请求了。感谢 epoll,它在我们的工作场景中非常高效。但是,仍然还存在问题。
我们需要自己规划任务的执行,需要自己考虑如何高效地调度任务,这使得我们的代码复杂度急剧增加。
任务的执行,也从一个简单的顺序执行循环变成了庞大的事件循环,需要管理多个状态机。
总感觉差点意思。
使我们的原始服务器成多线程非常简单,只需在 thread::spawn 中添加一行代码即可。仔细想想,我们的服务器仍然是一组并发任务,只是我们在一个巨大的循环中混乱地管理它们。
这让扩展功能变得非常困难,在程序中添加的功能越多,循环就变得越复杂,因为所有东西都紧密地耦合在一起。如果可以编写一个类似 thread::spawn 的抽象,能让我们将任务写成独立的单元,能集中在一个地方处理所有任务的调度和事件处理,从而重新获得流程控制权,会怎么样呢?
这种思想被称为异步编程。
我们来看看 thread::spawn 的函数签名:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static;
thread::spawn 接受一个闭包,但我们的版本其实并不能,因为我们不是操作系统,不能随意抢占代码。我们需要以某种方式来表达一项不受阻碍、可恢复的任务。
trait Task {}
处理一个请求是一个任务,从连接读取或写入数据也是。一个任务实质上是一段待执行的代码,代表着它将在未来某个时候需要得到执行。
Future (译注:本意是未来,在异步中我们保留原文代表这种 trait),确实是个好名字,不是吗?
trait Future {
type Output;
fn run(self) -> Self::Output;
}
这个签名并不能工作。run 函数直接返回 Self::Output 意味着它会阻塞直到返回,而这正是我们极力在避免的。我们要寻找其他方法,实现不阻塞的同时,能推动我们的 future 前进,就像我们之前在事件循环的状态机中实现的那样。
实际上,在执行一个 future 的时候,我们要做的就是询问它是否已就绪,轮询( polling ) 它,然后给它执行的机会。
trait Future {
type Output;
fn poll(self) -> Option<Self::Output>;
}
看起来差不多了。
但是,如果我们多次调用 poll,除了等着,我们并不能获取 self, 所以它应该是一个引用,一个可变的引用,通过它,我们可以改变任务内部的状态,比如 ConnectState。
trait Future {
type Output;
fn poll(&mut self) -> Option<Self::Output>;
}
现在,来设想一下执行这些 future 的调度器:
impl Scheduler {
fn run(&self) {
loop {
for future in &self.tasks {
future.poll();
}
}
}
}
这看起来不怎么样。
future 初始化完成后,当 epoll 返回的它的事件时,调度器调用它的 poll 方法来给它一个执行的机会。
如果 future 是 I/O 操作,在 接到 epoll 通知时我们就知道它可以执行了。问题是我们不知道 epoll 事件对应的是哪个 future, 因为 future 的执行过程都在内部的 poll 中。
调度器需要传递一个 ID 给 future,它可以用这个 ID 而不是文件描述符向 epoll 注册任何 I/O 资源。通过这种方式,调度器就能把 epoll 事件和 future 对应起来了。
impl Scheduler {
fn spawn<T>(&self, mut future: T) {
let id = rand();
future.poll(event.id);
self.tasks.insert(id, future);
}
fn run(self) {
for event in epoll_events {
let future = self.tasks.get(&event.id).unwrap();
future.poll(event.id);
}
}
}
您知道的,如果有一种更通用的方式来告诉调度器 future 当前的进度,而不是把每个 future 都绑定到 epoll,那就太好了。future 有不同的类型,每种类型都可能有不同的执行方式,比如在后台线程中执行的定时器、或者是一个通道,它需要在消息已就绪的时候通知相应的任务。
如果我们给 future 更多的控制权呢? 如果我们不是简单地用一个 ID, 而是给每个 future 一个能唤醒自己的方法,能通知调度器它已经准备好可以执行了呢?
一个简单的回调函数就可以做到。
#[derive(Clone)]
struct Waker(Arc<dyn Fn() + Send + Sync>);
impl Waker {
fn wake(&self) {
(self.0)()
}
}
trait Future {
type Output;
fn poll(&mut self, waker: Waker) -> Option<Self::Output>;
}
调度器可以为每个 future 提供一个回调函数,它被调用时更新该 future 在调度器中的状态,标记 future 为就绪。这样调度器就完全和 epoll 或其他任何独立通知系统解耦了。
唤醒器 ( Waker ) 是线程安全的,允许我们使用后台线程唤醒 future。目前所有的任务都已连接到 epoll,这马上就会派上用场了。
执行器
考虑一个 future 从 TCP 连接读取数据的场景: 它收到一个唤醒器, 需要在 epoll 返回给它 EPOLLIN 事件的时候得到执行,但事件发生时,它只会呆在调度器的队列里面,并不会得到执行。显然,future 不能唤醒自己,我们还需要其他帮助。
所有 I/O 类型的 future 都需要将他们的唤醒器传递给 epoll, 实际上,它们需要的不止这些, 他们还需要一种驱动 epoll 的后台服务,以便我们可以在其中注册唤醒器。
这种服务通常被称为反应器( Reactor )。
像之前一样,反应器只是一个简单的对象,它保存了 epoll 描述符和以这个描述符为 键值的任务列表。不同之处是键值对应的值不是已建立的 TCP 连接,而是唤醒器。
thread_local! {
static REACTOR: Reactor = Reactor::new();
}
struct Reactor {
epoll: RawFd,
tasks: RefCell<HashMap<RawFd, Waker>>,
}
impl Reactor {
pub fn new() -> Reactor {
Reactor {
epoll: epoll::create(false).unwrap(),
tasks: RefCell::new(HashMap::new()),
}
}
}
简单起见,反应器只是一个本地线程( thread-local )对象,通过 RefCell 获得内部可变性。这非常重要,因为反应器会被不同的任务逻辑修改。
反应器需要实现一系列的基本操作:
添加任务:
impl Reactor {
pub fn add(&self, fd: RawFd, waker: Waker) {
let event = epoll::Event::new(Events::EPOLLIN | Events::EPOLLOUT, fd as u64);
epoll::ctl(self.epoll, EPOLL_CTL_ADD, fd, event).unwrap();
self.tasks.borrow_mut().insert(fd, waker);
}
}
移除任务:
impl Reactor {
pub fn remove(&self, fd: RawFd) {
self.tasks.borrow_mut().remove(&fd);
}
}
并且驱动 epoll。
就像 epoll 在 loop 中执行一样,反应器也在 loop 中执行。它们几乎以相同的方式工作,反应器要做的是为每一个事件唤醒相应的 future,然后继续下一个循环。被唤醒的 future 在稍后将在调度器中执行。
impl Reactor {
pub fn wait(&self) {
let mut events = [Event::new(Events::empty(), 0); 1024];
let timeout = -1; let num_events = epoll::wait(self.epoll, timeout, &mut events).unwrap();
for event in &events[..num_events] {
let fd = event.data as i32;
if let Some(waker) = self.tasks.borrow().get(&fd) {
waker.wake();
}
}
}
}
很好,现在我们有一个简单的反应器接口了。
但所有的这些仍然有点抽象,调用 wake 方法究竟意味着什么呢?
任务的调度
反应器有了,我们还需要一个任务调度器来执行我们的任务。
需要记住的是,任务调度器必须是全局,并且是线程安全的,因为唤醒程序是 Send 的,这意味着 wake 方法可以从其他线程同时调用。
static SCHEDULER: Scheduler = Scheduler { };
#[derive(Default)]
struct Scheduler {
}
我们希望能像创建线程那样在调度器上创建任务。目前,我们将只生成不返回任何内容的任务,以避免必须实现 JoinHandle。
首先,我们需要按一定的顺序排列要执行的任务,用 Mutex 来保证线程安全。
struct Scheduler {
tasks: Mutex<Vec<Box<dyn Future + Send>>>
}
impl Scheduler {
pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) {
self.tasks.lock().unwrap().push(Box::new(task));
}
pub fn run(&self) {
for task in tasks.lock().unwrap().borrow_mut().iter_mut() {
}
}
}
记住,future 只在它可以推进的时候才会被轮询。它们在创建时总是会推进一次,然后直到 wake 方法被调用才会被唤醒。
实现方法有很多,我们可以在任务列表中存储一个标记,表示任务是否已被唤醒,但这意味着必须遍历任务列表才能找到可执行的任务,这代价太大了,肯定有更好的方式。
我们可以在队列中只保存可执行的任务,而不是所有被创建的。
use std::collections::VecDeque;
type SharedTask = Arc<Mutex<dyn Future<Output = ()> + Send>>;
#[derive(Default)]
struct Scheduler {
runnable: Mutex<VecDeque<SharedTask>>,
}
这些类型很快就会有意义。
当任务被创建后,它将会被加入到队尾:
impl Scheduler {
pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) {
self.runnable.lock().unwrap().push_back(Arc::new(Mutex::new(task)));
}
}
调度器挨个弹出任务并调用它们的 poll 方法:
impl Scheduler {
fn run(&self) {
loop {
let task = self.runnable.lock().unwrap().pop_front();
if let Some(task) = task {
task.try_lock().unwrap().poll(waker);
}
}
}
请注意,我们甚至不需要互斥锁来锁定任务,因为任务只会由主线程访问,但删除它意味着不安全,使用 try_lock().unwrap() 来处理。
现在,最重要的一点:唤醒器。我们的执行队列最精华的部分就是:当一个任务被唤醒,它只是简单地被推回队列。
impl Scheduler {
fn run(&self) {
loop {
let task = self.runnable.lock().unwrap().pop_front();
if let Some(task) = task {
let t2 = task.clone();
let wake = Arc::new(move || {
SCHEDULER.runnable.lock().unwrap().push_back(t2.clone());
});
task.try_lock().unwrap().poll(Waker(wake));
}
}
}
}
这就是为什么需要对任务进行引用计数——它的所有权不属于调度程序,它被队列引用,也被存储唤醒程序的任何地方引用。实际上同一个任务可能会加入队列多次,唤醒器也可能在任何地方被克隆。
一旦处理完了所有可执行的任务,反应器将会被阻塞,直到有其他任务就绪。当新的任务变得就绪,反应器将调用 wake 方法并将该 future 推入任务队列并执行它,继续循环。
pub fn run(&self) {
loop {
loop {
let Some(task) = self.runnable.lock().unwrap().pop_front() else { break };
let t2 = task.clone();
let wake = Arc::new(move || {
SCHEDULER.runnable.lock().unwrap().push_back(t2.clone());
});
task.lock().unwrap().poll(Waker(wake));
}
REACTOR.with(|reactor| reactor.wait()); }
}
漂亮!
……忽略混乱的 Arc<Mutex>。 好!调度器和反应器共同构成了一个 future 的运行时。调度器会跟踪哪些任务是可运行的,并轮询它们,当 epoll 告诉我们它们感兴趣的内容准备就绪时,反应器会将任务标记为可运行。
trait Future {
type Output;
fn poll(&mut self, waker: Waker) -> Option<Self::Output>;
}
static SCHEDULER: Scheduler = Scheduler { };
#[derive(Default)]
struct Scheduler {
runnable: Mutex<VecDeque<SharedTask>>,
}
type SharedTask = Arc<Mutex<dyn Future<Output = ()> + Send>>;
impl Scheduler {
pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static);
pub fn run(&self);
}
thread_local! {
static REACTOR: Reactor = Reactor::new();
}
struct Reactor {
epoll: RawFd,
tasks: RefCell<HashMap<RawFd, Waker>>,
}
impl Reactor {
pub fn new() -> Reactor;
pub fn add(&self, fd: RawFd, waker: Waker);
pub fn remove(&self, fd: RawFd);
pub fn wait(&self);
}
我们已经写好了运行时,下面尝试使用它。
异步的 Web 服务
是时候来写供我们的调度器执行的任务了。跟之前一样,我们使用枚举作为状态机来管理程序的状态。不同的是,这次不再在一个巨大的循环中统一管理程序状态,而是每个任务独立管理自己的状态。
一开始,我们需要编写主任务,它将掌控整个程序的调度器可执行队列的开关。
fn main() {
SCHEDULER.spawn(Main::Start);
SCHEDULER.run();
}
enum Main {
Start,
}
impl Future for Main {
type Output = ();
fn poll(&mut self, waker: Waker) -> Option<()> {
}
}
任务的开始跟之前一样,创建一个 TCP 侦听器并设置为非阻塞模式。
fn poll(&mut self, waker: Waker) -> Option<()> {
if let Main::Start = self {
let listener = TcpListener::bind("localhost:3000").unwrap();
listener.set_nonblocking(true).unwrap();
}
None
}
然后需要在 epoll 中注册侦听器,使用我们的反应器来实现它。
fn poll(&mut self, waker: Waker) -> Option<()> {
if let Main::Start = self {
REACTOR.with(|reactor| {
reactor.add(listener.as_raw_fd(), waker);
});
}
}
请注意由调度器提供的唤醒器是如何给到反应器的,当新的连接进来,epoll 将返回一个事件,反应器将唤醒相关任务,引起调度器将任务重新放入队列并再次轮询它。唤醒器将所有环节都连接起来了。
现在,当任务下一次被执行时,我们需要第二种状态: Accept。主任务将一直保持在 Accept 状态,保证新连接进来能及时得到处理。
enum Main {
Start,
Accept { listener: TcpListener }, }
fn poll(&mut self, waker: Waker) -> Option<()> {
if let Main::Start = self {
*self = Main::Accept { listener };
}
if let Main::Accept { listener } = self {
match listener.accept() {
Ok((connection, _)) => {
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
return None;
}
Err(e) => panic!("{e}"),
}
}
None
}
如果侦听器未就绪,我们返回 None 即可。这告诉调度器 future 还没有就绪,等下次反应器唤醒它时再调度。
如果我们接受新的连接,同样需要将其设置为非阻塞模式。
fn poll(&mut self, waker: Waker) -> Option<()> {
if let Main::Start = self {
}
if let Main::Accept { listener } = self {
match listener.accept() {
Ok((connection, _)) => {
connection.set_nonblocking(true).unwrap(); }
Err(e) if e.kind() == io::ErrorKind::WouldBlock => return None,
Err(e) => panic!("{e}"),
}
}
None
}
接下来,需要创建新的任务来执行传入的请求。
fn poll(&mut self, waker: Waker) -> Option<()> {
if let Main::Start = self {
}
if let Main::Accept { listener } = self {
match listener.accept() {
Ok((connection, _)) => {
connection.set_nonblocking(true).unwrap();
SCHEDULER.spawn(Handler { connection,
state: HandlerState::Start,
});
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => return None,
Err(e) => panic!("{e}"),
}
}
}
请求处理程序跟之前的差不多,但现在它自己管理连接及当前状态,与之前的 ConnectionState 完全相同。
struct Handler {
connection: TcpStream,
state: HandlerState,
}
enum HandlerState {
Start,
Read {
request: [u8; 1024],
read: usize,
},
Write {
response: &'static [u8],
written: usize,
},
Flush,
}
任务处理程序首先在反应器注册当前连接,以便在连接准备好读/写时收到通知。同样,通过唤醒器让调度器知道何时再次调度它。
impl Future for Handler {
type Output = ();
fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
if let HandlerState::Start = self.state {
REACTOR.with(|reactor| {
reactor.add(self.connection.as_raw_fd(), waker);
});
self.state = HandlerState::Read {
request: [0u8; 1024],
read: 0,
};
}
}
}
读、写和刷新状态跟之前完全一样,只是当遇到 WouldBlock 时,我们仅返回 None 即可。因为我们知道后续被唤醒时就会再次得到执行的机会。
/ impl Future for Handler {
fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
if let HandlerState::Start = self.state {
}
if let HandlerState::Read { request, read } = &mut self.state {
loop {
match self.connection.read(&mut request[*read..]) {
Ok(0) => {
println!("client disconnected unexpectedly");
return Some(());
}
Ok(n) => *read += n,
Err(e) if e.kind() == io::ErrorKind::WouldBlock => return None, Err(e) => panic!("{e}"),
}
let read = *read;
if read >= 4 && &request[read - 4..read] == b"\r\n\r\n" {
break;
}
}
let request = String::from_utf8_lossy(&request[..*read]);
println!("{}", request);
let response = ;
self.state = HandlerState::Write {
response: response.as_bytes(),
written: 0,
};
}
if let HandlerState::Write { response, written } = &mut self.state {
self.state = HandlerState::Flush;
}
if let HandlerState::Flush = self.state {
match self.connection.flush() {
Ok(_) => {}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => return None, Err(e) => panic!("{e}"),
}
}
}
注意到了吗?当任务是独立、封装的对象时,事情会变得多美好!
任务生命周期结束时,它从反应器中移除自己的连接并返回 Some,此后它将不再被执行。
fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
REACTOR.with(|reactor| {
reactor.remove(self.connection.as_raw_fd());
});
Some(())
}
完美!我们的 Web 服务端越来越好了。任务之间完全独立,我们可以像创建线程那样创建任务。
fn main() {
SCHEDULER.spawn(Main::Start);
SCHEDULER.run();
}
enum Main {
Start,
Accept { listener: TcpListener },
}
impl Future for Main {
type Output = ();
fn poll(&mut self, waker: Waker) -> Option<()> {
if let Main::Start = self {
REACTOR.with(|reactor| {
reactor.add(listener.as_raw_fd(), waker);
});
*self = Main::Accept { listener };
}
if let Main::Accept { listener } = self {
match listener.accept() {
Ok((connection, _)) => {
SCHEDULER.spawn(Handler {
connection,
state: HandlerState::Start,
});
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => return None,
Err(e) => panic!("{e}"),
}
}
None
}
}
struct Handler {
connection: TcpStream,
state: HandlerState,
}
enum HandlerState {
Start,
Read {
request: [u8; 1024],
read: usize,
},
Write {
response: &'static [u8],
written: usize,
},
Flush,
}
impl Future for Handler {
type Output = ();
fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
if let HandlerState::Start = self.state {
REACTOR.with(|reactor| {
reactor.add(self.connection.as_raw_fd(), waker);
});
self.state = HandlerState::Read { };
}
if let HandlerState::Read { request, read } = &mut self.state {
self.state = HandlerState::Write { };
}
if let HandlerState::Write { response, written } = &mut self.state {
self.state = HandlerState::Flush;
}
if let HandlerState::Flush = self.state {
}
REACTOR.with(|reactor| {
reactor.remove(self.connection.as_raw_fd());
});
Some(())
}
}
执行结果:
$ curl localhost:3000
工作正常!
全功能的 Web 服务
得益于新的 future 抽象,我们的服务端比之前更优秀。Future 独立管理自己的状态,调度器不需要关心 epoll, 专注于执行任务。任务可以被创建、唤醒,不需要关心调度器的底层细节。这真的是非常好的编程模式。
任务被封装是件好事,但我们仍然必须以类似状态机的方式编写所有内容,诚然,Rust 使这很容易用枚举来完成,但我们能做得更好吗?
看看我们写的两个 future,它们有很多共同点:每个 future 都有多个状态;在每种状态下都会运行一些代码;如果这些代码成功执行完毕,我们将转换到下一状态;如果遇到 WouldBlock 则返回 None,表示尚未准备就绪。
这似乎是可以抽象的东西。
我们需要从代码块中创建 future,以及将两个 future 组合在一起的方法。给定一个代码块,我们需要能够构建一个 future ……这听起来跟闭包很相似?
fn future_fn(f: F) -> impl Future
where
F: Fn(),
{
}
还需要能修改内部的状态
fn future_fn(f: F) -> impl Future
where
F: FnMut(),
{
}
还需要传入一个唤醒器
fn poll_fn<F, T>(f: F) -> impl Future<Output = T>
where
F: FnMut(Waker) -> Option<T>,
{
}
还… 需要一个返回值,未就绪时它返回 None。实际上,我们只需要复制 poll 函数的签名,它已经满足了闭包的要求。
fn poll_fn<F, T>(f: F) -> impl Future<Output = T>
where
F: FnMut(Waker) -> Option<T>,
{
}
实现 poll_fn 貌似不难,我们只需要包装一个实现了 Future 的结构并委托 poll 函数给闭包。
fn poll_fn<F, T>(f: F) -> impl Future<Output = T>
where
F: FnMut(Waker) -> Option<T>,
{
struct FromFn<F>(F);
impl<F, T> Future for FromFn<F>
where
F: FnMut(Waker) -> Option<T>,
{
type Output = T;
fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
(self.0)(waker)
}
}
FromFn(f)
}
好吧。让我们使用新的 poll_fn 帮助函数来尝试重写主任务,将 Main::Start 状态的代码粘贴到闭包中即可。
fn main() {
SCHEDULER.spawn(listen());
SCHEDULER.run();
}
fn listen() -> impl Future<Output = ()> {
let start = poll_fn(|waker| {
let listener = TcpListener::bind("localhost:3000").unwrap();
listener.set_nonblocking(true).unwrap();
REACTOR.with(|reactor| {
reactor.add(listener.as_raw_fd(), waker);
});
Some(listener)
});
}
请记住,Main::Start 从不等待任何 I/O,因此它会立即准备好侦听器。
同样,我们使用 poll_fn 帮助函数来重写 Main::Accept future。
fn listen() -> impl Future<Output = ()> {
let start = poll_fn(|waker| {
Some(listener)
});
let accept = poll_fn(|_| match listener.accept() {
Ok((connection, _)) => {
connection.set_nonblocking(true).unwrap();
SCHEDULER.spawn(Handler {
connection,
state: HandlerState::Start,
});
None
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => None,
Err(e) => panic!("{e}"),
});
}
另外,accept 总是返回 None,因为我们希望每次有新连接传入时都能调用它。它贯穿于我们的整个程序。
现在我们有了两个任务状态,需要以某种方式将它们连接起来。
fn chain<F1, F2>(future1: F1, future2: F2) -> impl Future<Output = F2::Output>
where
F1: Future,
F2: Future
{
}
嗯~这其实不能工作。
第二个 future 需要访问第一个的输出:TCP 侦听器。
直接使用行不通,考虑在第一个 future 的输出上接一个闭包,它可以使用第一个 future 的输出来构建第二个。
fn chain<T1, F, T2>(future1: T1, chain: F) -> impl Future<Output = T2::Output>
where
T1: Future,
F: FnOnce(T1::Output) -> T2,
T2: Future
{
}
看上去好些了。
我们不妨再花哨一点,让 chain 成为 Future trait 的方法。这样,我们就可以在任何 future 调用 .chain 作为后缀方法了。
trait Future {
fn chain<F, T>(self, chain: F) -> Chain<Self, F, T>
where
F: FnOnce(Self::Output) -> T,
T: Future,
Self: Sized,
{
}
}
enum Chain<T1, F, T2> {
}
看起来不错,我们试试看。
Chain future 是状态机的泛化,所以它本身就是一个迷你状态机。它首先轮询第一个 future,然后在过渡闭包完成后保留住它。
enum Chain<T1, F, T2> {
First { future1: T1, transition: F },
}
impl<T1, F, T2> Future for Chain<T1, F, T2>
where
T1: Future,
F: FnOnce(T1::Output) -> T2,
T2: Future,
{
type Output = T2::Output;
fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
if let Chain::First { future1, transition } = self {
match future1.poll(waker) {
}
}
}
}
一旦第一个 future 执行完毕,它使用过渡闭包构造第二个 future,并轮询它:
enum Chain<T1, F, T2> {
First { future1: T1, transition: F },
Second { future2: T2 },
}
impl<T1, F, T2> Future for Chain<T1, F, T2>
where
T1: Future,
F: FnOnce(T1::Output) -> T2,
T2: Future,
{
type Output = T2::Output;
fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
if let Chain::First { future1, transition } = self {
match future1.poll(waker.clone()) {
Some(value) => {
let future2 = (transition)(value); *self = Chain::Second { future2 };
}
None => return None,
}
}
if let Chain::Second { future2 } = self {
return future2.poll(waker); }
None
}
}
请注意,同一个唤醒器是如何用于轮询两个 future 的。取决于两个 future 的状态,通知将仅传播到 Chain 的父 future。
嗯……这实际上是不能工作的:
error[E0507]: cannot move out of `*transition` which is behind a mutable reference
--> src/main.rs:182:33
|
182 | let future2 = (transition)(value);
| ^^^^^^^^^^^^ move occurs because `*transition` has type `F`,
which does not implement the `Copy` trait
哦,是的,transition 是一个 FnOnce 的闭包,它在第一次被调用时就被消耗掉了。虽然基于我们的状态机,我们只调用它一次,但编译器并不知道这一点。我们可以将其封装在 Option 中,并使用 take 来获取它的同时将 Option 替换为 None,从而获取它的所有权。这是使用状态机时常见的模式。
enum Chain<T1, F, T2> {
First { future1: T1, transition: Option<F> }, Second { future2: T2 },
}
fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
if let Chain::First { future1, transition } = self {
match future1.poll(waker.clone()) {
Some(value) => {
let future2 = (transition.take().unwrap())(value); *self = Chain::future2 { future2 };
}
None => return None,
}
}
}
完美!现在,在其初始状态下,chain 方法就可以很轻松地构建我们的 Chain Future 了。
trait Future {
fn chain<F, T>(self, transition: F) -> Chain<Self, F, T>
where
F: FnOnce(Self::Output) -> T,
T: Future,
Self: Sized,
{
Chain::First {
future1: self,
transition: Some(transition),
}
}
}
好吧,我们说到哪儿了……对了,是主 future!
fn listen() -> impl Future<Output = ()> {
let start = poll_fn(|waker| {
Some(listener)
});
let accept = poll_fn(|_| match listener.accept() {
});
}
我们可以用新的 chain 方法组合这两个 future。
fn listen() -> impl Future<Output = ()> {
poll_fn(|waker| {
Some(listener)
})
.chain(|listener| { poll_fn(move |_| match listener.accept() {
})
})
}
嗯,看起来很不错!我们的手动状态机已经不复存在了,侦听方法现在用简单的闭包就可以表示!
fn main() {
SCHEDULER.spawn(listen());
SCHEDULER.run();
}
fn listen() -> impl Future<Output = ()> {
poll_fn(|waker| {
let listener = TcpListener::bind("localhost:3000").unwrap();
REACTOR.with(|reactor| {
reactor.add(listener.as_raw_fd(), waker);
});
Some(listener)
})
.chain(|listener| {
poll_fn(move |_| match listener.accept() {
Ok((connection, _)) => {
SCHEDULER.spawn(Handler {
connection,
state: HandlerState::Start,
});
None
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => None,
Err(e) => panic!("{e}"),
})
})
}
真是太好了!
接下来,用同样的方式,我们可以将连接处理的部分也转换成基于闭包的 future。首先我们将它分离出来,形成一个返回闭包的函数。
fn listen() -> impl Future<Output = ()> {
poll_fn(|waker| {
})
.chain(|listener| {
poll_fn(move |_| match listener.accept() {
Ok((connection, _)) => {
SCHEDULER.spawn(handle(connection)); None
}
})
})
}
fn handle(connection: TcpStream) -> impl Future<Output = ()> {
}
最开始的状态 HandlerState::Start 只是一个简单的 poll_fn 闭包,它向反应器注册连接并立即返回。
fn handle(connection: TcpStream) -> impl Future<Output = ()> {
poll_fn(move |waker| {
REACTOR.with(|reactor| {
reactor.add(connection.as_raw_fd(), waker);
});
Some(())
})
}
第二个状态 HandlerState::Read 可以很容易地通过 chain 来组合 future,它在栈上初始化一个本地连接状态并将它移动到 future 中,让 future 管理自身的状态。
fn handle(mut connection: TcpStream) -> impl Future<Output = ()> {
poll_fn(move |waker| {
})
.chain(move |_| {
let mut read = 0;
let mut request = [0u8; 1024];
poll_fn(move |_| {
loop {
match connection.read(&mut request[read..]) {
Ok(0) => {
println!("client disconnected unexpectedly");
return Some(());
}
Ok(n) => read += n,
Err(e) if e.kind() == io::ErrorKind::WouldBlock => return None,
Err(e) => panic!("{e}"),
}
let read = read;
if read >= 4 && &request[read - 4..read] == b"\r\n\r\n" {
break;
}
}
let request = String::from_utf8_lossy(&request[..read]);
println!("{request}");
Some(())
})
})
}
HandlerState::Write 和 HandlerState::Flush 可以用同样的方式组合各自的任务。
fn handle(connection: TcpStream) -> impl Future<Output = ()> {
poll_fn(move |waker| {
})
.chain(move |_| {
})
.chain(move |_| {
let response = ;
let mut written = 0;
poll_fn(move |_| {
loop {
match connection.write(response[written..].as_bytes()) {
Ok(0) => {
println!("client disconnected unexpectedly");
return Some(());
}
Ok(n) => written += n,
Err(e) if e.kind() == io::ErrorKind::WouldBlock => return None,
Err(e) => panic!("{e}"),
}
if written == response.len() {
break;
}
}
Some(())
})
})
.chain(move |_| {
poll_fn(move |_| {
match connection.flush() {
Ok(_) => {}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
return None;
}
Err(e) => panic!("{e}"),
};
REACTOR.with(|reactor| reactor.remove(connection.as_raw_fd());
Some(())
})
})
}
漂亮!
啊?!……
error[E0382]: use of moved value: `connection`
--> src/main.rs:59:12
|
51 | fn handle(mut connection: TcpStream) -> impl Future<Output = ()> {
| -------------- move occurs because `connection` has type `TcpStream`, which does not implement the `Copy` trait
52 | poll_fn(move |waker| {
| ------------- value moved into closure here
53 | REACTOR.with(|reactor| {
54 | reactor.add(connection.as_raw_fd(), waker);
| ---------- variable moved due to use in closure
...
59 | .chain(move |_| {
| ^^^^^^^^ value used here after move
...
66 | match connection.read(&mut request[read..]) {
| ---------- use occurs due to use in closure
error[E0382]: use of moved value: `connection`
唔……
所有的 future 都使用了 move 来传递参数给闭包,意味着闭包获取了 connection 的所有权,每个 connection 的所有权是唯一的,不应该移动进闭包,那么去掉 move 试试看?
error[E0373]: closure may outlive the current function, but it borrows `connection`, which is owned by the current function
--> src/main.rs:52:13
|
52 | poll_fn(|waker| {
| ^^^^^^^^ may outlive borrowed value `connection`
53 | REACTOR.with(|reactor| {
54 | reactor.add(connection.as_raw_fd(), waker);
| ---------- `connection` is borrowed here
|
note: closure is returned here
--> src/main.rs:52:5
|
52 | / poll_fn(|waker| {
53 | | REACTOR.with(|reactor| {
54 | | reactor.add(connection.as_raw_fd(), waker);
55 | | });
... |
128 | | })
129 | | })
| |______^
help: to force the closure to take ownership of `connection` (and any other referenced variables), use the `move` keyword
|
52 | poll_fn(move |waker| {
| ++++
看上去也不行。连接的生命周期需要足够长。如果我们将它移动到第一个 future ,让其他 future 借用它呢?
error[E0382]: use of moved value: `connection`
--> src/main.rs:59:12
|
51 | fn handle(mut connection: TcpStream) -> impl Future<Output = ()> {
| -------------- move occurs because `connection` has type `TcpStream`, which does not implement the `Copy` trait
52 | poll_fn(move |waker| {
| ------------- value moved into closure here
53 | REACTOR.with(|reactor| {
54 | reactor.add(connection.as_raw_fd(), waker);
| ---------- variable moved due to use in closure
...
59 | .chain(|_| {
| ^^^ value used here after move
...
66 | match connection.read(&mut request[read..]) {
| ---------- use occurs due to use in closure
果然,还是不行。
本质上,我们的链式 future 是这样的:第一个 future 拥有链接,其他 future 从它借用。
enum Handle {
Start {
connection: TcpStream,
}
Read {
connection: &'??? TcpStream
}
}
Which of course, doesn’t make much sense. ( 继续这种方式没有太大的意义?这里不知道怎么翻译符合语境,也不影响理解。 ) 一旦状态切换到 Read, Start 中的连接就被释放了,我们的引用就没了目标。
所以手动写 future 时该怎么做才可以呢?
struct Handler {
connection: TcpStream,
state: HandlerState,
}
enum HandlerState { }
好了,现在连接存在于外部结构中,也许我们可以编写另一个帮助程序,允许 future 引用存储在外部的一些数据?
就像这样:
struct WithData<D, F> {
data: D,
future: F,
}
看上去很简单, 构造一个 futrue,它可以捕获数据的引用,并且通过闭包来构造它们的关联,就像 chain 的实现一样。
impl<D, F> WithData<D, F> {
pub fn new(data: D, construct: impl Fn(&D) -> F) -> WithData<D, F> {
let future = construct(&data);
WithData { data, future }
}
}
WithData 可以通过委托内部 future 来实现自身的 Future 特性。
impl<D, F> Future for WithData<D, F>
where
F: Future,
{
type Output = F::Output;
fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
self.future.poll(waker)
}
}
现在我们可以用 WithData 封装我们的 future, 让 connection 在 future 返回之后仍然存活,这样应该就能正常了。
fn handle(connection: TcpStream) -> impl Future<Output = ()> {
WithData::new(connection, |connection| {
from_fn(move |waker| {
})
.chain(move |_| {
let mut request = [0u8; 1024];
let mut read = 0;
from_fn(move |_| {
})
})
.chain(move |_| {
let response = ;
let mut written = 0;
from_fn(move |_| {
})
})
.chain(move |_| {
from_fn(move |_| {
})
})
})
}
虽然看起来有的奇怪,但它能工作的话……
error: lifetime may not live long enough
--> src/main.rs:53:9
|
52 | WithData::new(connection, |connection| {
| ----------- return type of closure `Chain<Chain<Chain<impl Future<Output = ()>, [closure@src/bin/play.rs:60:16: 86:10], impl Future<Output = ()>>, [closure@src/bin/play.rs:87:16: 113:10], impl Future<Output = ()>>, [closure@src/bin/play.rs:114:16: 130:10], impl Future<Output = ()>>` contains a lifetime `'2`
| |
| has type `&'1 TcpStream`
53 | / from_fn(move |waker| {
54 | | REACTOR.with(|reactor| {
55 | | reactor.add(connection.as_raw_fd(), waker);
56 | | });
... |
129 | | })
130 | | })
| |__________^ returning this value requires that `'1` must outlive `'2`
看起来不是轻易能搞定的。
这又是一条非常诡异的错误信息:
return type of closure `Chain<Chain<Chain<impl Future<Output = ()>, [closure@src/bin/play.rs:60:16: 86:10], impl Future<Output = ()>>, [closure@src/bin/play.rs:87:16: 113: 10], impl Future<Output = ()>>, [closure@src/bin/play.rs:114:16: 130:10], impl Future<Output = ()>>` contains a lifetime `'2`
好吧,传递给 WithData 的巨型链式 future 包含了对 connection 的引用。
那正是我们想要的,为了其他 future 能借用 connection,对吧?
`connection` has type `&'1 TcpStream` ... returning this value requires that `'1` must outlive `'2`
嗯~ 在 WithData 结构中,我们没有实际指定 future 是从 data 借用数据,导致 Rust 不能推断出它的生命周期。那么,是不是给 WithData 标注上正确的生命周期就可以了呢?
struct WithData<