代码之家  ›  专栏  ›  技术社区  ›  Some Name

流observeAsync不异步执行给定的接收器

  •  0
  • Some Name  · 技术社区  · 7 年前

    我在试验 fs2.Stream 并发特性,并对其工作原理产生了一些误解。我想并行地通过某个接收器发送流内容。我试过的是:

    object TestParallelStream extends App {
      val secondsOnStart = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())
      val stream = fs2.Stream.emits(List(1, 2, 3, 4, 5, 6, 7, 8, 9)).covary[IO]
      val sink: fs2.Sink[IO, Int] = _.evalMap(i => IO {
        println(s"[${TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) - secondsOnStart} second]: $i")
        Thread.sleep(5000)
      })
      val executor = Executors.newFixedThreadPool(4)
      implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.fromExecutor(executor))
    
    
      stream.observeAsync(3)(sink).compile.drain.unsafeRunSync() //1
      executor.shutdown()
    }
    

    这个 //1 打印以下内容:

    [1 second]: 1
    [6 second]: 2
    [11 second]: 3
    [16 second]: 4
    [21 second]: 5
    [26 second]: 6
    [31 second]: 7
    [36 second]: 8
    [41 second]: 9
    

    从输出可以看出,每个元素都通过 sink 按顺序。

    但如果我修改水槽如下:

    // 5 limit and parEvalMap
    val sink: fs2.Sink[IO, Int] = _.parEvalMap(5)(i => IO { 
      println(s"[${TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) - secondsOnStart} second]: $i")
      Thread.sleep(5000)
    })
    

    输出为:

    [1 second]: 3
    [1 second]: 2
    [1 second]: 4
    [1 second]: 1
    [6 second]: 5
    [6 second]: 6
    [6 second]: 7
    [6 second]: 8
    [11 second]: 9
    

    现在我们有4个元素一次并行地通过接收器发送(尽管设置了 3 作为限制 observerAsync )中。

    即使我换了 observer异步 只是 observe 我得到了同样的并行化效果。

    你能解释一下水槽是怎么工作的吗?

    1 回复  |  直到 7 年前
        1
  •  1
  •   Daenyth    7 年前

    observe 当要传递流元素时使用 倍数 下沉。它不会改变接收器本身的并发行为。

    你会这样使用它:

    stream.observeAsync(n)(sink1).to(sink2)