我正在阅读关于akka stream的文档 KillSwitch 他们用一个例子来说明 KillSwitch.shutdown 方法:
KillSwitch.shutdown
val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure) val lastSnk = Sink.last[Int] val (killSwitch, last) = countingSrc .viaMat(KillSwitches.single)(Keep.right) .toMat(lastSnk)(Keep.both) .run() doSomethingElse() killSwitch.shutdown() Await.result(last, 1.second) shouldBe 2
我无法理解为什么预期结果应该是2。 正如我看到的这个例子,流被设置为1秒延迟。暂停时 shutdown() 调用,以便kill开关在延迟完成之前通知流关闭。我不明白为什么流的前2个元素会被释放并传递到接收器。
shutdown()
你能帮我解释一下吗?
注意:如果运行此示例,我会得到预期的以下异常:
Exception in thread "main" java.util.NoSuchElementException: last of empty stream at akka.stream.scaladsl.Sink$.$anonfun$last$3(Sink.scala:181)
对示例代码有误解。结果完全取决于 doSomethingElse . 除非花费的时间太少,否则会出现异常。要测试这一点,可以将其替换为 Thread.sleep(2000) 您将从 Sink . 如果增加睡眠值,结果也会增加。
doSomethingElse
Thread.sleep(2000)
Sink
关于您在评论中提出的问题:
delay 按规定量及时移动元素排放量。延迟精度为10ms,以避免不必要的计时器调度周期。这就是您看到这种行为的原因(您可以在的Scala文档中查看这些详细信息 Flow ).
delay
Flow
如果您想每秒发送一条消息,请尝试 throttle 而是:
throttle
.throttle(1, 1.second, 1, ThrottleMode.shaping)