代码之家  ›  专栏  ›  技术社区  ›  David Weber

使用流的内容更改源中的物化值

  •  0
  • David Weber  · 技术社区  · 6 年前

    alpakka提供了一种访问几十个不同数据源的好方法。面向文件的源(如hdfs和ftp源)的传递方式为 Source[ByteString, Future[IOResult] . 但是,通过akka HTTP的HTTP请求作为 Source[ByteString, NotUsed] . 在我的用例中,我希望从HTTP源检索内容 源[字节串,未来[ioresult] 因此,我可以构建一个统一的资源获取器,它可以从多个方案(本例中是HDF、文件、FTP和S3)中工作。

    特别是,我想转换 源[字节串,未使用] 源到 源[字节串,未来[ioresult] 在这里,我可以从传入的字节流计算ioresult。有很多方法,比如 flatMapConcat viaMat 但似乎没有人能够从输入流中提取详细信息(例如读取的字节数)或初始化 IOResult 结构合理。理想情况下,我正在寻找一个具有以下签名的方法,该方法将在流进入时更新ioresult。

      def matCalc(src: Source[ByteString, Any]) = Source[ByteString, Future[IOResult]] = {
        src.someMatFoldMagic[ByteString, IOResult](IOResult.createSuccessful(0))(m, b) => m.withCount(m.count + b.length))
      }
    
    2 回复  |  直到 6 年前
        1
  •  1
  •   Serhii Shynkarenko    6 年前

    我无法回忆任何现有的功能,这些功能可以开箱即用,但您可以使用 alsoToMat (惊讶的是,在AKKA流文档中找不到它,尽管您可以在源代码文档和Java API)中查看它的流函数。 Sink.fold 积累一些价值并最终给予它。如:

    def magic(source: Source[Int, Any]): Source[Int, Future[Int]] =
        source.alsoToMat(Sink.fold(0)((acc, _) => acc + 1))((_, f) => f)
    

    问题是 alsoToMat 将输入mat值与中提供的值组合在一起 奥索马特 . 同时,源产生的值不受 奥索马特 :

    def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[Out, Mat3] =
      viaMat(alsoToGraph(that))(matF)
    

    调整这个函数返回并不难。 IOResult ,根据源代码:

    final case class IOResult(count: Long, status: Try[Done]) { ... }
    

    你还需要注意的最后一件事是,你希望你的信息来源是这样的:

    Source[ByteString, Future[IOResult]]
    

    但是,如果您不把这些mat值带到流定义的最后,然后根据这个未来的完成情况执行smth,这可能是一种容易出错的方法。例如,在本例中,我基于该未来完成工作,因此最后一个值将不会被处理:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Keep, Sink, Source}
    
    import scala.concurrent.duration._
    import scala.concurrent.{Await, ExecutionContext, Future}
    
    object App extends App {
    
      private implicit val sys: ActorSystem = ActorSystem()
      private implicit val mat: ActorMaterializer = ActorMaterializer()
      private implicit val ec: ExecutionContext = sys.dispatcher
    
      val source: Source[Int, Any] = Source((1 to 5).toList)
    
      def magic(source: Source[Int, Any]): Source[Int, Future[Int]] =
        source.alsoToMat(Sink.fold(0)((acc, _) => acc + 1))((_, f) => f)
    
      val f = magic(source).throttle(1, 1.second).toMat(Sink.foreach(println))(Keep.left).run()
      f.onComplete(t => println(s"f1 completed - $t"))
      Await.ready(f, 5.minutes)
    
    
      mat.shutdown()
      sys.terminate()
    }
    
        2
  •  0
  •   dvim    6 年前

    这可以通过使用 Promise 对于物化价值传播。

    val completion = Promise[IoResult]
    val httpWithIoResult = http.mapMaterializedValue(_ => completion.future)
    

    现在剩下的就是完成 completion 承诺相关数据可用时。

    另一种方法是 GraphStage 在API中,您可以对物化值传播进行较低级别的控制。但即使在那里也用 Promises 通常是为物化价值传播选择的实现。看看内置的运算符实现,比如 Ignore .