代码之家  ›  专栏  ›  技术社区  ›  dspyz

如何将等待“tokio::sync::watch::Receiver”变为“Some”?

  •  0
  • dspyz  · 技术社区  · 4 月前

    我想写一篇 async 功能 recv_some 这需要一个 watch::Receiver<Option<T>> ,等待值为 Some ,并返回以下内容 Deref T 。出于我所能理解的原因,当我以显而易见的方式这样做时,借阅检查器会抱怨。我似乎也无法解决这个问题。

    use std::ops::Deref;
    
    use tokio::sync::watch;
    
    pub struct WatchSomeRef<'a, T>(watch::Ref<'a, Option<T>>);
    
    impl<T> Deref for WatchSomeRef<'_, T> {
        type Target = T;
    
        fn deref(&self) -> &Self::Target {
            self.0.as_ref().unwrap()
        }
    }
    
    pub async fn recv_some<T>(
        rx: &mut watch::Receiver<Option<T>>,
    ) -> Result<WatchSomeRef<T>, watch::error::RecvError> {
        loop {
            {
                let guard = rx.borrow_and_update();
                if guard.is_some() {
                    return Ok(WatchSomeRef(guard));
                }
            }
            rx.changed().await?;
        }
    }
    

    当我这样做时,我得到了错误:

    error[E0499]: cannot borrow `*rx` as mutable more than once at a time
      --> src/lib.rs:20:25
       |
    16 |     rx: &mut watch::Receiver<Option<T>>,
       |         - let's call the lifetime of this reference `'1`
    ...
    20 |             let guard = rx.borrow_and_update();
       |                         ^^ `*rx` was mutably borrowed here in the previous iteration of the loop
    21 |             if guard.is_some() {
    22 |                 return Ok(WatchSomeRef(guard));
       |                        ----------------------- returning this value requires that `*rx` is borrowed for `'1`
    
    error[E0499]: cannot borrow `*rx` as mutable more than once at a time
      --> src/lib.rs:25:9
       |
    16 |     rx: &mut watch::Receiver<Option<T>>,
       |         - let's call the lifetime of this reference `'1`
    ...
    20 |             let guard = rx.borrow_and_update();
       |                         -- first mutable borrow occurs here
    21 |             if guard.is_some() {
    22 |                 return Ok(WatchSomeRef(guard));
       |                        ----------------------- returning this value requires that `*rx` is borrowed for `'1`
    ...
    25 |         rx.changed().await?;
       |         ^^ second mutable borrow occurs here
    

    ETA:

    我认为这是因为我们想要保护的寿命取决于潜在价值。如果是 一些 ,它需要比函数调用更持久,但如果 None 它需要马上扔掉,这样我们才能等待下一次更改。

    不过,我仍然不知道如何解决这个问题(除了使用不安全的mem::transmute来改变生命周期)。

    1 回复  |  直到 4 月前
        1
  •  5
  •   Finomnis    4 月前

    这似乎是借用检查器的误报,可能与所讨论的已知局限性密切相关 here .

    如果你使用 watch::Receiver::wait_for ,它编译得很好:

    use std::ops::Deref;
    
    use tokio::sync::watch;
    
    pub struct WatchSomeRef<'a, T>(watch::Ref<'a, Option<T>>);
    
    impl<T> Deref for WatchSomeRef<'_, T> {
        type Target = T;
    
        fn deref(&self) -> &Self::Target {
            self.0.as_ref().unwrap()
        }
    }
    
    pub async fn recv_some<T>(
        rx: &mut watch::Receiver<Option<T>>,
    ) -> Result<WatchSomeRef<T>, watch::error::RecvError> {
        let value = rx.wait_for(|val| val.is_some()).await?;
        Ok(WatchSomeRef(value))
    }
    

    或者更紧凑的版本:

    pub async fn recv_some<T>(
        rx: &mut watch::Receiver<Option<T>>,
    ) -> Result<WatchSomeRef<T>, watch::error::RecvError> {
        rx.wait_for(Option::is_some).await.map(WatchSomeRef)
    }