我无法回忆任何现有的功能,这些功能可以开箱即用,但您可以使用
alsoToMat
(惊讶的是,在AKKA流文档中找不到它,尽管您可以在源代码文档和Java API)中查看它的流函数。
Sink.fold
积累一些价值并最终给予它。如:
def magic(source: Source[Int, Any]): Source[Int, Future[Int]] =
source.alsoToMat(Sink.fold(0)((acc, _) => acc + 1))((_, f) => f)
问题是
alsoToMat
将输入mat值与中提供的值组合在一起
奥索马特
. 同时,源产生的值不受
奥索马特
:
def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) â Mat3): ReprMat[Out, Mat3] =
viaMat(alsoToGraph(that))(matF)
调整这个函数返回并不难。
IOResult
,根据源代码:
final case class IOResult(count: Long, status: Try[Done]) { ... }
你还需要注意的最后一件事是,你希望你的信息来源是这样的:
Source[ByteString, Future[IOResult]]
但是,如果您不把这些mat值带到流定义的最后,然后根据这个未来的完成情况执行smth,这可能是一种容易出错的方法。例如,在本例中,我基于该未来完成工作,因此最后一个值将不会被处理:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
object App extends App {
private implicit val sys: ActorSystem = ActorSystem()
private implicit val mat: ActorMaterializer = ActorMaterializer()
private implicit val ec: ExecutionContext = sys.dispatcher
val source: Source[Int, Any] = Source((1 to 5).toList)
def magic(source: Source[Int, Any]): Source[Int, Future[Int]] =
source.alsoToMat(Sink.fold(0)((acc, _) => acc + 1))((_, f) => f)
val f = magic(source).throttle(1, 1.second).toMat(Sink.foreach(println))(Keep.left).run()
f.onComplete(t => println(s"f1 completed - $t"))
Await.ready(f, 5.minutes)
mat.shutdown()
sys.terminate()
}