通过迭代WebServer逐步深入Rust异步编程

  • 2024年 1月2日
  • 读完约需 114 分钟
  • • 
  • 标签: 
  • rust
  • async
  • 最后更新于 2024年 5月31日

原文链接

我发现,理解一个新概念的最好方法之一是从头开始:从零起步,一点点地创建它,不仅要学习它是如何工作的,还要学习它为什么会这样设计。

这不是一篇实用的异步指南,希望它涵盖的背景知识能帮助您思考异步问题,或至少满足您的好奇心,而且不会因为太多细节让您觉得厌烦。

但它依然很长……

目录

最简单的 Web 服务

我们从最简单的Web服务开始,进入异步编程之旅。我们的Web服务只使用 std::net crate 中的标准网络类型,只接受 HTTP 请求并用基本响应进行回复。接下来的内容中,我们将尽量忽略 HTTP 规范相关要求、忽略代码实用性,将焦点集中在 Web 服务的基本流程上。

HTTP 是构建于 TCP 之上,基于文本的传输协议。我们的 Web 服务通过建立一个TcpListener 来接受客户端的 TCP 连接请求:

use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("localhost:3000").unwrap();
}

Web 服务相应来自客户端传入的连接请求,按顺序挨个执行它们。

use std::net::{TcpListener, TcpStream};
use std::io;

fn main() {
    // ...

    loop {
        let (connection, _) = listener.accept().unwrap();

        if let Err(e) = handle_connection(connection) {
            println!("failed to handle connection: {e}")
        }
    }
}

fn handle_connection(connection: TcpStream) -> io::Result<()> {
    // ...
}

TCP 连接是 Web 服务与客户端之间的双向数据通道,用 TcpStream 类型表示。它实现了 Read 和 Write 特性( trait ),抽象了 Tcp 内部细节,使我们能够读取或写入普通的字节数据。

Web 服务接受 HTTP 请求,为此,需要创建缓存数组来保存传入的请求。

fn handle_connection(connection: TcpStream) -> io::Result<()> {
    let mut request = [0u8; 1024];
    // ...

    Ok(())
}

接下来就是从连接读取数据,写入缓存。读取动作每次读到的字节数不定,所以需要跟踪已读取字节数。读取动作在循环中不断重复,依次将读到的内容写入缓存。

use std::io::Read;

fn handle_connection(mut connection: TcpStream) -> io::Result<()> {
    let mut read = 0;
    let mut request = [0u8; 1024];

    loop {
        // 尝试从流中读取数据
        let num_bytes = connection.read(&mut request[read..])?;

        // 跟踪已读取的字节数
        read += num_bytes;
    }

    Ok(())
}

直到读到一个特殊的字符序列 \r\n\r\n,我们约定用它来作为读取的结束标志。

fn handle_connection(mut connection: TcpStream) -> io::Result<()> {
    let mut read = 0;
    let mut request = [0u8; 1024];

    loop {
        // 尝试从流中读取数据
        let num_bytes = connection.read(&mut request[read..])?;

        // 跟踪已读取的字节数
        read += num_bytes;

        // 判断是否读到结束标记
        if request.get(read - 4..read) == Some(b"\r\n\r\n") {
            break;
        }
    }

    Ok(())
}

还有一种情况:如果连接已断开,读到的字节数将是0,此时如果客户端的请求未发送完毕,直接返回转入处理下一个连接。

再次声明,不用担心 HTTP 规范限制,我们的目的是让 Web 服务能工作就行。

fn handle_connection(mut connection: TcpStream) -> io::Result<()> {
    let mut read = 0;
    let mut request = [0u8; 1024];

    loop {
        // 尝试从流中读取数据
        let num_bytes = connection.read(&mut request[read..])?;

        // 客户端已断开
        if num_bytes == 0 { // 👈
            println!("client disconnected unexpectedly");
            return Ok(());
        }

        // 跟踪已读取的字节数
        read += num_bytes;

        // 判断是否读到结束标记
        if request.get(read - 4..read) == Some(b"\r\n\r\n") {
            break;
        }
    }

    Ok(())
}

一旦读取完成,就可以将读到的结果转换为字符串,并以日志形式输出到控制台。

fn handle_connection(stream: TcpStream) -> io::Result<()> {
    let mut read = 0;
    let mut request = [0u8; 1024];

    loop {
        // ...
    }

    let request = String::from_utf8_lossy(&request[..read]);
    println!("{request}");

    Ok(())
}

接下来实现响应部分。

跟读操作类似,写操作也可能不会一次完成。所以同样需要采用循环的方式,不断写入响应数据到流中,每次写入从上一次结束的位置开始,直到写入全部完成。

use std::io::Write;

fn handle_connection(mut connection: TcpStream) -> io::Result<()> {
    // ...

    // 生成HTTP的响应数据
    let response = concat!(
        "HTTP/1.1 200 OK\r\n",
        "Content-Length: 12\n",
        "Connection: close\r\n\r\n",
        "Hello world!"
    );

    let mut written = 0;

    loop {
        // 持续写入响应到流
        let num_bytes = connection.write(response[written..].as_bytes())?;

        // 客户端已断开
        if num_bytes == 0 {
            println!("client disconnected unexpectedly");
            return Ok(());
        }

        written += num_bytes;

        // 判断响应内容是否已经完整写入流
        if written == response.len() {
            break;
        }
    }
}

最后,我们执行刷新( flush )操作,确保写入操作已执行完毕1

fn handle_connection(mut connection: TcpStream) -> io::Result<()> {
    let mut written = 0;
    
    loop {
        // ...
    }

    // 刷新响应
    connection.flush()
}

好了,现在我们有了一个能工作的 Web 服务!

$ curl localhost:3000
# => Hello world!

多线程的 Web 服务

好吧,它虽然可以工作,但是有一点问题。

看看我们接受请求的循环:

let listener = TcpListener::bind("localhost:3000").unwrap();

loop {
    let (connection, _) = listener.accept().unwrap();

    if let Err(e) = handle_connection(connection) {
        // ...
    }
}

看到问题了吗?

我们一次只能响应一个请求。

从一个网络连接读写数据不是即时完成的,中间要经过大量的基础设备,比如网关、路由器等等,如果两个用户同时向我们发起请求会发生什么?十个、十万个呢?随用户规模增长,我们的服务会延迟、卡顿,直至不可用,那么如何改进呢?

可能的选择有好几种,但到目前为止,最简单的是线程的方式。为每个请求创建一个线程,就能我们的服务能响应无限的用户增长,对吧?

fn main() {
    // ...
    loop {
        let (connection, _) = listener.accept().unwrap();

        // 为每个传入请求创建一个线程
        std::thread::spawn(|| {
            if let Err(e) = handle_connection(connection) {
                // ...
            }
        });
    }
}

实际上,这很有可能!就算不是无限的,但随着每个请求都在单独的线程中处理,我们服务的吞吐量会显著增加。

这到底是怎么回事?

跟大多数现代操作系统一样,在 linux 中,程序都是在一个单独的进程中运行的。虽然看起来每个活动程序都是同时运行的,但在物理上,一个 CPU 内核一次只能执行一个任务,或者通过超线程技术同时执行两个。为了让所有的程序都能执行,操作系统内核会不断切换它们,暂停当前正在运行的程序,切换到另一个并运行它,如此往复。这些上下文切换以毫秒为单位发生,形成了感觉上的“并行”。

内核调度器通过在多个内核之间分配工作负载来利用多核。每个核心管理一部分进程2,这意味着某些程序可以真正意义上得到并行运行。

 cpu1 cpu2 cpu3 cpu4
|----|----|----|----|
| p1 | p3 | p5 | p7 |
|    |____|    |____|
|    |    |____|    |
|____| p4 |    | p8 |
|    |    | p6 |____|
| p2 |____|    |    |
|    | p3 |    | p7 |
|    |    |    |    |

这种调度类型被称为抢占式多任务调度:内核决定进程运行多长时间被抢占,切换到其他进程。

该模式下,内核确保各独立进程不会访问到其他进程的内存,从而保证各种类型的程序都能得到良好地运行。但是,这使得上下文切换更加昂贵,因为内核在执行上下文切换之前必须刷新内存的某些部分,以确保内存被正确隔离3

线程跟进程类似4,区别是线程可以与同一父进程下的线程共享内存,从而实现在同一程序的线程之间共享状态。除此之外线程的调度和进程没有任何区别。

对我们的服务而言,1 线程 / 1 请求 模式最关键的问题是我们的服务是 I/O 绑定的。handle_connection 执行过程中中绝大部分时间并不是用于计算,而是用于等待,等待从网络连接中收发数据,等待读、写、刷新等 I/O 阻塞的操作执行完毕。我们希望的是,发送一个 I/O 请求后,让出控制权给内核,等操作完成后内核再将控制权交回。在此期间,内核可以执行其他程序。

通常情况下,处理一个网络请求时绝大部分时间都在等待其他任务完成,比如数据库查询或接收 HTTP 请求。多个工作线程效率高的原因是我们可以利用等待的时间来处理其他请求。

非阻塞的 Web 服务

看起来多线程已经完全满足我们的需求,并且它使用也很简单,那么为什么我们还要继续呢?

您也许听说过线程很“重”、上下文切换非常“昂贵”等说法,但是现在,这并不准确,现代的服务器能毫不费力地处理上万的线程。

问题在于阻塞 I/O 将程序的控制权完全交给了操作系统内核,在程序执行完成之前,我们没有任何的干预手段可用,这让我们实现某些操作变得非常困难,比如取消操作和选择操作。

假设我们要实现优雅的服务关停操作。当我们按下 ctrl+c,程序不会马上退出,而是立刻停止接受新的连接请求,当前已建立连接的任务会继续执行,直到完成,或者是 30 秒后被强行终止,最后服务才退出。

在阻塞 I/O 模式下,这里的问题是:我们的 accept 循环会阻塞,直到下一个连接到来。我们可以在新连接请求被接受之前或之后检查 ctrl+c 信号,如果在处理 accept 时信号进来,我们必须等待下一次连接被接受,这期间只有内核拥有程序完全的控制权。

loop {
    // 调用accept之前检查ctrl+c信号
    if got_ctrl_c() {
        break;
    }

    // **如果ctrl+c在这里发生,会出现什么情况?**
    let (connection, _) = listener.accept().unwrap();

    // 在新的连接被接受之前,这不会被调用
    if got_ctrl_c() {
        break;
    }

    std::thread::spawn(|| /* ... */);
}

我们想要的是像 match 操作一样,针对 I/O,同时侦听连接请求和 ctrl+c 信号:

loop {
    // 类似这样...
    match {
        ctrl_c() => {
            break;
        },
        Ok((connection, _)) = listener.accept() => {
            std::thread::spawn(|| ...);
        }
    }
}

对于运行时间超过 30 秒的任务,又该怎么处理呢?我们可以设置一个标记让线程停止,那么又该多久检测一次标记呢?我们又回到了老问题:因为 I/O 阻塞导致我们丧失了程序的控制权,除了等它执行完毕,没有好的方式来强制取消一个线程的执行。

这正是线程和阻塞 I/O 令人头疼的地方,因为应用程序的控制权完全交给了内核,导致实现基于事件的逻辑变得非常困难。

某些平台下,可以使用平台特定接口来实现这一点,比如Unix信号处理机制。虽然信号处理机制简单,并且在某些场景下工作得很好,但在场景变得复杂的时候,信号处理机制会变得非常繁琐。在本文末尾,我们描述了另一种表达复杂控制流的方法。您可以根据实际情况来挑选合适的方式。

那么,有没有既能执行 I/O,又不用出让控制权给内核的实现方法呢?

实际上,还有另一种实现 I/O 操作的方法,称为非阻塞 I/O( non-bloking I/O )。顾名思义,非阻塞操作永远不会阻塞调用线程,它会立即返回,如果给定的资源不可用,则返回一个错误。

通过将 TCP 侦听器和连接置于非阻塞模式,我们可以切换到非阻塞 I/O 的实现方式。

let listener = TcpListener::bind("localhost:3000").unwrap();
listener.set_nonblocking(true).unwrap();

loop {
    let (connection, _) = listener.accept().unwrap();
    connection.set_nonblocking(true).unwrap();

    // ...
}

非阻塞 I/O 的工作模式有一些不同:如果 I/O 请求不能立即完成,内核将返回一个 WouldBlock 错误代码。尽管被表示为错误代码,但 WouldBlock 并不是真正的错误,它只是意味着当前操作无法立即执行完毕,让我们可以自行决定接下来要做什么。

use std::io;

// ...
listener.set_nonblocking(true).unwrap();

loop {
    let connection = match listener.accept() {
        Ok((connection, _)) => connection,
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
            // 操作还不能执行
            // ...
        }
        Err(e) => panic!("{e}"),
    };

    connection.set_nonblocking(true).unwrap();
    // ...
}

假设在调用 accept() 之后没有连接请求进来,在阻塞 I/O 模式下,我们只能一直等待新的连接,但现在,WouldBlock 不是将控制权交给内核,而是交回我们手里。

我们的 I/O 终于不阻塞了!但此时我们能做点什么呢?

WouldBlock 是一个临时的状态,意味着在未来某个时刻,当前套接字会准备好用于读或写。所以从技术上讲,我们应该一直等到(作者用了自旋这个单词-spin until)套接字状态变成可用( ready )。

loop {
    let connection = match listener.accept() {
        Ok((connection, _)) => connection,
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
            continue; // 👈
        }
        Err(e) => panic!("{e}"),
    };
}

但是自旋还不如阻塞,至少阻塞 I/O 模式下,操作系统还可以给其他线程执行的机会。所以我们真正需要的,是为全部任务创建一个有序的调度器,来完成曾经由操作系统来为我们做的事情。

让我们从头回顾一遍:

首先我们创建了一个 TCP 侦听器:

let listener = TcpListener::bind("localhost:3000").unwrap();

然后设置它为非阻塞模式:

listener.set_nonblocking(true).unwrap();

接下来进入主循环,循环中第一件事情是接受一个新的 TCP 连接请求。

// ...

loop {
    match listener.accept() {
        Ok((connection, _)) => {
            connection.set_nonblocking(true).unwrap();

            // ...
        },
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
        Err(e) => panic!("{e}"),
    }
}

现在,我们不能继续直接为已建立的连接服务,导致其他请求被忽略。我们必须能跟踪所有的活动连接。

// ...

let mut connections = Vec::new(); // 👈

loop {
    match listener.accept() {
        Ok((connection, _)) => {
            connection.set_nonblocking(true).unwrap();
            connections.push(connection); // 👈
        },
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
        Err(e) => panic!("{e}"),
    }
}

但是我们不能无休止地接受连接请求。当没有操作系统调度的便利时,我们需要在主循环的每一次迭代中,将所有的事情都推进一点点。一旦新的连接请求被接受,我们需要处理所有的活跃连接。

对于每一个连接,我们必须执行任何需要的操作来推进请求的处理,无论是读取请求还是写入响应。

// ...
loop {
    // 尝试接受新的连接请求
    match listener.accept() {
        // ...
    }

    // 针对活跃的连接进行处理
    for connection in connections.iter_mut() {
        // 🤔
    }
}

还记得之前的 handle_connection 功能吗?

fn handle_connection(mut connection: TcpStream) -> io::Result<()> {
    let mut request = [0u8; 1024];
    let mut read = 0;

    loop {
        let num_bytes = connection.read(&mut request[read..])?;  // 👈
        // ...
    }

    let request = String::from_utf8_lossy(&request[..read]);
    println!("{request}");

    let response = /* ... */;
    let mut written = 0;

    loop {
        let num_bytes = connection.write(&response[written..])?; // 👈

        // ...
    }

    connection.flush().unwrap(); // 👈
}

我们需要执行不同的 I/O 操作,比如读、写和刷新。阻塞模式下,代码会按我们写的顺序执行。但现在我们必须面对这样一个事实,在执行 I/O 的任何时候都可能面临WouldBlock,导致当前执行无法取得进展。

同时,我们不能简单地丢掉这个结果去处理下一个活动连接,我们需要跟踪当前连接的状态,方便在下次回来时能从正确的地方继续。

我们设计了一个枚举来保存 handle_connetion 的状态,它有三种可能的状态:

enum ConnectionState {
    Read,
    Write,
    Flush
}

请记住,我们需要的不是记录事务单独的状态,例如将请求转换为字符串,我们需要的是在遇到 WouldBlock 时,能记住当时的状态。

读、写操作的状态还包含当前已读写的字节数和本地缓存。之前我们在函数中定义它,现在我们需要它在整个主循环的生命周期中存在。

enum ConnectionState {
    Read {
        request: [u8; 1024],
        read: usize
    },
    Write {
        response: &'static [u8],
        written: usize,
    },
    Flush,
}

我们在每一次 handle_connection 开始执行时初始化连接状态为 Read,request 为 0 值,read 为 0 字节。

// ...

let mut connections = Vec::new();

loop {
    match listener.accept() {
        Ok((connection, _)) => {
            connection.set_nonblocking(true).unwrap();


            let state = ConnectionState::Read { // 👈
                request: [0u8; 1024],
                read: 0,
            };

            connections.push((connection, state));
        },
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
        Err(e) => panic!("{e}"),
    }
}

现在,我们可以尝试根据其当前状态,将每个连接向前推进了。

// ...
loop {
    match listener.accept() {
        // ...
    }

    for (connection, state) in connections.iter_mut() {
        if let ConnectionState::Read { request, read } = state {
            // ...
        }

        if let ConnectionState::Write { response, written } = state {
            // ...
        }

        if let ConnectionState::Flush = state {
            // ...
        }
    }
}

如果当前连接仍然处于 Read 状态,继续做读取操作,唯一不同的是,如果收到WouldBlock, 则继续处理下一个活动连接。

// ...

'next: for (connection, state) in connections.iter_mut() {
    if let ConnectionState::Read { request, read } = state {
        loop {
            // 尝试从流中读取数据
            match connection.read(&mut request[*read..]) {
                Ok(n) => {
                    // 跟踪已读取的字节数
                    *read += n
                }
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                    // 当前连接的操作还未就绪,继续处理下一个连接
                    continue 'next; // 👈
                }
                Err(e) => panic!("{e}"),
            }

            // 判断是否读到结束标记
            if request.get(*read - 4..*read) == Some(b"\r\n\r\n") {
                break;
            }
        }

        // 操作完成,打印收到的请求数据
        let request = String::from_utf8_lossy(&request[..*read]);
        println!("{request}");
    }

    // ...
}

还有读到 0 字节的问题需要处理,之前我们只是从 handle_connection 中退出,state 变量会自动被清空。但是现在,我们必须自己处理当前连接。当前我们正在遍历connections 列表,所以需要一个单独的列表来收集已完成的活动连接,后续再来处理。

let mut completed = Vec::new(); // 👈

'next: for (i, (connection, state)) in connections.iter_mut().enumerate() {
    if let ConnectionState::Read { request, read } = state {
        loop {
            // 尝试从流中读取数据
            match connection.read(&mut request[*read..]) {
                Ok(0) => {
                    println!("client disconnected unexpectedly");
                    completed.push(i); // 👈
                    continue 'next;
                }
                Ok(n) => *read += n,
                // 当前连接未准备好,先处理下一个活动连接
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue 'next,
                Err(e) => panic!("{e}"),
            }

            // ...
        }

        // ...
    }
}

// 按相反顺序迭代以保留索引
for i in completed.into_iter().rev() {
    connections.remove(i); // 👈
}

读操作完成后,我们必须切换到 Write 状态并尝试写入回应。写操作的逻辑跟读操作非常相似,写操作完成后,需要切换到 Flush 状态。

if let ConnectionState::Read { request, read } = state {
    // ...

    // 切换到写状态
    let response = concat!(
        "HTTP/1.1 200 OK\r\n",
        "Content-Length: 12\n",
        "Connection: close\r\n\r\n",
        "Hello world!"
    );

    *state = ConnectionState::Write { // 👈
        response: response.as_bytes(),
        written: 0,
    };
}

if let ConnectionState::Write { response, written } = state {
    loop {
        match connection.write(&response[*written..]) {
            Ok(0) => {
                println!("client disconnected unexpectedly");
                completed.push(i);
                continue 'next;
            }
            Ok(n) => {
                *written += n;
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                // 当前连接的操作还未就绪,继续处理下一个连接
                continue 'next;
            }
            Err(e) => panic!("{e}"),
        }

        // 判断响应数据是否已全部写入完毕
        if *written == response.len() {
            break;
        }
    }

    // 写操作完成,进入 Flush 状态
    *state = ConnectionState::Flush;
}

成功完成刷新操作后,我们标记当前连接为完成,并从 completed 列表中移除。

if let ConnectionState::Flush = state {
    match connection.flush() {
        Ok(_) => {
            completed.push(i); // 👈
        },
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
            // 当前连接的操作还未就绪,继续处理下一个连接
            continue 'next;
        }
        Err(e) => panic!("{e}"),
    }
}

就是这样!以下是新的更高水平的 web 服务流程:

fn main() {
    // 绑定侦听器
    let listener = TcpListener::bind("localhost:3000").unwrap();
    listener.set_nonblocking(true).unwrap();

    let mut connections = Vec::new();

    loop {
        // 尝试接受一个连接请求
        match listener.accept() {
            Ok((connection, _)) => {
                connection.set_nonblocking(true).unwrap();

                // 跟踪连接状态
                let state = ConnectionState::Read {
                    request: Vec::new(),
                    read: 0,
                };

                connections.push((connection, state));
            },
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
            Err(e) => panic!("{e}"),
        }

        let mut completed = Vec::new();

        // 尝试驱动活动连接向前进展
        'next: for (i, (connection, state)) in connections.iter_mut().enumerate() {
            if let ConnectionState::Read { request, read } = state {
                // ...
                *state = ConnectionState::Write { response, written };
            }

            if let ConnectionState::Write { response, written } = state {
                // ...
                *state = ConnectionState::Flush;
            }

            if let ConnectionState::Flush = state {
                // ...
            }
        }

        // 保持索引不变,反序遍历 completed 列表,删除已完成操作的连接
        for i in completed.into_iter().rev() {
            connections.remove(i);
        }
    }
}

现在,我们必须自己管理调度,事情变得越来越复杂了……

关键的时候来了……

$ curl localhost:3000
# => Hello world!

工作正常!

多路复用的 Web 服务

现在我们的 Web 服务能在单线程中执行多个请求,没有任何阻塞。如果某个操作被阻塞,它将记住自己的状态并切换,让其他操作执行,这跟内核调度器的行为一致。但是,新设计带来了两个问题.

首先是所有的工作都在主线程中运行,只利用了一个 CPU 核心。我们尽最大努力高效地利用这一核心,但一次仍然只执行一个任务。如果线程能分布在多个核心上,我们同一时间就可以做更多的工作。

不过有一个更大的问题。

我们的主循环效率并不高。

我们对每一个活跃的连接,每一次循环的迭代,都要向内核发出一个 I/O 请求,来检查它是否准备好了。即使调用 read 或 write 返回了 WouldBlock,实际没有执行任何 I/O,它仍然是一个系统调用。系统调用并不便宜。我们可能有 10k 个活跃的连接,但只有 500 个是准备好的。当只有 500 个连接会真正做些事情的时候,调用 read 或 write 10k 次是对 CPU 周期的巨大浪费。

随着连接数的增加,我们的循环变得越来越低效,浪费了更多的时间做无用的工作。

怎么解决这个问题呢?使用阻塞 I/O 时,内核能够有效地调度任务,因为它知道资源什么时候准备好了。使用非阻塞 I/O 时,我们不检查就不知道,但是检查是很昂贵的。

我们需要的是一种高效的方式来跟踪所有的活跃连接,并且在它们准备好的时候得到通知。

事实证明,我们并不是第一个遇到这个问题的人。每个操作系统都提供了针对这个问题的解决方案。在 Linux 上,它叫做 epoll。

epoll(7) - I/O 事件通知机制

epoll API 执行的任务与 poll(2) 类似:监视多个文件描述符,看看是否有任何一个可以进行 I/O 操作。epoll API 可以作为边缘触发(edge-triggered)或水平触发(level-triggered)的接口使用,并且能够很好地扩展到监视大量的文件描述符。

听上去很完美!我们试试看。

epoll 是一组 Linux 系统调用,让我们可以处理一组非阻塞的套接字。直接使用 epoll 并不是很方便,所以我们将使用 epoll crate,它是一个对 C 接口的轻度封装。

首先,我们使用 create 函数来初始化一个 epoll 实例。

// ```toml
// [dependencies]
// epoll = "4.3"
// ```

fn main() {
    let epoll = epoll::create(false).unwrap(); // 👈
}

epoll::create 返回一个文件描述符,它代表了新创建的 epoll 实例。你可以把它看作是一个文件描述符的集合,我们可以从中添加或删除文件描述符。

在 Linux/Unix 中,一切都被视为文件。文件系统上的实际文件、TCP 套接字、以及外部设备都是可以读写的文件。文件描述符是一个整数,它表示系统中打开的“文件”。本文接下来的部分,我们将频繁使用它。

我们要添加的第一个文件描述符是 TCP 监听器。可以用 epoll::ctl 命令来修改 epoll 集合,添加文件描述符使用EPOLL_CTL_ADD标志。

use epoll::{Event, Events, ControlOptions::*};
use std::os::fd::AsRawFd;

fn main() {
    let listener = TcpListener::bind("localhost:3000").unwrap();
    listener.set_nonblocking(true).unwrap();

    // 添加 listener 到 epoll
    let event = Event::new(Events::EPOLLIN, listener.as_raw_fd() as _);
    epoll::ctl(epoll, EPOLL_CTL_ADD, listener.as_raw_fd(), event).unwrap(); // 👈
}

我们传入要注册的资源的文件描述符,也就是 TCP 监听器,以及一个事件。一个事件有两个部分,interest flag 和 data field。interest flag 让我们可以告诉 epoll 我们感兴趣的 I/O 事件。在 TCP 监听器中,我们想要在有新连接进来时得到通知,所以传入 EPOLLIN 标志。

data field 让我们可以存储一个能够唯一标识每个资源的 ID。记住,文件描述符是一个给定文件的唯一整数,所以直接使用它。你会在下一步看到为什么这很重要。

现在轮到主循环。这次不用自旋,用 epoll::wait。

epoll_wait(2) - 等待 epoll 文件描述符上的 I/O 事件

对文件描述符 epfd 指向的 epoll(7) 实例而言,epoll_wait() 系统调用会等待其上的事件。interest list 中的文件描述符指向的 ready list 中,如果有一些可用事件的信息,那么这些信息通过 events 指向的缓冲区返回。

调用 epoll_wait() 将阻塞,直到以下任一情况发生:

  • 文件描述符提交了一个事件;
  • 调用被信号处理器中断;
  • 超时;

epoll::wait 是 epoll 的神奇之处:它阻塞直到我们注册的任何事件变得就绪,并告诉我们哪些事件就绪了。此时这仅用于有新连接进来时,但是很快我们将使用它来阻塞读、写和刷新事件,这些事件我们之前是用自旋的方式处理的。

您可能不喜欢 epoll::wait 是“阻塞”的这一事实,但是,它只在没有任何事情可做的时候才阻塞,而之前我们是在自旋并且做无用的系统调用。这种同时阻塞多个操作的方法被称为 I/O 多路复用

epoll::wait 接受一个事件的列表,当所关注的文件描述符就绪,它会将文件描述符的信息填充到列表,然后返回被添加的事件的数量。

// ...
loop {
    let mut events = [Event::new(Events::empty(), 0); 1024];
    let timeout = -1; // 阻塞,直到某些事情发生
    let num_events = epoll::wait(epoll, timeout, &mut events).unwrap(); // 👈

    for event in &events[..num_events] {
        // ...
    }
}

每个事件都包含数据字段,该字段与就绪的资源相关联。

for event in &events[..num_events] {
    let fd = event.data as i32;
    // ...
}

还记得我们用文件描述符来标记数据字段吗?我们可以用它来检查事件是否是针对TCP监听器的,如果是,那就意味着有一个传入连接已经准备好被接受了。

for event in &events[..num_events] {
    let fd = event.data as i32;

    // listener准备就绪了吗?
    if fd == listener.as_raw_fd() {
        // 尝试建立一个连接
        match listener.accept() {
            Ok((connection, _)) => {
                connection.set_nonblocking(true).unwrap();

                // ...
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
            Err(e) => panic!("{e}"),
        }
    }
}

如果返回 WouldBlock, 则移动到下一个连接,等待下一次事件发生。

现在需要在 epoll 中注册新的连接,跟注册侦听器一样。

for event in &events[..num_events] {
    let fd = event.data as i32;

    // listener准备就绪了吗?
    if fd == listener.as_raw_fd() {
        // 尝试建立一个连接
        match listener.accept() {
            Ok((connection, _)) => {
                connection.set_nonblocking(true).unwrap();
                 let fd = connection.as_raw_fd();

                 // 注册新连接到epoll
                 let event = Event::new(Events::EPOLLIN | Events::EPOLLOUT, fd as _);
                 epoll::ctl(epoll, EPOLL_CTL_ADD, fd, event).unwrap(); // 👈
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
            Err(e) => panic!("{e}"),
        }
    }
}

这次我们注册了EPOLLIN 和 EPOLLOUT 事件,因为根据连接状态,我们要关注读或写事件。

注册了连接之后,我们将得到 TCP 侦听器和某个连接的事件。我们需要用某种方式存储连接和它们的状态,并能通过查找文件描述符的方式来访问它们。

这次不用 List,用 HashMap。

let mut connections = HashMap::new();

loop {
    // ...
    'next: for event in &events[..num_events] {
        let fd = event.data as i32;

        // listener准备就绪了吗?
        if fd == listener.as_raw_fd() {
            match listener.accept() {
                Ok((connection, _)) => {
                    // ...

                    let state = ConnectionState::Read {
                        request: [0u8; 1024],
                        read: 0,
                    };

                    connections.insert(fd, (connection, state)); // 👈
                }
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
                Err(e) => panic!("{e}"),
            }

            continue 'next;
        }

        // listener未就绪的话,必须有连接是就绪的
        let (connection, state) = connections.get_mut(&fd).unwrap(); // 👈
    }
}

一旦连接和它的状态就绪,我们可以用和之前一样的方法来推进它。从流中读写数据的操作没有任何变化,区别是现在我们仅在接到 epoll 通知时才进行操作。

以前我们必须检查每一个连接,看看是否有什么变得就绪,但现在由 epoll 来处理,避免了任何无用的系统调用。

// ...

// epoll告诉我们连接是否就绪
let (connection, state) = connections.get_mut(&fd).unwrap();

if let ConnectionState::Read { request, read } = state {
    // connection.read...
    *state = ConnectionState::Write { response, written };
}

if let ConnectionState::Write { response, written } = state {
    // connection.write...
    *state = ConnectionState::Flush;
}

if let ConnectionState::Flush = state {
    // connection.flush...
}

所有操作都完成后,我们从 connections 中移除当前连接,它会自动从 epoll 中注销。

for fd in completed {
    let (connection, _state) = connections.remove(&fd).unwrap();
    // 会自动从epoll注销
    drop(connection);
}

就是现在,更高水平的 Web 服务完成了!

fn main() {
    // 创建 epoll
    let epoll = epoll::create(false).unwrap();

    // 绑定到listener
    let listener = /* ... */.

    // 添加 listener 到 epoll
    let event = Event::new(Events::EPOLLIN, listener.as_raw_fd() as _);
    epoll::ctl(epoll, EPOLL_CTL_ADD, listener.as_raw_fd(), event).unwrap();

    let mut connections = HashMap::new();

    loop {
        let mut events = [Event::new(Events::empty(), 0); 1024];

        // 阻塞,直到被 epoll 唤醒
        let num_events = epoll::wait(epoll, 0, &mut events).unwrap();
        let mut completed = Vec::new();

        'next: for event in &events[..num_events] {
            let fd = event.data as i32;

            // listener 是否就绪?
            if fd == listener.as_raw_fd() {
                match listener.accept() {
                    Ok((connection, _)) => {
                        // ...

                        // 添加 connection 到 epoll
                        let event = Event::new(Events::EPOLLIN | Events::EPOLLOUT, fd as _);
                        epoll::ctl(epoll, EPOLL_CTL_ADD, fd, event).unwrap();

                        // 跟踪 connection 的状态
                        let state = ConnectionState::Read {
                            request: [0u8; 1024],
                            read: 0,
                        };

                        connections.insert(fd, (connection, state));
                }

                continue 'next;
            }

            // connection 变得就绪了
            let (connection, state) = connections.get_mut(&fd).unwrap();

            // 尝试基于状态推动当前 connection 的执行
            if let ConnectionState::Read { request, read } = state {
                // connection.read...
                *state = ConnectionState::Write {
                    response: response.as_bytes(),
                    written: 0,
                };
            }

            if let ConnectionState::Write { response, written } = state {
                // connection.write...
                *state = ConnectionState::Flush;
            }

            if let ConnectionState::Flush = state {
                // connection.flush...
            }
        }

        for fd in completed {
            let (connection, _state) = connections.remove(&fd).unwrap();
            // connection 会自动从 epoll 注销
            drop(connection);
        }
    }
}

现在……

$ curl localhost:3000
# => Hello world!

工作正常!

Futures

好吧,我们的服务器现在能在单一线程中同时处理多个请求了。感谢 epoll,它在我们的工作场景中非常高效。但是,仍然还存在问题。

我们需要自己规划任务的执行,需要自己考虑如何高效地调度任务,这使得我们的代码复杂度急剧增加。

任务的执行,也从一个简单的顺序执行循环变成了庞大的事件循环,需要管理多个状态机。

总感觉差点意思。

使我们的原始服务器成多线程非常简单,只需在 thread::spawn 中添加一行代码即可。仔细想想,我们的服务器仍然是一组并发任务,只是我们在一个巨大的循环中混乱地管理它们。

这让扩展功能变得非常困难,在程序中添加的功能越多,循环就变得越复杂,因为所有东西都紧密地耦合在一起。如果可以编写一个类似 thread::spawn 的抽象,能让我们将任务写成独立的单元,能集中在一个地方处理所有任务的调度和事件处理,从而重新获得流程控制权,会怎么样呢?

这种思想被称为异步编程。

我们来看看 thread::spawn 的函数签名:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static;

thread::spawn 接受一个闭包,但我们的版本其实并不能,因为我们不是操作系统,不能随意抢占代码。我们需要以某种方式来表达一项不受阻碍、可恢复的任务。

// fn spawn<T: Task>(task: T);

trait Task {}

处理一个请求是一个任务,从连接读取或写入数据也是。一个任务实质上是一段待执行的代码,代表着它将在未来某个时候需要得到执行。

Future (译注:本意是未来,在异步中我们保留原文代表这种 trait),确实是个好名字,不是吗?

trait Future {
    type Output;

    fn run(self) -> Self::Output;
}

这个签名并不能工作。run 函数直接返回 Self::Output 意味着它会阻塞直到返回,而这正是我们极力在避免的。我们要寻找其他方法,实现不阻塞的同时,能推动我们的 future 前进,就像我们之前在事件循环的状态机中实现的那样。

实际上,在执行一个 future 的时候,我们要做的就是询问它是否已就绪,轮询( polling ) 它,然后给它执行的机会。

trait Future {
    type Output;

    fn poll(self) -> Option<Self::Output>;
}

看起来差不多了。

但是,如果我们多次调用 poll,除了等着,我们并不能获取 self, 所以它应该是一个引用,一个可变的引用,通过它,我们可以改变任务内部的状态,比如 ConnectState。

trait Future {
    type Output;

    fn poll(&mut self) -> Option<Self::Output>;
}

现在,来设想一下执行这些 future 的调度器:

impl Scheduler {
    fn run(&self) {
        loop {
            for future in &self.tasks {
                future.poll();
            }
        }
    }
}

这看起来不怎么样。

future 初始化完成后,当 epoll 返回的它的事件时,调度器调用它的 poll 方法来给它一个执行的机会。

如果 future 是 I/O 操作,在 接到 epoll 通知时我们就知道它可以执行了。问题是我们不知道 epoll 事件对应的是哪个 future, 因为 future 的执行过程都在内部的 poll 中。

调度器需要传递一个 ID 给 future,它可以用这个 ID 而不是文件描述符向 epoll 注册任何 I/O 资源。通过这种方式,调度器就能把 epoll 事件和 future 对应起来了。

impl Scheduler {
    fn spawn<T>(&self, mut future: T) {
        let id = rand();
        // 对 future 调用一次轮询让它运转起来,传入的参数是它的 ID
        future.poll(event.id);
        // 保存 future
        self.tasks.insert(id, future);
    }

    fn run(self) {
        // ...

        for event in epoll_events {
            // 根据事件 ID 轮询相应的 future
            let future = self.tasks.get(&event.id).unwrap();
            future.poll(event.id);
        }
    }
}

您知道的,如果有一种更通用的方式来告诉调度器 future 当前的进度,而不是把每个 future 都绑定到 epoll,那就太好了。future 有不同的类型,每种类型都可能有不同的执行方式,比如在后台线程中执行的定时器、或者是一个通道,它需要在消息已就绪的时候通知相应的任务。

如果我们给 future 更多的控制权呢? 如果我们不是简单地用一个 ID, 而是给每个 future 一个能唤醒自己的方法,能通知调度器它已经准备好可以执行了呢?

一个简单的回调函数就可以做到。

#[derive(Clone)]
struct Waker(Arc<dyn Fn() + Send + Sync>);

impl Waker {
    fn wake(&self) {
        (self.0)()
    }
}

trait Future {
    type Output;

    fn poll(&mut self, waker: Waker) -> Option<Self::Output>;
}

调度器可以为每个 future 提供一个回调函数,它被调用时更新该 future 在调度器中的状态,标记 future 为就绪。这样调度器就完全和 epoll 或其他任何独立通知系统解耦了。

唤醒器 ( Waker ) 是线程安全的,允许我们使用后台线程唤醒 future。目前所有的任务都已连接到 epoll,这马上就会派上用场了。

执行器

考虑一个 future 从 TCP 连接读取数据的场景: 它收到一个唤醒器, 需要在 epoll 返回给它 EPOLLIN 事件的时候得到执行,但事件发生时,它只会呆在调度器的队列里面,并不会得到执行。显然,future 不能唤醒自己,我们还需要其他帮助。

所有 I/O 类型的 future 都需要将他们的唤醒器传递给 epoll, 实际上,它们需要的不止这些, 他们还需要一种驱动 epoll 的后台服务,以便我们可以在其中注册唤醒器。

这种服务通常被称为反应器( Reactor )。

像之前一样,反应器只是一个简单的对象,它保存了 epoll 描述符和以这个描述符为 键值的任务列表。不同之处是键值对应的值不是已建立的 TCP 连接,而是唤醒器。

thread_local! {
    static REACTOR: Reactor = Reactor::new();
}

struct Reactor {
    epoll: RawFd,
    tasks: RefCell<HashMap<RawFd, Waker>>,
}

impl Reactor {
    pub fn new() -> Reactor {
        Reactor {
            epoll: epoll::create(false).unwrap(),
            tasks: RefCell::new(HashMap::new()),
        }
    }
}

简单起见,反应器只是一个本地线程( thread-local )对象,通过 RefCell 获得内部可变性。这非常重要,因为反应器会被不同的任务逻辑修改。

反应器需要实现一系列的基本操作:

添加任务:

impl Reactor {
    // 添加一个关注读和写的事件描述符
    //
    // 当事件被触发时`waker` 将会被调用
    pub fn add(&self, fd: RawFd, waker: Waker) {
        let event = epoll::Event::new(Events::EPOLLIN | Events::EPOLLOUT, fd as u64);
        epoll::ctl(self.epoll, EPOLL_CTL_ADD, fd, event).unwrap();
        self.tasks.borrow_mut().insert(fd, waker);
    }
}

移除任务:

impl Reactor {
    // 从 epoll 移除指定的描述符
    //
    // 移除后任务将不再得到该通知
    pub fn remove(&self, fd: RawFd) {
        self.tasks.borrow_mut().remove(&fd);
    }
}

并且驱动 epoll。

就像 epoll 在 loop 中执行一样,反应器也在 loop 中执行。它们几乎以相同的方式工作,反应器要做的是为每一个事件唤醒相应的 future,然后继续下一个循环。被唤醒的 future 在稍后将在调度器中执行。

impl Reactor {
    // 驱动任务前进,然后一直阻塞,直到有事件到达。
    pub fn wait(&self) {
       let mut events = [Event::new(Events::empty(), 0); 1024];
       let timeout = -1; // 永不超时
       let num_events = epoll::wait(self.epoll, timeout, &mut events).unwrap();

       for event in &events[..num_events] {
           let fd = event.data as i32;

           // 唤醒任务
           if let Some(waker) = self.tasks.borrow().get(&fd) {
               waker.wake();
           }
       }
    }
}

很好,现在我们有一个简单的反应器接口了。

但所有的这些仍然有点抽象,调用 wake 方法究竟意味着什么呢?

任务的调度

反应器有了,我们还需要一个任务调度器来执行我们的任务。

需要记住的是,任务调度器必须是全局,并且是线程安全的,因为唤醒程序是 Send 的,这意味着 wake 方法可以从其他线程同时调用。

static SCHEDULER: Scheduler = Scheduler { /* ... */ };

#[derive(Default)]
struct Scheduler {
    // ...
}

我们希望能像创建线程那样在调度器上创建任务。目前,我们将只生成不返回任何内容的任务,以避免必须实现 JoinHandle。

首先,我们需要按一定的顺序排列要执行的任务,用 Mutex 来保证线程安全。

struct Scheduler {
    tasks: Mutex<Vec<Box<dyn Future + Send>>>
}

impl Scheduler {
    pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) {
        self.tasks.lock().unwrap().push(Box::new(task));
    }

    pub fn run(&self) {
        for task in tasks.lock().unwrap().borrow_mut().iter_mut() {
            // ...
        }
    }
}

记住,future 只在它可以推进的时候才会被轮询。它们在创建时总是会推进一次,然后直到 wake 方法被调用才会被唤醒。

实现方法有很多,我们可以在任务列表中存储一个标记,表示任务是否已被唤醒,但这意味着必须遍历任务列表才能找到可执行的任务,这代价太大了,肯定有更好的方式。

我们可以在队列中只保存可执行的任务,而不是所有被创建的。

use std::collections::VecDeque;

type SharedTask = Arc<Mutex<dyn Future<Output = ()> + Send>>;

#[derive(Default)]
struct Scheduler {
    runnable: Mutex<VecDeque<SharedTask>>,
}

这些类型很快就会有意义。

当任务被创建后,它将会被加入到队尾:

impl Scheduler {
    pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) {
        self.runnable.lock().unwrap().push_back(Arc::new(Mutex::new(task)));
    }
}

调度器挨个弹出任务并调用它们的 poll 方法:

impl Scheduler {
    fn run(&self) {
        loop {
            // 从队列中弹出一个可执行的任务
            let task = self.runnable.lock().unwrap().pop_front();

            if let Some(task) = task {
                // 调用它的 poll 方法
                task.try_lock().unwrap().poll(waker);
            }
        }
    }

请注意,我们甚至不需要互斥锁来锁定任务,因为任务只会由主线程访问,但删除它意味着不安全,使用 try_lock().unwrap() 来处理。

现在,最重要的一点:唤醒器。我们的执行队列最精华的部分就是:当一个任务被唤醒,它只是简单地被推回队列。

impl Scheduler {
    fn run(&self) {
        loop {
            // 从队列中弹出一个可执行的任务
            let task = self.runnable.lock().unwrap().pop_front();

            if let Some(task) = task {
                let t2 = task.clone();

                // 创建一个唤醒器,它的作用是把任务推回队列
                let wake = Arc::new(move || {
                    SCHEDULER.runnable.lock().unwrap().push_back(t2.clone());
                });

                // 调用该任务的 poll 方法
                task.try_lock().unwrap().poll(Waker(wake));
            }
        }
    }
}

这就是为什么需要对任务进行引用计数——它的所有权不属于调度程序,它被队列引用,也被存储唤醒程序的任何地方引用。实际上同一个任务可能会加入队列多次,唤醒器也可能在任何地方被克隆。

一旦处理完了所有可执行的任务,反应器将会被阻塞,直到有其他任务就绪5。当新的任务变得就绪,反应器将调用 wake 方法并将该 future 推入任务队列并执行它,继续循环。

pub fn run(&self) {
    loop {
        loop {
            // 从队列中弹出一个可执行的任务
            let Some(task) = self.runnable.lock().unwrap().pop_front() else { break };
            let t2 = task.clone();

            // 创建一个唤醒器,它的作用是把任务推回队列
            let wake = Arc::new(move || {
                SCHEDULER.runnable.lock().unwrap().push_back(t2.clone());
            });

            // 调用该任务的 poll 方法
            task.lock().unwrap().poll(Waker(wake));
        }

        // 如果没有可执行的任务,阻塞 epoll 直到某些任务变得就绪
        REACTOR.with(|reactor| reactor.wait()); // 👈
    }
}

漂亮!

……忽略混乱的 Arc<Mutex>。

好!调度器和反应器共同构成了一个 future 的运行时。调度器会跟踪哪些任务是可运行的,并轮询它们,当 epoll 告诉我们它们感兴趣的内容准备就绪时,反应器会将任务标记为可运行。

trait Future {
    type Output;
    fn poll(&mut self, waker: Waker) -> Option<Self::Output>;
}

static SCHEDULER: Scheduler = Scheduler { /* ... */ };

// 调度器
#[derive(Default)]
struct Scheduler {
    runnable: Mutex<VecDeque<SharedTask>>,
}

type SharedTask = Arc<Mutex<dyn Future<Output = ()> + Send>>;

impl Scheduler {
    pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static);
    pub fn run(&self);
}

thread_local! {
    static REACTOR: Reactor = Reactor::new();
}

// 反应器
struct Reactor {
    epoll: RawFd,
    tasks: RefCell<HashMap<RawFd, Waker>>,
}

impl Reactor {
    pub fn new() -> Reactor;
    pub fn add(&self, fd: RawFd, waker: Waker);
    pub fn remove(&self, fd: RawFd);
    pub fn wait(&self);
}

我们已经写好了运行时,下面尝试使用它。

异步的 Web 服务

是时候来写供我们的调度器执行的任务了。跟之前一样,我们使用枚举作为状态机来管理程序的状态。不同的是,这次不再在一个巨大的循环中统一管理程序状态,而是每个任务独立管理自己的状态。

一开始,我们需要编写主任务,它将掌控整个程序的调度器可执行队列的开关。

fn main() {
    SCHEDULER.spawn(Main::Start);
    SCHEDULER.run();
}

enum Main {
    Start,
}

impl Future for Main {
    type Output = ();

    fn poll(&mut self, waker: Waker) -> Option<()> {
        // ...
    }
}

任务的开始跟之前一样,创建一个 TCP 侦听器并设置为非阻塞模式。

// impl Future for Main {
fn poll(&mut self, waker: Waker) -> Option<()> {
    if let Main::Start = self {
        let listener = TcpListener::bind("localhost:3000").unwrap();
        listener.set_nonblocking(true).unwrap();
    }

    None
}

然后需要在 epoll 中注册侦听器,使用我们的反应器来实现它。

// impl Future for Main {
fn poll(&mut self, waker: Waker) -> Option<()> {
    if let Main::Start = self {
        // ...

        REACTOR.with(|reactor| {
            reactor.add(listener.as_raw_fd(), waker);
        });
    }
}

请注意由调度器提供的唤醒器是如何给到反应器的,当新的连接进来,epoll 将返回一个事件,反应器将唤醒相关任务,引起调度器将任务重新放入队列并再次轮询它。唤醒器将所有环节都连接起来了。

现在,当任务下一次被执行时,我们需要第二种状态: Accept。主任务将一直保持在 Accept 状态,保证新连接进来能及时得到处理。

enum Main {
    Start,
    Accept { listener: TcpListener }, // 👈
}

fn poll(&mut self, waker: Waker) -> Option<()> {
    if let Main::Start = self {
        // ...
        *self = Main::Accept { listener };
    }

    if let Main::Accept { listener } = self {
        match listener.accept() {
            Ok((connection, _)) => {
                // ...
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                return None;
            }
            Err(e) => panic!("{e}"),
        }
    }

    None
}

如果侦听器未就绪,我们返回 None 即可。这告诉调度器 future 还没有就绪,等下次反应器唤醒它时再调度。

如果我们接受新的连接,同样需要将其设置为非阻塞模式。

fn poll(&mut self, waker: Waker) -> Option<()> {
    if let Main::Start = self {
        // ...
    }

    if let Main::Accept { listener } = self {
        match listener.accept() {
            Ok((connection, _)) => {
                connection.set_nonblocking(true).unwrap(); // 👈 
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => return None,
            Err(e) => panic!("{e}"),
        }
    }

    None
}

接下来,需要创建新的任务来执行传入的请求。

fn poll(&mut self, waker: Waker) -> Option<()> {
    if let Main::Start = self {
        // ...
    }

    if let Main::Accept { listener } = self {
        match listener.accept() {
            Ok((connection, _)) => {
                connection.set_nonblocking(true).unwrap();

                SCHEDULER.spawn(Handler { // 👈
                    connection,
                    state: HandlerState::Start,
                });
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => return None,
            Err(e) => panic!("{e}"),
        }
    }
}

请求处理程序跟之前的差不多,但现在它自己管理连接及当前状态,与之前的 ConnectionState 完全相同。

struct Handler {
    connection: TcpStream,
    state: HandlerState,
}

enum HandlerState {
    Start,
    Read {
        request: [u8; 1024],
        read: usize,
    },
    Write {
        response: &'static [u8],
        written: usize,
    },
    Flush,
}

任务处理程序首先在反应器注册当前连接,以便在连接准备好读/写时收到通知。同样,通过唤醒器让调度器知道何时再次调度它。

impl Future for Handler {
    type Output = ();

    fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
        if let HandlerState::Start = self.state {
            // 注册当前连接以便得到通知
            REACTOR.with(|reactor| {
                reactor.add(self.connection.as_raw_fd(), waker);
            });

            self.state = HandlerState::Read {
                request: [0u8; 1024],
                read: 0,
            };
        }

        // ...
    }
}

读、写和刷新状态跟之前完全一样,只是当遇到 WouldBlock 时,我们仅返回 None 即可。因为我们知道后续被唤醒时就会再次得到执行的机会。

/ impl Future for Handler {
fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
    if let HandlerState::Start = self.state {
        // ...
    }

    // 处理请求
    if let HandlerState::Read { request, read } = &mut self.state {
        loop {
            match self.connection.read(&mut request[*read..]) {
                Ok(0) => {
                    println!("client disconnected unexpectedly");
                    return Some(());
                }
                Ok(n) => *read += n,
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => return None, // 👈
                Err(e) => panic!("{e}"),
            }

            // 判断是否读到结束标记
            let read = *read;
            if read >= 4 && &request[read - 4..read] == b"\r\n\r\n" {
                break;
            }
        }

        // 操作完成,打印收到的请求数据
        let request = String::from_utf8_lossy(&request[..*read]);
        println!("{}", request);

        // 切换到写状态
        let response = /* ... */;

        self.state = HandlerState::Write {
            response: response.as_bytes(),
            written: 0,
        };
    }

    // 写入响应数据
    if let HandlerState::Write { response, written } = &mut self.state {
        // self.connection.write...

        // 写操作完成,进入刷新状态
        self.state = HandlerState::Flush;
    }

    // 刷新以保证操作完成
    if let HandlerState::Flush = self.state {
        match self.connection.flush() {
            Ok(_) => {}
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => return None, // 👈
            Err(e) => panic!("{e}"),
        }
    }
}

注意到了吗?当任务是独立、封装的对象时,事情会变得多美好!

任务生命周期结束时,它从反应器中移除自己的连接并返回 Some,此后它将不再被执行。

fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
    // ...

    REACTOR.with(|reactor| {
        reactor.remove(self.connection.as_raw_fd());
    });

    Some(())
}

完美!我们的 Web 服务端越来越好了。任务之间完全独立,我们可以像创建线程那样创建任务。

fn main() {
    SCHEDULER.spawn(Main::Start);
    SCHEDULER.run();
}

// 主任务: 保持侦听
enum Main {
    Start,
    Accept { listener: TcpListener },
}

impl Future for Main {
    type Output = ();

    fn poll(&mut self, waker: Waker) -> Option<()> {
        if let Main::Start = self {
            // ...
            REACTOR.with(|reactor| {
                reactor.add(listener.as_raw_fd(), waker);
            });

            *self = Main::Accept { listener };
        }

        if let Main::Accept { listener } = self {
            match listener.accept() {
                Ok((connection, _)) => {
                    // ...
                    SCHEDULER.spawn(Handler {
                        connection,
                        state: HandlerState::Start,
                    });
                }
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => return None,
                Err(e) => panic!("{e}"),
            }
        }

        None
    }
}

// 执行任务: 连接建立后处理具体事务
struct Handler {
    connection: TcpStream,
    state: HandlerState,
}

enum HandlerState {
    Start,
    Read {
        request: [u8; 1024],
        read: usize,
    },
    Write {
        response: &'static [u8],
        written: usize,
    },
    Flush,
}

impl Future for Handler {
    type Output = ();

    fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
        if let HandlerState::Start = self.state {
            REACTOR.with(|reactor| {
                reactor.add(self.connection.as_raw_fd(), waker);
            });

            self.state = HandlerState::Read { /* .. */ };
        }

        if let HandlerState::Read { request, read } = &mut self.state {
            // ...
            self.state = HandlerState::Write { /* .. */ };
        }

        if let HandlerState::Write { response, written } = &mut self.state {
            // ...
            self.state = HandlerState::Flush;
        }

        if let HandlerState::Flush = self.state {
            // ...
        }

        REACTOR.with(|reactor| {
            reactor.remove(self.connection.as_raw_fd());
        });

        Some(())
    }
}

执行结果:

$ curl localhost:3000
# => Hello world!

工作正常!

全功能的 Web 服务

得益于新的 future 抽象,我们的服务端比之前更优秀。Future 独立管理自己的状态,调度器不需要关心 epoll, 专注于执行任务。任务可以被创建、唤醒,不需要关心调度器的底层细节。这真的是非常好的编程模式。

任务被封装是件好事,但我们仍然必须以类似状态机的方式编写所有内容,诚然,Rust 使这很容易用枚举来完成,但我们能做得更好吗?

看看我们写的两个 future,它们有很多共同点:每个 future 都有多个状态;在每种状态下都会运行一些代码;如果这些代码成功执行完毕,我们将转换到下一状态;如果遇到 WouldBlock 则返回 None,表示尚未准备就绪。

这似乎是可以抽象的东西。

我们需要从代码块中创建 future,以及将两个 future 组合在一起的方法。给定一个代码块,我们需要能够构建一个 future ……这听起来跟闭包很相似?

fn future_fn(f: F) -> impl Future
where
    F: Fn(),
{
    // ...
}

还需要能修改内部的状态

fn future_fn(f: F) -> impl Future
where
    F: FnMut(),
{
    // ...
}

还需要传入一个唤醒器

fn poll_fn<F, T>(f: F) -> impl Future<Output = T>
where
    F: FnMut(Waker) -> Option<T>,
{
    // ...
}

还… 需要一个返回值,未就绪时它返回 None。实际上,我们只需要复制 poll 函数的签名,它已经满足了闭包的要求。

fn poll_fn<F, T>(f: F) -> impl Future<Output = T>
where
    F: FnMut(Waker) -> Option<T>,
{
    // ...
}

实现 poll_fn 貌似不难,我们只需要包装一个实现了 Future 的结构并委托 poll 函数给闭包。

fn poll_fn<F, T>(f: F) -> impl Future<Output = T>
where
    F: FnMut(Waker) -> Option<T>,
{
    struct FromFn<F>(F);

    impl<F, T> Future for FromFn<F>
    where
        F: FnMut(Waker) -> Option<T>,
    {
        type Output = T;

        fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
            (self.0)(waker)
        }
    }

    FromFn(f)
}

好吧。让我们使用新的 poll_fn 帮助函数来尝试重写主任务,将 Main::Start 状态的代码粘贴到闭包中即可。

fn main() {
    SCHEDULER.spawn(listen());
    SCHEDULER.run();
}

fn listen() -> impl Future<Output = ()> {
    let start = poll_fn(|waker| {
        let listener = TcpListener::bind("localhost:3000").unwrap();

        listener.set_nonblocking(true).unwrap();

        REACTOR.with(|reactor| {
            reactor.add(listener.as_raw_fd(), waker);
        });

        Some(listener)
    });

    // ...
}

请记住,Main::Start 从不等待任何 I/O,因此它会立即准备好侦听器。

同样,我们使用 poll_fn 帮助函数来重写 Main::Accept future。

fn listen() -> impl Future<Output = ()> {
    let start = poll_fn(|waker| {
        // ...
        Some(listener)
    });

    let accept = poll_fn(|_| match listener.accept() {
        Ok((connection, _)) => {
            connection.set_nonblocking(true).unwrap();

            SCHEDULER.spawn(Handler {
                connection,
                state: HandlerState::Start,
            });

            None
        }
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => None,
        Err(e) => panic!("{e}"),
    });
}

另外,accept 总是返回 None,因为我们希望每次有新连接传入时都能调用它。它贯穿于我们的整个程序。

现在我们有了两个任务状态,需要以某种方式将它们连接起来。

fn chain<F1, F2>(future1: F1, future2: F2) -> impl Future<Output = F2::Output>
where
    F1: Future,
    F2: Future
{
    // ...
}

嗯~这其实不能工作。

第二个 future 需要访问第一个的输出:TCP 侦听器。

直接使用行不通,考虑在第一个 future 的输出上接一个闭包,它可以使用第一个 future 的输出来构建第二个。

fn chain<T1, F, T2>(future1: T1, chain: F) -> impl Future<Output = T2::Output>
where
    T1: Future,
    F: FnOnce(T1::Output) -> T2,
    T2: Future
{
    // ...
}

看上去好些了。

我们不妨再花哨一点,让 chain 成为 Future trait 的方法。这样,我们就可以在任何 future 调用 .chain 作为后缀方法了。

trait Future {
    // ...

    fn chain<F, T>(self, chain: F) -> Chain<Self, F, T>
    where
        F: FnOnce(Self::Output) -> T,
        T: Future,
        Self: Sized,
    {
        // ...
    }
}

enum Chain<T1, F, T2> {
    // ...
}

看起来不错,我们试试看。

Chain future 是状态机的泛化,所以它本身就是一个迷你状态机。它首先轮询第一个 future,然后在过渡闭包完成后保留住它。

enum Chain<T1, F, T2> {
    First { future1: T1, transition: F },
}

impl<T1, F, T2> Future for Chain<T1, F, T2>
where
    T1: Future,
    F: FnOnce(T1::Output) -> T2,
    T2: Future,
{
    type Output = T2::Output;

    fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
        if let Chain::First { future1, transition } = self {
            // 轮询第一个 future
            match future1.poll(waker) {
                // ...
            }
        }
    }
}

一旦第一个 future 执行完毕,它使用过渡闭包构造第二个 future,并轮询它:

enum Chain<T1, F, T2> {
    First { future1: T1, transition: F },
    Second { future2: T2 },
}

impl<T1, F, T2> Future for Chain<T1, F, T2>
where
    T1: Future,
    F: FnOnce(T1::Output) -> T2,
    T2: Future,
{
    type Output = T2::Output;

    fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
        if let Chain::First { future1, transition } = self {
            // 轮询第一个 future
            match future1.poll(waker.clone()) {
                Some(value) => {
                    // 第一个 future 已完成,切换到下一个
                    let future2 = (transition)(value); // 👈
                    *self = Chain::Second { future2 };
                }
                // 第一个 future 未完成,直接返回
                None => return None,
            }
        }

        if let Chain::Second { future2 } = self {
            // 第一个 future 已完成,轮询下一个
            return future2.poll(waker); // 👈
        }

        None
    }
}

请注意,同一个唤醒器是如何用于轮询两个 future 的。取决于两个 future 的状态,通知将仅传播到 Chain 的父 future。

嗯……这实际上是不能工作的:

error[E0507]: cannot move out of `*transition` which is behind a mutable reference
   --> src/main.rs:182:33
    |
182 |    let future2 = (transition)(value);
    |                  ^^^^^^^^^^^^ move occurs because `*transition` has type `F`,
                                    which does not implement the `Copy` trait

哦,是的,transition 是一个 FnOnce 的闭包,它在第一次被调用时就被消耗掉了。虽然基于我们的状态机,我们只调用它一次,但编译器并不知道这一点。我们可以将其封装在 Option 中,并使用 take 来获取它的同时将 Option 替换为 None,从而获取它的所有权。这是使用状态机时常见的模式。

enum Chain<T1, F, T2> {
    First { future1: T1, transition: Option<F> }, // 👈
    Second { future2: T2 },
}

// ...
fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
    if let Chain::First { future1, transition } = self {
        match future1.poll(waker.clone()) {
            Some(value) => {
                let future2 = (transition.take().unwrap())(value); // 👈
                *self = Chain::future2 { future2 };
            }
            None => return None,
        }
    }

    // ...
}

完美!现在,在其初始状态下,chain 方法就可以很轻松地构建我们的 Chain Future 了。

trait Future {
    // ...
    fn chain<F, T>(self, transition: F) -> Chain<Self, F, T>
    where
        F: FnOnce(Self::Output) -> T,
        T: Future,
        Self: Sized,
    {
        Chain::First {
            future1: self,
            transition: Some(transition),
        }
    }
}

好吧,我们说到哪儿了……对了,是主 future!

fn listen() -> impl Future<Output = ()> {
    let start = poll_fn(|waker| {
        // ...
        Some(listener)
    });

    let accept = poll_fn(|_| match listener.accept() {
        // ...
    });
}

我们可以用新的 chain 方法组合这两个 future。

fn listen() -> impl Future<Output = ()> {
    poll_fn(|waker| {
        // ...
        Some(listener)
    })
    .chain(|listener| { // 👈
        poll_fn(move |_| match listener.accept() {
            // ...
        })
    })
}

嗯,看起来很不错!我们的手动状态机已经不复存在了,侦听方法现在用简单的闭包就可以表示!

fn main() {
    SCHEDULER.spawn(listen());
    SCHEDULER.run();
}

fn listen() -> impl Future<Output = ()> {
    poll_fn(|waker| {
        let listener = TcpListener::bind("localhost:3000").unwrap();
        // ...

        REACTOR.with(|reactor| {
            reactor.add(listener.as_raw_fd(), waker);
        });

        Some(listener)
    })
    .chain(|listener| {
        poll_fn(move |_| match listener.accept() {
            Ok((connection, _)) => {
                // ...
                SCHEDULER.spawn(Handler {
                    connection,
                    state: HandlerState::Start,
                });

                None
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => None,
            Err(e) => panic!("{e}"),
        })
    })
}

真是太好了!

接下来,用同样的方式,我们可以将连接处理的部分也转换成基于闭包的 future。首先我们将它分离出来,形成一个返回闭包的函数。

fn listen() -> impl Future<Output = ()> {
    poll_fn(|waker| {
        // ...
    })
    .chain(|listener| {
        poll_fn(move |_| match listener.accept() {
            Ok((connection, _)) => {
                // ...
                SCHEDULER.spawn(handle(connection)); // 👈
                None
            }
            // ...
        })
    })
}

fn handle(connection: TcpStream) -> impl Future<Output = ()> {
    // ...
}

最开始的状态 HandlerState::Start 只是一个简单的 poll_fn 闭包,它向反应器注册连接并立即返回。

fn handle(connection: TcpStream) -> impl Future<Output = ()> {
    poll_fn(move |waker| {
        REACTOR.with(|reactor| {
            reactor.add(connection.as_raw_fd(), waker);
        });

        Some(())
    })
}

第二个状态 HandlerState::Read 可以很容易地通过 chain 来组合 future,它在栈上初始化一个本地连接状态并将它移动到 future 中,让 future 管理自身的状态。

fn handle(mut connection: TcpStream) -> impl Future<Output = ()> {
    poll_fn(move |waker| {
        // ...
    })
    .chain(move |_| {
        let mut read = 0;
        let mut request = [0u8; 1024]; // 👈

        poll_fn(move |_| {
            loop {
                // 尝试从流中读取数据
                match connection.read(&mut request[read..]) {
                    Ok(0) => {
                        println!("client disconnected unexpectedly");
                        return Some(());
                    }
                    Ok(n) => read += n,
                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => return None,
                    Err(e) => panic!("{e}"),
                }

                // 判断是否读到结束标记
                let read = read;
                if read >= 4 && &request[read - 4..read] == b"\r\n\r\n" {
                    break;
                }
            }

            // 操作完成,打印收到的请求数据
            let request = String::from_utf8_lossy(&request[..read]);
            println!("{request}");

            Some(())
        })
    })
}

HandlerState::Write 和 HandlerState::Flush 可以用同样的方式组合各自的任务。

fn handle(connection: TcpStream) -> impl Future<Output = ()> {
    poll_fn(move |waker| {
        // REACTOR.register...
    })
    .chain(move |_| {
        // connection.read...
    })
    .chain(move |_| {
        let response = /* ... */;
        let mut written = 0;

        poll_fn(move |_| {
            loop {
                match connection.write(response[written..].as_bytes()) {
                    Ok(0) => {
                        println!("client disconnected unexpectedly");
                        return Some(());
                    }
                    Ok(n) => written += n,
                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => return None,
                    Err(e) => panic!("{e}"),
                }

                // 判断响应数据是否已全部写入完毕
                if written == response.len() {
                    break;
                }
            }

            Some(())
        })
    })
    .chain(move |_| {
        poll_fn(move |_| {
            match connection.flush() {
                Ok(_) => {}
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                    return None;
                }
                Err(e) => panic!("{e}"),
            };

            REACTOR.with(|reactor| reactor.remove(connection.as_raw_fd());
            Some(())
        })
    })
}

漂亮!

啊?!……

error[E0382]: use of moved value: `connection`
  --> src/main.rs:59:12
   |
51 | fn handle(mut connection: TcpStream) -> impl Future<Output = ()> {
   |           -------------- move occurs because `connection` has type `TcpStream`, which does not implement the `Copy` trait
52 |     poll_fn(move |waker| {
   |             ------------- value moved into closure here
53 |         REACTOR.with(|reactor| {
54 |             reactor.add(connection.as_raw_fd(), waker);
   |                          ---------- variable moved due to use in closure
...
59 |     .chain(move |_| {
   |            ^^^^^^^^ value used here after move
...
66 |                 match connection.read(&mut request[read..]) {
   |                       ---------- use occurs due to use in closure

error[E0382]: use of moved value: `connection`
// ...

唔……

所有的 future 都使用了 move 来传递参数给闭包,意味着闭包获取了 connection 的所有权,每个 connection 的所有权是唯一的,不应该移动进闭包,那么去掉 move 试试看?

error[E0373]: closure may outlive the current function, but it borrows `connection`, which is owned by the current function
   --> src/main.rs:52:13
    |
52  |     poll_fn(|waker| {
    |             ^^^^^^^^ may outlive borrowed value `connection`
53  |         REACTOR.with(|reactor| {
54  |             reactor.add(connection.as_raw_fd(), waker);
    |                          ---------- `connection` is borrowed here
    |
note: closure is returned here
   --> src/main.rs:52:5
    |
52  | /     poll_fn(|waker| {
53  | |         REACTOR.with(|reactor| {
54  | |             reactor.add(connection.as_raw_fd(), waker);
55  | |         });
...   |
128 | |         })
129 | |     })
    | |______^
help: to force the closure to take ownership of `connection` (and any other referenced variables), use the `move` keyword
    |
52  |     poll_fn(move |waker| {
    |             ++++

看上去也不行。连接的生命周期需要足够长。如果我们将它移动到第一个 future ,让其他 future 借用它呢?

error[E0382]: use of moved value: `connection`
  --> src/main.rs:59:12
   |
51 | fn handle(mut connection: TcpStream) -> impl Future<Output = ()> {
   |           -------------- move occurs because `connection` has type `TcpStream`, which does not implement the `Copy` trait
52 |     poll_fn(move |waker| {
   |             ------------- value moved into closure here
53 |         REACTOR.with(|reactor| {
54 |             reactor.add(connection.as_raw_fd(), waker);
   |                          ---------- variable moved due to use in closure
...
59 |     .chain(|_| {
   |            ^^^ value used here after move
...
66 |                 match connection.read(&mut request[read..]) {
   |                       ---------- use occurs due to use in closure

果然,还是不行。

本质上,我们的链式 future 是这样的:第一个 future 拥有链接,其他 future 从它借用。

enum Handle {
    Start {
        connection: TcpStream,
    }
    Read {
        connection: &'??? TcpStream
    }
}

Which of course, doesn’t make much sense. ( 继续这种方式没有太大的意义?这里不知道怎么翻译符合语境,也不影响理解。 ) 一旦状态切换到 Read, Start 中的连接就被释放了,我们的引用就没了目标。

所以手动写 future 时该怎么做才可以呢?

struct Handler {
    connection: TcpStream,
    state: HandlerState,
}

enum HandlerState { /* ... */ }

好了,现在连接存在于外部结构中,也许我们可以编写另一个帮助程序,允许 future 引用存储在外部的一些数据?

就像这样:

struct WithData<D, F> {
    data: D,
    future: F,
}

看上去很简单, 构造一个 futrue,它可以捕获数据的引用,并且通过闭包来构造它们的关联,就像 chain 的实现一样。

impl<D, F> WithData<D, F> {
    pub fn new(data: D, construct: impl Fn(&D) -> F) -> WithData<D, F> {
        let future = construct(&data);
        WithData { data, future }
    }
}

WithData 可以通过委托内部 future 来实现自身的 Future 特性。

impl<D, F> Future for WithData<D, F>
where
    F: Future,
{
    type Output = F::Output;

    fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
        self.future.poll(waker)
    }
}

现在我们可以用 WithData 封装我们的 future, 让 connection 在 future 返回之后仍然存活,这样应该就能正常了。

fn handle(connection: TcpStream) -> impl Future<Output = ()> {
    WithData::new(connection, |connection| {
        from_fn(move |waker| {
            // ...
        })
        .chain(move |_| {
            let mut request = [0u8; 1024];
            let mut read = 0;

            from_fn(move |_| {
                // ...
            })
        })
        .chain(move |_| {
            let response = /* ... */;
            let mut written = 0;

            from_fn(move |_| {
                // ...
            })
        })
        .chain(move |_| {
            from_fn(move |_| {
                // ...
            })
        })
    })
}

虽然看起来有的奇怪,但它能工作的话……

error: lifetime may not live long enough
   --> src/main.rs:53:9
    |
52  |       WithData::new(connection, |connection| {
    |                                  ----------- return type of closure `Chain<Chain<Chain<impl Future<Output = ()>, [closure@src/bin/play.rs:60:16: 86:10], impl Future<Output = ()>>, [closure@src/bin/play.rs:87:16: 113:10], impl Future<Output = ()>>, [closure@src/bin/play.rs:114:16: 130:10], impl Future<Output = ()>>` contains a lifetime `'2`
    |                                  |
    |                                  has type `&'1 TcpStream`
53  | /         from_fn(move |waker| {
54  | |             REACTOR.with(|reactor| {
55  | |                 reactor.add(connection.as_raw_fd(), waker);
56  | |             });
...   |
129 | |             })
130 | |         })
    | |__________^ returning this value requires that `'1` must outlive `'2`

看起来不是轻易能搞定的。

这又是一条非常诡异的错误信息:

return type of closure `Chain<Chain<Chain<impl Future<Output = ()>, [closure@src/bin/play.rs:60:16: 86:10], impl Future<Output = ()>>, [closure@src/bin/play.rs:87:16: 113: 10], impl Future<Output = ()>>, [closure@src/bin/play.rs:114:16: 130:10], impl Future<Output = ()>>` contains a lifetime `'2`

好吧,传递给 WithData 的巨型链式 future 包含了对 connection 的引用。

那正是我们想要的,为了其他 future 能借用 connection,对吧?

`connection` has type `&'1 TcpStream` ... returning this value requires that `'1` must outlive `'2`

嗯~ 在 WithData 结构中,我们没有实际指定 future 是从 data 借用数据,导致 Rust 不能推断出它的生命周期。那么,是不是给 WithData 标注上正确的生命周期就可以了呢?

struct WithData<'data, D, F> {
    data: D,
    future: F,
}
error[E0392]: parameter `'data` is never used
   --> src/bin/play.rs:160:17
    |
160 | struct WithData<'data, D, F> {
    |                 ^^^^^ unused parameter
    |
    = help: consider removing `'data`, referring to it in a field, or using a marker such as `PhantomData`

看起来加上 PhantomData 看起来就能解决。

struct WithData<'data, D, F> {
    data: D,
    future: F,
    _data: PhantomData<&'data D>,
}

future 确实引用了 &’data D,所以看上去很合理。接下来,需要在构造函数中说明 future 对 data 的借用。

impl<'data, D, F> WithData<'data, D, F>
where
    F: Future + 'data, // 👈
{
    pub fn new(
        data: D,
        construct: impl Fn(&'data D) -> F, // 👈
    ) -> WithData<'data, D, F> {
        let future = construct(&data);

        WithData {
            data,
            future,
            _data: PhantomData,
        }
    }
}

然后就应该能正常工作了吧?所有的生命周期都标记清楚了。

error[E0597]: `data` does not live long enough
   --> src/bin/play.rs:172:30
    |
167 | impl<'data, D, F> WithData<'data, D, F>
    |      ----- lifetime `'data` defined here
...
172 |         let future = construct(&data);
    |                      ----------^^^^^-
    |                      |         |
    |                      |         borrowed value does not live long enough
    |                      argument requires that `data` is borrowed for `'data`
...
178 |     }
    |     - `data` dropped here while still borrowed

error[E0505]: cannot move out of `data` because it is borrowed
   --> src/bin/play.rs:174:13
    |
167 | impl<'data, D, F> WithData<'data, D, F>
    |      ----- lifetime `'data` defined here
...
172 |         let future = construct(&data);
    |                      ----------------
    |                      |         |
    |                      |         borrow of `data` occurs here
    |                      argument requires that `data` is borrowed for `'data`
173 |         WithData {
174 |             data,
    |             ^^^^ move out of `data` occurs here

然而……并没有。

为什么呢?

`data` dropped here while still borrowed

等等,我们从闭包中去掉 move 的时候,也得到过同样的错误信息。但是现在 data 已经有足够长的生存周期了,为什么还会这样?

嗯……实际上,第二个错误告诉我们,移动 data 也是错误的。

cannot move out of `data` because it is borrowed

这道理其实说得通,构造 future 时借用的 data 实际存活在栈上,当它被移动后,就不在栈上原来的位置了。它的地址已经变化,导致后续 future 对 data 的引用实际是无效的。

## 移动前

       ┌─────────────┐
       │0101001010010│
data:  │001...       │ ◄──────── &future.data
       
       
       └─────────────┘


## 移动后

             ???       ◄──────── &future.data


       ┌─────────────┐
       │0101001010010│
data:  │001...       │
       
       
       └─────────────┘

我们给了 data 空间和足够长的生存周期,但是没有给它一个固定不变的空间。事实上,这在 Rust 中是一个众所周知的问题:我们试图创建被称为自引用结构的结构,而这是不可能安全地完成的。

早先,future 的状态整个包含在 Handler 结构中时,并不需要自引用结构。一切都在 Handler 中运行。 但当我们将任务分割为子任务时,我们需要一种能让它们独立访问数据的方法。

那么就不可能做到了吗?

其实……

我们可以用 Rc 来持有 data 并在每个 future 中通过克隆获取它的引用。这种方法将 data 存储在堆上,让 future 可以始终获取到正确的引用,只是需要在所有 future 都完成之后释放 data。

我们的代码变得越来越丑陋了。

fn handle(connection: TcpStream) -> impl Future<Output = ()> {
    let connection = Rc::new(connection); // 👈
    let read_connection_ref = connection.clone();
    let write_connection_ref = connection.clone();
    let flush_connection_ref = connection.clone();

    poll_fn(move |waker| {
        // ...
    })
    .chain(move |_| {
        // ...
        poll_fn(move |_| {
            let connection = &*read_connection_ref;

            loop {
                match (&mut connection).read(&mut request) {
                    // ...
                }
            }
            // ...
        })
    })
    .chain(move |_| {
        // ...
        poll_fn(move |_| {
            let connection = &*write_connection_ref;
            // ...
        })
    })
    .chain(move |_| {
        poll_fn(move |_| {
            let connection = &*flush_connection_ref;
            // ...
        })
    })
}

哦,不!

error[E0277]: `Rc<TcpStream>` cannot be sent between threads safely
   --> src/main.rs:90:37
    |
90  |     SCHEDULER.spawn(handle(connection));
    |               ----- ^^^^^^^^^^^^^^^^^^ `Rc<TcpStream>` cannot be sent between threads safely
    |               |
    |               required by a bound introduced by this call
...
100 |     fn handle(mut connection: TcpStream) -> impl Future<Output = ()> {
    |                                             ------------------------ within this `impl Future<Output = ()>`

使用 Rc 让这部分代码变成 !Send 的了,因为 Rc 没有实现 Send,虽然我们只在 future 内部使用 connection,并且 future 只由主线程来执行,但编译器不知道。为了通过编译,我们只能使用 Arc。

fn handle(connection: TcpStream) -> impl Future<Output = ()> {
    let connection = Arc::new(connection); // 👈
    // ...
}

有点难看,但至少能编译了。

我们的服务端程序不再有手工编写的状态机,看起来足够清爽。

比起手动 epoll 开始的方式,即便使用了奇怪的 Arc,看起来也要比之前干净得多。

fn main() {
    SCHEDULER.spawn(listen());
    SCHEDULER.run();
}

fn listen() -> impl Future<Output = ()> {
    poll_fn(|waker| {
        let listener = TcpListener::bind("localhost:3000").unwrap();

        // ...

        REACTOR.with(|reactor| {
            reactor.add(listener.as_raw_fd(), waker);
        });

        Some(listener)
    })
    .chain(|listener| {
        poll_fn(move |_| match listener.accept() {
            Ok((connection, _)) => {
                // ...
                SCHEDULER.spawn(handle(connection));
                None
            }
            // ...
        })
    })
}

fn handle(connection: TcpStream) -> impl Future<Output = ()> {
    let connection = Arc::new(connection);
    let read_connection_ref = connection.clone();
    let write_connection_ref = connection.clone();
    let flush_connection_ref = connection.clone();

    poll_fn(move |waker| {
        REACTOR.with(|reactor| {
            reactor.add(connection.as_raw_fd(), waker);
        });

        Some(())
    })
    .chain(move |_| {
        let mut request = [0u8; 1024];
        let mut read = 0;

        poll_fn(move |_| {
            // ...
        })
    })
    .chain(move |_| {
        let response = /* ... */;
        let mut written = 0;

        poll_fn(move |_| {
            // ...
        })
    })
    .chain(move |_| {
        poll_fn(move |_| {
            // ...
            REACTOR.with(|reactor| {
                reactor.remove(flush_connection_ref.as_raw_fd());
            });

            Some(())
        })
    })
}

当然,它也是能正常工作的。

$ curl localhost:3000
# => Hello world!

优雅的 Web 服务

哇,不知不觉已经这么多了。

还有最后一件事情:为了测试我们的任务模型,像之前讨论的那样,我们需要实现优雅的关闭机制。

设想一下我们的优雅关闭机制:当按下 ctrl+c 时,不是粗暴地立刻关闭程序,而是立刻停止接受新的连接请求,同时等待已建立连接的请求执行完毕,超过 30 秒没执行完毕的请求将会被强行终止,然后服务端程序才退出。

实现它我们有很多工作要做。首先要检测退出信号,在 Linux 中, ctrl+c 会触发 SIGINT 信号,我们使用 signal_hook crate 来接收这个信号。

use signal_hook::consts::signal::SIGINT;
use signal_hook::iterator::Signals;

fn ctrl_c() {
    let mut signal = Signals::new(&[SIGINT]).unwrap();
    let _ctrl_c = signal.forever().next().unwrap();
}

有个问题是 forever().next() 会阻塞线程直到收到信号,我们的服务端是异步的,如果在主线程调用 ctrl_c(),我们的整个程序将被阻塞。

我们需要以 future 的形式重新实现 ctrl_c 信号响应,当实际收到信号时才执行它。类似这样:

fn ctrl_c() -> impl Future<Output = ()> {
    poll_fn(move |waker| {
        // ...
    })
}

该怎样实现异步侦听信号呢?

我们可以向 epoll 注册一个信号处理程序,并借此机会学习如何在异步程序中处理阻塞任务。有时候,唯一能得到你想要的内容的方式就是通过阻塞 API,但又不能简单地在主线程上使用它。所以,可以在一个单独的线程上运行阻塞任务,并在它完成时通知主线程。

fn spawn_blocking(blocking_work: impl FnOnce()) -> impl Future<Output = ()> {
    // 在单独的线程中执行阻塞的工作
    std::thread::spawn(move || {
        blocking_work();
    });

    poll_fn(|waker| {
        // ???
    }))
}

问题是,阻塞任务完成时我们怎么能知道?

阻塞任务在单独的线程中执行,在 future 之外。它需要访问 waker, 这样它才能在工作完成之后通知 future。我们只能在 future 首次被轮询时才能访问到 waker,所以状态必须以 None 开始。

如果工作在 future 被轮询之前就已完成,那么 future 被轮询时就无法判断工作是未开始还是已完成了。所以还需要一个标记来告诉 future 工作已完成。

这两个状态可以保存在一个互斥器包裹的变量中。

let state: Arc<Mutex<(bool, Option<Waker>)>> = Arc::default();

一旦线程完成了工作,它必须设置标记为 true,如果 waker 在之前已保存的话还要调用 wake 方法。如果之前 waker 没保存也没关系,首次被轮询时 future 会检查标记并立即返回。

fn spawn_blocking(blocking_work: impl FnOnce() + Send + 'static) -> impl Future<Output = ()> {
    let state: Arc<Mutex<(bool, Option<Waker>)>> = Arc::default();
    let state_handle = state.clone();

    // 在单独的线程中执行阻塞的工作
    std::thread::spawn(move || {
        // 执行任务
        blocking_work();

        // 标记工作已完成
        let (done, waker) = &mut *state_handle.lock().unwrap();
        *done = true;

        // 获取唤醒器并执行唤醒操作
        if let Some(waker) = waker.take() {
            waker.wake();
        }
    });

    poll_fn(|waker| {
        // ...
    }))
}

现在 future 需要访问 state 并检测它是否是已完成状态。如果不是,它保存 waker 并返回 None,然后当任务完成时再次被唤醒。

fn spawn_blocking(blocking_work: impl FnOnce() + Send + 'static) -> impl Future<Output = ()> {
    let state: Arc<Mutex<(bool, Option<Waker>)>> = Arc::default();
    let state_handle = state.clone();

    // 在单独的线程中执行阻塞的工作
    std::thread::spawn(move || {
        // ...
    });

    poll_fn(move |waker| match &mut *state.lock().unwrap() {
        // 工作未完成,保存 waker 等待再次被唤醒
        (false, state) => {
            *state = Some(waker);
            None
        }
        // 工作已完成
        (true, _) => Some(()),
    })

}

future 将 spawn_blocking 服务作为异步版本的 JoinHandle 返回,当阻塞任务在单独的线程中运行时,主线程可以异步地等待它完成。

fn ctrl_c() -> impl Future<Output = ()> {
    spawn_blocking(|| {
        let mut signal = Signals::new(&[SIGINT]).unwrap();
        let _ctrl_c = signal.forever().next().unwrap();
    })
}

spawn_blocking 是一个非常方便的抽象,常用于处理异步程序中的阻塞 API。

好了,我们终于有了能等待 ctrl+c 信号的 future。

还记得之前使用阻塞 I/O 时的情景吗?当时我们想知道该如何监视信号,才能在信号到达时立即终止侦听。当时我们就意识到需要以某种方式来同时监听新连接进入和 ctrl+c 信号。

因为 accept 是阻塞的,这并不容易,但是有了 future 就让这变得可能了!

我们将这包装成另一个 future,针对这两个 future,可以创建一个包装器实现选择功能:哪个 future 先完成,就将它的输出作为包装器的输出。

fn select<L, R>(left: L, right: R) -> Select<L, R> {
    Select { left, right }
}

struct Select<L, R> {
    left: L,
    right: R
}

enum Either<L, R> {
    Left(L),
    Right(R)
}

impl<L, R> Future for Select<L, R> {
    type Output = Either<L, R>;

    fn poll(&mut self, waker: Waker) -> Self::Output {
        // ...
    }
}

这个 select future 的实现很简单,只需要两个 future 都轮询,返回最先完成的即可。

impl<L, R> Future for Select<L, R> {
    type Output = Either<L::Output, R::Output>;

    fn poll(&mut self, waker: Waker) -> Option<Self::Output> {
        if let Some(output) = self.left.poll(waker.clone()) {
            return Some(Either::Left(output));
        }

        if let Some(output) = self.right.poll(waker) {
            return Some(Either::Right(output));
        }

        None
    }
}

因为我们将同一个唤醒器传递给两个 future ,任何一个 future 有进展都会通知我们,我们可以检查任意一个是否已完成。

真的很简单。

现在回到主程序。如果有一个活动任务计数器,在所有任务都执行完时我们就能知道。

该计数器可以保存在 listen future 中,当任务被创建时增加计数,完成时减少。

fn listen() -> impl Future<Output = ()> {
    poll_fn(|waker| {
        let listener = TcpListener::bind("localhost:3000").unwrap();
        // ...
    })
    .chain(|listener| {
        poll_fn(move |_| match listener.accept() {
            Ok((connection, _)) => {
                // ...
                SCHEDULER.spawn(handle(connection));
                None
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => None,
            Err(e) => panic!("{e}"),
        })
    })
}

使用上述的 select 组合子将 TCP 侦听器和 ctrl+c 侦听器组合在一起,我们可以同时监听它们。

fn listen() -> impl Future<Output = ()> {
    poll_fn(|waker| {
        // ...
    })
    .chain(|listener| {
        let listen = poll_fn(move |_| match listener.accept() {
            Ok((connection, _)) => {
                // ...
                SCHEDULER.spawn(handle(connection));
                None
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => None,
            Err(e) => panic!("{e}"),
        });

        select(listen, ctrl_c())
    })
}

TCP 侦听器永远不会完成 —— 它代表着同步服务器中的循环, 虽然 ctrl_c() 可以完成,但也还需要链式调用另一个任务来处理关闭信号。

fn listen() -> impl Future<Output = ()> {
    poll_fn(|waker| {
        // ...
    })
    .chain(|listener| {
        let listen = /* ... */;
        select(listen, ctrl_c())
    })
    .chain(|_ctrl_c| graceful_shutdown())
}

fn graceful_shutdown() -> impl Future<Output = ()> {
    // ...
}

现在我们需要实现整套关闭逻辑。一旦收到关闭的信号,我们等待当前处理中的请求执行完毕,30 秒后还没执行完的强制关闭。

这听起来跟我们使用 select 的场景很相似,要么等待 30 秒,要么所有处理中的请求执行完毕。

fn graceful_shutdown() -> impl Future<Output = ()> {
    let timer = /* ... */;
    let request_counter = /* ... */;

    select(timer, request_counter).chain(|_| {
        poll_fn(|waker| {
            // 所有任务都已执行完毕,现在可以正常退出程序
            println!("Graceful shutdown complete");
            std::process::exit(0)
        })
    })
}

我们要做的是为关闭生成两个 future。

第一个我们需要一个计时器,不能简单地调用 thread::sleep,因为它是阻塞的,但可以通过 spawn_blocking 来执行它,用返回的句柄来代表我们的 future。

请注意,有其他方法可以实现支持 epoll 构建异步定时器,但这超出了本文的范围。

use std::thread;
use std::time::Duration;

fn graceful_shutdown() -> impl Future<Output = ()> {
    let timer = spawn_blocking(|| thread::sleep(Duration::from_secs(30)));
    let request_counter = /* ... */;

    select(timer, request_counter).chain(|_| {
        poll_fn(|waker| {
            // 所有任务都已执行完毕,现在可以正常退出程序
            println!("Graceful shutdown complete");
            std::process::exit(0)
        })
    })
}

这足够简洁了。

接下来就是主要的关闭部分了,为了掌握请求全部执行完毕的准确时间,我们需要统计执行中的请求数量。

我们可以在 listen future 中保存本地计数器,当任务被 创建/完成 时 增/减 计数器的值。

fn listen() -> impl Future<Output = ()> {
    let tasks = Arc::new(Mutex::new(0));

    poll_fn(|waker| {
        // ...
    })
    .chain(move |listener| {
        let listen = poll_fn(move |_| match listener.accept() {
            Ok((connection, _)) => {
                // 计数器值增加
                *tasks.lock().unwrap() += 1; // 👈

                let handle_connection = handle(connection).chain(|_| {
                    poll_fn(|_| {
                        // 计数器值减少
                        *tasks.lock().unwrap() -= 1; // 👈
                        Some(())
                    })
                });

                SCHEDULER.spawn(handle_connection);
                None
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => None,
            Err(e) => panic!("{e}"),
        });

        select(listen, ctrl_c())
    })
    .chain(|_ctrl_c| graceful_shutdown())
}

注意,计数器值的递减是链接到任务处理程序中的,所以在任务被创建并完成之后,才递减计数器的值。

任务计数器很棒,但我们还有一点工作要做。在循环中只检查 tasks == 0 显然是不够的,在最后的任务完成时,关闭任务需要得到它可以执行的通知。

为此,任务处理需要访问关闭处理任务的唤醒器。

跟之前使用 spawn_blocking 的解决方案一样,只是现在需要的是一个计数器而不是布尔类型的标记。我们可以用一个结构来包装所有状态。

#[derive(Default)]
struct Counter {
    state: Mutex<(usize, Option<Waker>)>,
}

impl Counter {
    fn increment(&self) {
        let (count, _) = &mut *self.state.lock().unwrap();
        *count += 1;
    }

    fn decrement(&self) {
        let (count, waker) = &mut *self.state.lock().unwrap();
        *count -= 1;

        // 已经是最后一个任务了
        if *count == 0 {
            // 唤醒关闭的 future
            if let Some(waker) = waker.take() {
                waker.wake();
            }
        }
    }


    fn wait_for_zero(self: Arc<Self>) -> impl Future<Output = ()> {
        poll_fn(move |waker| {
            match &mut *self.state.lock().unwrap() {
                // 任务已全部完成
                (0, _) => Some(()),
                // 任务未全部完成,保存唤醒器
                (_, state) => {
                    *state = Some(waker);
                    None
                }
            }
        })
    }
}

当 wait_for_zero 第一次被调用、返回之前,它将唤醒器存储在计数器状态中,然后减少计数,再看它是否是最后一个活动任务,是的话调用 wake 方法,通知 wait_for_zero 的调用者。

当关闭处理程序被唤醒时,它将看到计数器为零并关闭程序。

现在,我们用 Counter 对象替换我们的手工计数器。

fn listen() -> impl Future<Output = ()> {
    let tasks = Arc::new(Counter::default()); // 👈
    let tasks_ref = tasks.clone();

    poll_fn(|waker| {
        // ...
    })
    .chain(move |listener| {
        let listen = poll_fn(move |_| match listener.accept() {
            Ok((connection, _)) => {
                connection.set_nonblocking(true).unwrap();

                // 计数器值增加
                tasks.increment(); // 👈

                let tasks = tasks.clone();
                let handle_connection = handle(connection).chain(|_| {
                    poll_fn(move |_| {
                        // 计数器值减少
                        tasks.decrement(); // 👈
                        Some(())
                    })
                });

                SCHEDULER.spawn(handle_connection);
                None
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => None::<()>,
            Err(e) => panic!("{e}"),
        });

        select(listen, ctrl_c())
    })
    .chain(|_ctrl_c| graceful_shutdown(tasks_ref)) // 👈
}

我们的关停程序就能使用 wait_for_zero 来等待所有的活动任务完成了。一旦完成,或者是超过了等待时间,实际的关停动作才被触发,程序退出。

fn graceful_shutdown(tasks: Arc<Counter>) -> impl Future<Output = ()> {
    poll_fn(|waker| {
        let timer = spawn_blocking(|| thread::sleep(Duration::from_secs(30)));
        let request_counter = tasks.wait_for_zero(); // 👈
        select(timer, request_counter)
    }).chain(|_| {
        // 所有任务都已执行完毕,现在可以正常退出程序
        println!("Graceful shutdown complete");
        std::process::exit(0)
    })
}

终于完成了!

现在如果运行我们的 Web 服务端并按下 ctrl+c,它将立即退出,不被任何其他连接所阻塞。

$ cargo run
^C
# => Graceful shutdown complete
$ |

回顾

嗯,这可真是一次不短的探索旅程。

我们的服务器看起来已经相当不错了,从线程到 epoll 事件循环,再到 futures 和闭包组合子,我们走了很长的路。虽然在 future 上还可以进一步抽象,但总体而言,我们的程序已经相对清晰了。

与最初的多线程程序相比,我们的代码更复杂,然而它也更强大。组合 futures 很简单,我们能够表达复杂的、难以用线程实现的控制流,甚至我们仍然可以调用阻塞函数,而不中断异步运行时。

这么多的功能,肯定是有代价的,对吧?

回到现实

现在我们已经深入研究了并发和异步,让我们看看它在现实世界中是如何工作的。

标准库定义了一个叫 Future 的特性,它看起来与我们设计的非常相似。

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

然而也有一些明显的区别。

首先,poll 传入的是一个 &mut Context 的参数,而不是唤醒器。不过事实上这区别不大,因为目前 Context 只是 Waker 的一个简单的包装器。

impl Context<'_> {
    pub fn from_waker(waker: &'a Waker) -> Context<'a>  { /* ... */ }
    pub fn waker(&self) -> &'a Waker  { /* ... */ }
}

而 Waker 和其他一些实用方法中,有我们熟悉的 wake 方法。

impl Waker {
    pub fn wake(self) { /* ... */ }
}

构造唤醒器有一点小复杂,但它本质上是个手工构建的特性对象 ( trait ),类似我们之前用过的 Arc<dyn Fn()>。它通过原始唤醒器类型 ( RawWaker ) 来构造,您可以点击这里查看详情。

第二,poll 返回的不是 Option, 而是一个称为 Poll 的枚举,它实际上也只是 Option 的包装:

pub enum Poll<T> {
    Ready(T),
    Pending,
}

最后一点不同有点复杂。

Pinning

poll 获取的不是自身的可变引用,而是一个能被钉住 (pinned) 的、指向自身的可变引用 —— Pin<&mut Self>。

也许你要问,什么是 Pin?

#[derive(Copy, Clone)]
pub struct Pin<P> {
    pointer: P,
}

嗯,看起来没啥用。

实际上,Pin 的特殊之处与它如何被创建有关:

impl<P: Deref> Pin<P> {
    pub fn new(pointer: P) -> Pin<P> where P::Target: Unpin { /* ... */ }
    pub unsafe fn new_unchecked(pointer: P) -> Pin<P>  { /* ... */ }
}

impl<P: Deref> Deref for Pin<P> {
    type Target = P::Target;
}

impl<P: Deref> DerefMut for Pin<P>
where
    P::Target: Unpin
{
    type Target = P::Target;
}

所以,如果 T 是 Unpin的,您就可以安全地创建一个 Pin<&mut T>……那什么是 Unpin 呢?

pub auto trait Unpin {}

/// 一个没有实现 `Unpin` 的标记类型
pub struct PhantomPinned;
impl !Unpin for PhantomPinned {}

看上去除了 PhantomPinned, 其他所有类型默认自动实现了 Unpin。所以除了 PhantomPinned之外,创建 Pin 实例是安全的?Pin 通常只是解引用到 T? 这些看起来都没啥用。

不过这一切的背后有个关键点,它跟我们之前遇到的问题有关。还记得我们试图创建一个自引用结构体来保存我们的任务状态,但是它不起作用,所以我们最终不得不用 Arc 来包裹我们的任务状态吗?这有点不完美,实际上我们可以用一点点不安全的代码生成自引用的结构,来避免使用 Arc。

问题在于一般情况下不能随意分发一个自引用的结构体,因为正如我们意识到的那样,移动一个自引用的结构体会破坏其内部引用,是不安全的。

struct SelfReferential {
    counter: u8, // (X)
    state: FutureState 
}

enum FutureState  {
    First { counter_ptr: *mut u8 } // 自引用指针指向 `counter` (X)
    // ...
}
let mut future1 = SelfReferential::new();
future1.poll(/* ... */);

let mut moved = future1; // 移动它
// 不安全! `counter_ptr` 仍然指向 `counter` 在栈上之前的位置。
moved.poll(/* ... */);

该是 Pin 登场的时候了。 如果您确定 T 所在的位置,在它被分配后到被 drop 前,一直保持不变的话,可以创建 Pin<&mut T>, 意思是任何自引用都是有效的。

对大多数类型来说,Pin 没有任何意义,这就是 Unpin 存在的原因。Unpin实际上是告诉 Pin,该类型没有自引用,所以钉住 (pinning) 它是完全安全、有效的。Pin 甚至会分发可变引用给 Unpin 类型,以便使用 mem::swap 或 mem::replace 移动它们。由于无法安全地创建自引用结构体,Unpin 是默认行为,并由类型自动实现。

然而,如果确实想要创建自引用的 future,可以使用 PhantomPinned 标记结构体将其标记为 !Unpin。钉住 !Unpin 类型需要使用 unsafe,因此由于 poll 需要 Pin<&mut Self>,它在自引用的 future 上无法安全地被调用。

let mut future = SelfReferential::new();

// SAFETY: 该 `future` 不会被移动
let pinned = unsafe { Pin::new_unchecked(&mut future) };
pinned.poll(/* ... /*);

注意:您可以在钉住之前随意移动 future,因为只有在 poll 被调用时自引用结构才会被创建。不过一旦钉住了它,就必须像 Pin 规定的那样保证它不再移动。

不过,有几种安全的方法可以创建一个 Pin,即使是 !Unpin 类型。

首先是 Box::pin:

let mut future1: Pin<Box<SelfReferential>> = Box::pin(SelfReferential::new());
future1.as_mut().poll(/* ... */);

let mut moved = future1;
moved.as_mut().poll(/* ... */);

乍一看,这似乎不太安全,但别忘了,Box 是在堆上分配。一旦分配了 future,它在堆上就有一个固定不变的位置,因此可以随心所欲地移动 Box 指针,内部引用将保持不变。

第二种安全的方式是使用 pin! 宏:

use std::pin::pin;

let mut future1: Pin<&mut SelfReferential> = pin!(SelfReferential::new());
future1.as_mut().poll(/* ... */);

使用 pin! 宏,就算结构体还没有实际分配内存空间,也可以安全地钉住它。原因是它获取了 future 的所有权,除了使用 Pin<&mut T>,没有其他方式可以访问到 future。记住,如果 T 不是 Unpin 的,它永远不会给你一个可变的引用。T是完全隐藏的,因此不会被篡改。

Pin 是导致 future 难以理解的一个混淆点,但是一旦了解了它存在的原理,解决方案就显得非常巧妙。

async/await

以下是标准的 Future 特性:

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

那么该怎么使用它呢?

future crate 文档里可以找到很多有用的帮助。像我们之前写的 poll_fn 函数,以及像 map 和 and_then 这样的组合子,我们称之为 chain。

use futures::{FutureExt, future::{ready, poll_fn}};

let future = poll_fn(|_| Poll::Ready(1))
    .and_then(|x| poll_fn(|_| Poll::Ready(x + 1)))
    .and_then(|x| poll_fn(|_| println!("{x}")));

但即使有了这些帮助,我们发现,编写异步代码仍然有点麻烦,需要改变习惯的简单同步代码编程习惯。也许变化不像手动 Epoll 事件循环那样剧烈,但仍然是一个很大的变化。

在Rust中,其实还有一种写异步代码的方式,叫做 async/await 语法。

不使用 poll_fn 来创建 future,而是添加 async 关键字到 fn 的前面:

async fn foo() -> usize {
    1
}

这样的函数叫做异步函数,它跟普通函数相比,不同的仅仅是异步函数返回的是异步代码块:

fn foo() -> impl Future<Output = usize> {
    async { 1 }
}

它只是一个返回 poll_fn future 的函数:

fn foo() -> impl Future<Output = usize> {
    poll_fn(|| Poll::Ready(1))
}

神奇之处是 await 关键字。await 在等待另一个 future 完成时返回 Poll:Pending,直到 future 完成。

async fn foo() {
    let one = one().await;
    let two = two().await;
    assert_eq!(one + 1, two);
}

async fn two() -> usize {
    one().await + 1
}

async fn one() -> usize {
    1
}

这种形式的背后,编译器实际上将它们放入了手动状态机,跟我们使用组合子来创建的差不多。

fn foo() -> impl Future<Output = ()> {
    one()
        .and_then(|one| two().and_then(move |two| poll_fn(move |_| Poll::Ready((one, two)))))
        .and_then(|(one, two)| poll_fn(move |_| Poll::Ready(assert_eq!(one, two + 1))))
}

fn two() -> impl Future<Output = usize> {
    one().and_then(|one| poll_fn(move |_| Poll::Ready(one + 1)))
}

fn one() -> impl Future<Output = usize> {
    poll_fn(|_| Poll::Ready(1))
}

正如我们所熟知的,这转化为一个巨大的手动状态机,看起来像这样:

enum FooFuture {
    One(OneFuture),
    Two(usize, TwoFuture),
}

impl Future for FooFuture {
    type Output = ();

    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if let FooFuture::One(f) = self {
            match f.poll(cx) {
                Poll::Ready(one) => *self = Self::Two(one, TwoFuture(OneFuture)),
                Poll::Pending => return Poll::Pending,
            }
        }

        if let FooFuture::Two(one, f) = self {
            match f.poll(cx) {
                Poll::Ready(two) => {
                    assert_eq!(*one + 1, two);
                    return Poll::Ready(());
                }
                Poll::Pending => return Poll::Pending,
            }
        }

        None
    }
}

struct TwoFuture(OneFuture);

impl Future for TwoFuture {
    type Output = usize;

    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.0.poll(waker) {
            Poll::Ready(one) => Poll::Ready(one + 1),
            Poll::Pending => Poll::Pending,
        }
    }
}

struct OneFuture;

impl Future for OneFuture {
    type Output = usize;

    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Poll::Ready(1)
    }
}

…… async/await 解决了所有令我们感到头疼的问题。当然,因为我们这里并没有 I/O 操作,以至于看起来 future 没什么用处,但可以想象它对我们的 Web 服务端有多大的帮助。

实际上,它比组合子的方式更优秀。 async 函数中,你甚至可以跨越 await 保存本地状态!

async fn foo() {
    let x = vec![1, 2, 3];
    bar(&x).await;
    baz(&x).await;
    println!("{x:?}");
}

在实现了自己的 futures 之后,我们真的能够体会到这种便利。编译器甚至可以在底层生成一个自引用的 future,以使 bar 和 baz 都能够访问状态。

struct FooFuture {
    x: Vec<i32>, // (X)
    state: FooFutureState,
}

enum FooFutureState {
    Bar(BarFuture),
    Baz(BazFuture),
}

struct BarFuture { x: *mut Vec<i32> /* 指向 (X)! */ }
struct BazFuture { x: *mut Vec<i32> /* 指向 (X)! */ }

编译器会处理与此相关的所有不安全代码,使我们能够像在常规函数中一样处理本地状态。因此,由 async 块或函数生成的 futures 是 !Unpin 的。

与同步代码相比,async/await 消除了编写 futures 时的复杂性。在手动实现 futures 后,几乎感觉像是魔法一样!

Tokio 实现的 Web 服务

到目前为止,我们只是分析了 Future 是如何工作的,还没有讨论如何实际运行它,或进行任何 I/O 操作。事实上,标准库并没有提供这方面的支持,它只提供了最基本的类型和特性供入门使用。

要真正编写一个异步应用程序,必须使用外部运行时。Rust 中目前最流行的运行时是 tokio。它提供了任务调度器、反应器和用于运行阻塞任务的池,就像我们之前编写的一样,而且它还提供了定时器、异步通道和各种其他对异步代码有用的类型和工具。除此之外,tokio 是多线程的,可以将异步任务分配到所有 CPU 核心以充分利用它们。tokio 的核心思想与我们自己编写的异步运行时非常相似,可以在这篇出色的博文中更多地了解 tokio 的设计。

现在是时候编写我们最终的 Web 服务端了,这次使用标准的 Future trait 和 tokio。

Tokio 应用程序始于 #[tokio::main] 宏。在底层,这个宏会启动运行时并在主函数中运行异步代码。

#[tokio::main]
async fn main() {
    // ...
}

Tokio 重构了标准库中大部分类型的异步版本。比如,tokio::net::TcpListener 像 std::net::TcpListener 一样优秀,只是它支持异步。任何与 epoll 或反应器交互的细节都被隐藏起来了。

use tokio::net::{TcpListener, TcpStream};

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("localhost:3000").await.unwrap();

    loop {
        let (connection, _) = listener.accept().await.unwrap();

        if let Err(e) = handle_connection(connection).await {
            println!("failed to handle connection: {e}")
        }
    }
}

async fn handle_connection(mut connection: TcpStream) -> io::Result<()> {
    // ...
}

但这并不完全正确,我们需要创建任务处理程序。我们可以使用 tokio::spawn 函数来实现这一点,它需要一个 future。

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("localhost:3000").await.unwrap();

    loop {
        let (connection, _) = listener.accept().await.unwrap();

        tokio::spawn(async move { // 👈
            if let Err(e) = handle_connection(connection).await {
                println!("failed to handle connection: {e}")
            }
        });
    }
}

现在来创建任务处理程序。使用 AsyncReadExt 特性和 await,我们可以用跟同步读写几乎相同的方式来从 TCP 流读取数据。

use tokio::io::AsyncRead;

async fn handle_connection(mut connection: TcpStream) -> io::Result<()> {
    let mut read = 0;
    let mut request = [0u8; 1024];

    loop {
        // 尝试从流中读取数据
        let num_bytes = connection.read(&mut request[read..]).await?; // 👈

        // 客户端已断开
        if num_bytes == 0 {
            println!("client disconnected unexpectedly");
            return Ok(());
        }

        // 跟踪已读取的字节数
        read += num_bytes;

        // 任务已执行完毕?
        if request.get(read - 4..read) == Some(b"\r\n\r\n") {
            break;
        }
    }

    let request = String::from_utf8_lossy(&request[..read]);
    println!("{request}");

    // ...
}

写操作也是一样:

async fn handle_connection(mut connection: TcpStream) -> io::Result<()> {
    // ...

    // "Hello World!" in HTTP
    let response = /* ... */;
    let mut written = 0;

    loop {
        // 写入剩下的内容
        let num_bytes = connection.write(response[written..].as_bytes()).await?; // 👈

        // 客户端已断开
        if num_bytes == 0 {
            println!("client disconnected unexpectedly");
            return Ok(());
        }

        written += num_bytes;

        // 全部内容都写入完毕了?
        if written == response.len() {
            break;
        }
    }

    connection.flush().await
}

非常简单。

如果你有留意,会发现我们的程序除了 async/await 关键字外,跟同步的代码非常相似。有了async/await,我们确实能够轻松应对各种情况。

现在要实现优雅的关闭。

第一步是识别 ctrl+c 信号。在 tokio 中,这仅仅需要使用 tokio::signal::ctrl_c 函数就行。它是一个异步函数,一旦接收到 ctrl+c 信号就会返回。我们还可以使用 tokio 的 select! 宏,这是我们之前实现的select 组合子的更强大版本。

pub async fn main() {
    let listener = TcpListener::bind("localhost:3000").await.unwrap();
    let state = Arc::new((AtomicUsize::new(0), Notify::new()));

    loop {
        select! {
            // 有新的接入请求
            result = listener.accept() => {
                let (connection, _) = result.unwrap();

                tokio::spawn(async move {
                    // ..
                });
            }
            // ctrl+c 信号处理
            shutdown = ctrl_c() => {
                let timer = /* ... */;
                let request_counter = /* .. */;

                select! {
                    _ = timer => {}
                    _ = request_counter => {}
                }

                println!("Gracefully shutting down.");
                return;
            }
        }
    }
}

select! 宏执行最先完成的 future,其他未完成的则被放弃。通过它,我们可以在 ctrl+c 信号和传入连接之间选择,并执行相应的代码。

接下来需要创建优雅关闭的条件。

对定时器而言,我们可以使用 tokio 的异步 sleep 功能。在底层,这会连接到一个自定义的定时器系统,这是比我们 spawn_blocking 定时器的更高效版本。您可以在另一篇出色的文章中详细了解其工作原理。

select! {
    // 有新的接入请求
    result = listener.accept() => {
        // ...
    }
    // ctrl\+c 信号处理
    shutdown = ctrl_c() => {
        let timer = tokio::time::sleep(Duration::from_secs(30));
        let request_counter = /* .. */;

        select! {
            _ = timer => {}
            _ = request_counter => {}
        }

        println!("Gracefully shutting down.");
        return;
    }
}

接下来是活动连接计数器。我们不再手动管理唤醒器,而是使用一个简单的计数器,利用 tokio 的 Notify 类型进行管理,它允许任务相互通知对方或等待被通知。

use tokio::sync::Notify;

let state = Arc::new((AtomicUsize::new(0), Notify::new()));

当新连接建立时,我们增加计数器值;任务完成时则递减。如果计数器值归零,最后的任务会调用 notify_one,它将唤醒主任务,并通知它所有的任务都完成了。

select! {
    result = listener.accept() => {
        let (connection, _) = result.unwrap();
        let state = state.clone();

    // 计数器值增加
    state.0.fetch_add(1, Ordering::Relaxed);

    tokio::spawn(async move {
        if let Err(e) = handle_connection(connection).await {
            // ...
        }

        // 计数器值减少
        let count = state.0.fetch_sub(1, Ordering::Relaxed);
        if count == 1 {
            // 当前已是最后一个任务
            state.1.notify_one();
        }
    });
    }

    shutdown = ctrl_c() => {
        // ...
    }
}

关闭处理程序现在可以简单地在计时器和 Notify::notified 之间进行选择,当有人调用 Notify_one 时,表明最后一个活动连接的任务已经完成,它自身也将完成。

select! {
    result = listener.accept() => {
        // ...
    }
    shutdown = ctrl_c() => {
        // a 30 second timer
        let timer = tokio::time::sleep(Duration::from_secs(30));
        // notified by the last active task
        let notification = state.1.notified();

        // if the count isn't zero, we have to wait
        if state.0.load(Ordering::Relaxed) != 0 {
            // wait for either the timer or notification to resolve
            select! {
                _ = timer => {}
                _ = notification => {}
            }
        }

        println!("Gracefully shutting down.");
        return;
    }
}

非常漂亮,不是吗?

使用 tokio 和 async/await 时,我们不需要考虑唤醒器、反应器或任何隐藏起来的细节,所有的构建块都为我们准备好了,我们只是把它们放在一起而已。

后记

哇!这真是一段非常漫长的旅程!

我们从最简单的 Web 服务端开始,尝试了多线程,并且逐步构建了一个基于epoll的自定义异步运行时。所有这些都是为了实现优雅的关闭。

然后,我们回过头用 tokio 实现了仅需几行额外代码的优雅关闭。

希望本文能让你更加欣赏异步 Rust 的强大之处,并且对其内部工作原理有更深入的了解。本文所有代码都可以在 GitHub 上找到。


1

实际上不存在强制刷新网络套接字的方法,对-TcpStream-而言-flush-只是一个空操作,调用它是为了保持对-io::Write-的一致性。

2

实际的调度器远不止描述的这样简单,可以参考这里

3

详细例子在这里

4

在-Linux-中,线程和进程都是“任务”,只是配置不同。

5

从技术上讲,我们的调度器是不公平的。为了避免某些任务长时间得不到执行,公平的调度器不论是否有可以运行的任务,都会定期检查-epoll。