代码之家  ›  专栏  ›  技术社区  ›  opensourcegeek

如何从流本身中取消无限流?

  •  6
  • opensourcegeek  · 技术社区  · 7 年前

    我想取消一次间隔( interval_timer )清空队列后,但不确定什么是正确的策略。

    let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
    let interval_timer = tokio_timer::Timer::default();
    
    let timer = interval_timer
        .interval(Duration::from_millis(1000))
        .map_err(|_| {
            println!("Errored out");
        });
    
    let s = timer.for_each(move |_| {
        println!("Woke up");
        let item = some_vars.pop().unwrap();
    
        let f = futures::future::ok(item).map(|x| {
            println!("{:?}", x);
        });
        tokio::spawn(f)
    });
    
    tokio::run(s);
    

    我试过了 drop 正如gitter中所建议的,但最终出现了一个错误:

    let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
    let mut interval_timer = tokio_timer::Timer::default();
    
    let timer = interval_timer
        .interval(Duration::from_millis(1000))
        .map_err(|_| {
            println!("Errored out");
        });
    
    let s = timer.for_each(move |_| {
        println!("Woke up");
        if some_vars.len() == 1 {
            drop(interval_timer);
        }
    
        let item = some_vars.pop().unwrap();
    
        let f = futures::future::ok(item).map(|x| {
            println!("{:?}", x);
        });
        tokio::spawn(f)
    });
    
    tokio::run(s);
    

    错误:

    error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
    --> src/main.rs:72:22
       |
    60 |     let mut interval_timer = tokio_timer::Timer::default();
       |         ------------------ captured outer variable
    ...
    72 |                 drop(interval_timer);
       |                      ^^^^^^^^^^^^^^ cannot move out of captured outer variable in an `FnMut` closure
    
    3 回复  |  直到 6 年前
        1
  •  3
  •   Shepmaster Tim Diekmann    6 年前

    对于要从中取消流的情况 外部 请参阅 stream-cancel


    对于您的特定情况,最简单的方法是将集合转换为流,并使用间隔计时器将其压缩在一起。这样,当集合为空时,生成的流自然停止:

    use futures::{future, stream, Stream}; // 0.1.29
    use std::time::Duration;
    use tokio; // 0.1.22
    use tokio_timer::Interval; // 0.2.11
    
    fn main() {
        tokio::run({
            let some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
    
            let timer =
                Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));
    
            let some_vars = stream::iter_ok(some_vars.into_iter().rev());
            let combined = timer.zip(some_vars);
    
            combined.for_each(move |(_, item)| {
                eprintln!("Woke up");
    
                tokio::spawn(future::lazy(move || {
                    println!("{:?}", item);
                    Ok(())
                }));
    
                Ok(())
            })
        });
    }
    

    否则,可以使用 and_then 要从集合中删除值并控制流是否应继续,请执行以下操作:

    use futures::{future, Stream}; // 0.1.29
    use std::time::Duration;
    use tokio; // 0.1.22
    use tokio_timer::Interval; // 0.2.11
    
    fn main() {
        tokio::run({
            let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
    
            let timer =
                Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));
    
            let limited = timer.and_then(move |_| {
                if some_vars.len() <= 4 {
                    Err(())
                } else {
                    some_vars.pop().ok_or(())
                }
            });
    
            limited.for_each(move |item| {
                eprintln!("Woke up");
    
                tokio::spawn(future::lazy(move || {
                    println!("{:?}", item);
                    Ok(())
                }));
    
                Ok(())
            })
        });
    }
    
        2
  •  0
  •   teknopaul    6 年前

    我复制了一份Tokio的 Interval 结构,添加对应用程序方法的引用,以指示何时提前中断。

    就我而言,我想打断 间隔 关闭。

    我的间隔轮询方法如下所示:

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        if self.session.read().unwrap().shutdown {
            return Ok(Async::Ready(Some(Instant::now())));
        }
    
        // Wait for the delay to be done
        let _ = match self.delay.poll() {
    

    然后,您需要控制任务(调用 task = futures::task::current() 在超时任务内运行时)。

    在任何时候你都可以打电话 task.notify() 将间歇时间付诸行动,并输入中断代码,中断 间隔 早期的

    在…内 间隔 有一个 Delay 可以修改的结构,您可以创建 间隔 您可以中断并更改超时,这样您可以中断一次,然后继续。

        3
  •  -2
  •   wolandr    7 年前

    tokio_timer::Interval 机具 futures::Stream ,请尝试使用 take_while 方法:

    let s = timer
        .take_while(|()| 
            future::ok(is_net_completed()))
        .for_each(move |_| {
            println!("Woke up");
            // ...
        })