代码之家  ›  专栏  ›  技术社区  ›  erip Jigar Trivedi

如何合并Akka流中任意数量的源?

  •  0
  • erip Jigar Trivedi  · 技术社区  · 7 年前

    我有 n 个 Source,希望在 Akka 流中按优先级将它们合并。我的实现参考了 GraphMergePrioritiziedSpec,并尝试用下面的代码对 Source 的数量进行抽象:

    import akka.NotUsed
    import akka.stream.{ClosedShape, Graph, Materializer}
    import akka.stream.scaladsl.{GraphDSL, MergePrioritized, RunnableGraph, Sink, Source}
    import org.apache.activemq.ActiveMQConnectionFactory
    
    /** Merges a sequence of sources into one sink through a `MergePrioritized` stage.
      *
      * @param sources    the sources to merge
      * @param priorities one priority per source, passed to `MergePrioritized` in the same order
      * @param sink       the sink that receives the merged output
      * @throws IllegalArgumentException (via `require`) if `sources` and `priorities` differ in size
      */
    class SourceMerger(
      sources: Seq[Source[java.io.Serializable, NotUsed]],
      priorities: Seq[Int],
      private val sink: Sink[java.io.Serializable, _]
    ) {
    
      // Fail at construction time when a source has no matching priority (or vice versa).
      require(sources.size == priorities.size, "Each source should have a priority")
    
      // Brings the `~>` wiring operator into scope.
      import GraphDSL.Implicits._
    
      /** Builds the closed graph: every source is wired to the merge input with the same
        * index, and the merge output is wired to `sink`.
        */
      private def partial(
        sources: Seq[Source[java.io.Serializable, NotUsed]],
        priorities: Seq[Int],
        sink: Sink[java.io.Serializable, _]
      ): Graph[ClosedShape, NotUsed] = GraphDSL.create() { implicit b =>
    
          val merge = b.add(MergePrioritized[java.io.Serializable](priorities))
    
          sources.zipWithIndex.foreach { case (s, i) =>
            // NOTE(review): this wires the outlet of the source's own shape; the source itself
            // is never passed to the builder `b`. The reported "already connected" failure
            // points at this line -- confirm whether `s ~> merge.in(i)` was intended.
            s.shape.out ~> merge.in(i)
          }
    
          merge.out ~> sink
          ClosedShape
      }
    
      /** Wraps the graph produced by `partial` into a `RunnableGraph`. */
      def merge(
        sources: Seq[Source[java.io.Serializable, NotUsed]],
        priorities: Seq[Int],
        sink: Sink[java.io.Serializable, _]
      ): RunnableGraph[NotUsed] = RunnableGraph.fromGraph(partial(sources, priorities, sink))
    
      /** Builds the graph from the constructor arguments and runs it with the given materializer. */
      def run()(implicit mat: Materializer): NotUsed = merge(sources, priorities, sink).run()(mat)
    }
    

    import akka.actor.ActorSystem
    import akka.stream.{ActorMaterializer, Materializer}
    import akka.stream.scaladsl.{Sink, Source}
    import org.scalatest.{Matchers, WordSpecLike}
    import akka.testkit.TestKit
    
    import scala.collection.immutable.Iterable
    
    /** Stub test for `SourceMerger`: builds three in-memory sources with priorities 1, 2, 3
      * and runs the merged graph. It contains no assertions on the merged output.
      */
    class SourceMergerSpec extends TestKit(ActorSystem("SourceMerger")) with WordSpecLike with Matchers {
    
      implicit val materializer: Materializer = ActorMaterializer()
    
      "A SourceMerger" should {
        "merge by priority" in {
    
          // One priority per source, in the same order as `sources` below.
          val priorities: Seq[Int] = Seq(1,2,3)
    
          val highPriority = Iterable("message1", "message2", "message3")
          val mediumPriority = Iterable("message4", "message5", "message6")
          val lowPriority = Iterable("message7", "message8", "message9")
    
          val source1 = Source[String](highPriority)
          val source2 = Source[String](mediumPriority)
          val source3 = Source[String](lowPriority)
    
          val sources = Seq(source1, source2, source3)
    
          val subscriber = Sink.seq[java.io.Serializable]
    
          val merger = new SourceMerger(sources, priorities, subscriber)
    
          // Builds and runs the merged graph; the returned value is discarded.
          merger.run()
    
          // Separately runs source1 on its own, printing each element.
          source1.runWith(Sink.foreach(println))
        }
      }
    
    }
    

    [StatefulMapConcat.out] is already connected
    java.lang.IllegalArgumentException: [StatefulMapConcat.out] is already connected
        at akka.stream.scaladsl.GraphDSL$Builder.addEdge(Graph.scala:1304)
        at akka.stream.scaladsl.GraphDSL$Implicits$CombinerBase$class.$tilde$greater(Graph.scala:1431)
        at akka.stream.scaladsl.GraphDSL$Implicits$PortOpsImpl.$tilde$greater(Graph.scala:1521)
        at SourceMerger$$anonfun$partial$1$$anonfun$apply$1.apply(SourceMerger.scala:26)
        at SourceMerger$$anonfun$partial$1$$anonfun$apply$1.apply(SourceMerger.scala:25)
    

    // Excerpt from SourceMerger.partial: wires the outlet of each source's own shape to merge input i.
    sources.zipWithIndex.foreach { case (s, i) =>
      s.shape.out ~> merge.in(i)
    }
    

    是否可以在 Akka 流的 Graph DSL 中合并任意数量的 Source?如果可以,为什么我的尝试没有成功?

    2 回复  |  直到 7 年前
        1
  •  1
  •   Ramón J Romero y Vigil    7 年前

    代码示例的主要问题

    问题中提供的代码片段的一个大问题是:source1 同时连接到了 merge 调用中的 Sink 和 Sink.foreach(println)。如果没有中间的扇出(intermediate fan-out element)元素,同一个 Source 不能连接到多个 Sink。

    移除 Sink.foreach(println) 可能就能彻底解决你的问题。

    简化设计

    可以先按优先级对各个 Source 排序,再把它们依次拼接起来,从而简化实现:

    /** Alternative without the Graph DSL: pairs each source with its priority, sorts the pairs
      * by ascending priority value, concatenates the sources in that order with `++`, and
      * connects the result to `sink`. An empty `sources` yields `Source.empty` connected to `sink`.
      *
      * NOTE(review): `++` concatenates rather than merges, so presumably each source is fully
      * drained before the next one starts -- confirm this matches the intended semantics.
      */
    private def partial(sources: Seq[Source[java.io.Serializable, NotUsed]],
                        priorities: Seq[Int],
                        sink: Sink[java.io.Serializable, _]): RunnableGraph[NotUsed] = 
       sources.zip(priorities)
              .sortWith(_._2 < _._2)
              .map(_._1)
              .reduceOption(_ ++ _)
              .getOrElse(Source.empty[java.io.Serializable])
              .to(sink)
    
        2
  •  1
  •   Astrid    7 年前

    如果把下面的第一段代码替换为第二段,您的代码运行时就不会出错。

      // Before: wires the outlet of the source's own shape to merge input i.
      sources.zipWithIndex.foreach { case (s, i) =>
        s.shape.out ~> merge.in(i)
      }
    

      // After: passes the Source itself to `~>` instead of the outlet of its shape.
      sources.zipWithIndex.foreach { case (s, i) =>
        s ~> merge.in(i)
      }
    

    我认为 s.shape 返回的是 StatefulMapConcat 的 shape,它的输出端口无法在这个 builder 中直接连线;而 s ~> merge.in(i) 会先把 Source 加入 builder,然后再进行连接。