代码之家  ›  专栏  ›  技术社区  ›  financial_physician

Rust中的异步操作何时可以退出?

  •  1
  • financial_physician  · 技术社区  · 2 年前

    上下文

    我正在通读 this blog .提供了一些代码:

    async fn parse_line(socket: &TcpStream) -> Result<String, Error> {
        let len = socket.read_u32().await?;
        let mut line = vec![0; len];
        socket.read_exact(&mut line).await?;
        let line = str::from_utf8(line)?;
        Ok(line)
    }
    
    loop {
        select! {
            line_in = parse_line(&socket) => {
                if let Some(line_in) = line_in {
                    broadcast_line(line_in);
                } else {
                    // connection closed, exit loop
                    break;
                }
            }
            line_out = channel.recv() => {
                write_line(&socket, line_out).await;
            }
        }
    }
    

    作者声称 parse_line 如果 channel 在parse_line执行时接收消息。

    问题

    在什么时候可以 parse_line 被打断?是在吗 任何 指向根据我目前的理解——这可能是错误的——Rust可以在等待语句中切换线程上的任务,但在这些时刻,状态会被存储起来,以便恢复工作。

    我的想象

    我在想象 parse_line ,Rust正在将字节加载到 line 变量在读取一定数量的字节(并且可能只有某个ASCII字符的一半字节)之后,并且在等待更多字节进入的同时, 频道 接收到一些东西,上下文切换。

    完成后 channel.recv() 任务,Rust返回读取输入,但是提供字节的用户取消了请求,现在没有其他内容可读取。

    现在 str::from_utf8(line)? 引发UTF-8错误,因为 线 具有不完整的ASCII字符。

    1 回复  |  直到 2 年前
        1
  •  4
  •   Chayim Friedman    2 年前

    TL;博士: 不在 任何 点,仅在 .await s


    异步代码被降低到实现的状态机中 Future . 等候 电话 Future::poll() 在循环中,当调用程序返回时挂起它 Poll::Pending 并完成 Poll::Ready 。这基本上是在运行内在的未来直到完成。

    但是,如果我们调用 poll() 几次就停下来不完成未来?在这种情况下,我们调用的几次 投票 已经将未来推进到了执行过程中的某个阶段,我们到此为止。我们有 取消 未来。

    关键的观察点是这个执行点必须在内部 等候 。这是因为同步,没有- 等候 将来的代码转换为中的直接同步代码 投票 实施,我们不能就此止步。我们只能在回来后停下来 投票 ,而这种情况要么发生在 等候 或在完成时。

    然而,并不是所有的期货都是 取消保险箱 。有些,比如你的 parse_line() ,取消时会失去工作。如果我们在第二个取消 等候 ,长度已经从套接字中读取(并丢弃),但主体尚未读取,因此我们丢失了该长度。我们无法恢复它,下次调用该函数时,它将看到套接字中损坏的数据(或者只跳过一条记录)。

    select! 取消除第一个已完成的期货以外的所有期货,因此此代码有一个错误。

    解决办法是永远不要失去未来,但要把它留到以后:

    let mut parse_line_fut = std::pin::pin!(parse_line(&socket));
    loop {
        select! {
            line_in = parse_line_fut.as_mut() => {
                if let Ok(line_in) = line_in {
                    broadcast_line(line_in);
    
                    parse_line_fut.set(parse_line(&socket));
                } else {
                    // connection closed, exit loop
                    break;
                }
            }
            line_out = channel.recv() => {
                write_line(&socket, line_out).await;
            }
        }
    }