
Akka Streams Scala DSL and Op-Rabbit

  •  0
  • Doug Anderson  · Tech Community  · 8 years ago

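    I need to split a stream based on a predicate and then merge the branches back together, the way I would when building a graph with Partition and Merge.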

                            | --> flow1 --> |
    source--> partition --> |               | --> flow3 --> sink
                            | --> flow2 --> |
    

    I'm not sure splitWhen is what I should use, since I always need exactly 2 streams.

    def splitExample(source: AckedSource[String, SubscriptionRef],
                     queueName: String)
                    (implicit actorSystem: ActorSystem): RunnableGraph[SubscriptionRef] = {
      val toStringFlow: Flow[AckTup[Message], AckTup[String], NotUsed] = Flow[AckTup[Message]]
        .map[AckTup[String]](tup => {
          val (p,m) = tup
          (p, new String(m.data))
        })
    
      val printFlow1: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]]
        .map[AckTup[String]](tup => {
          val (p, s) = tup
          println(s"flow1 processing $s")
          tup
         })
    
      val printFlow2: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]]
        .map[AckTup[String]](tup => {
          val (p, s) = tup
          println(s"flow2 processing $s")
          tup
        })
    
      source
        .map(Message.queue(_, queueName))
        .via(AckedFlow(toStringFlow))
        // partition if string.length < 10
        .via(AckedFlow(printFlow1))
        .via(AckedFlow(printFlow2))
        .to(AckedSink.ack)
    }
    

    This is the code I can't seem to get working:

    import GraphDSL.Implicits._
    def buildModelAcked(source: AckedSource[String, SubscriptionRef] , queueName: String)(implicit actorSystem: ActorSystem):  Graph[ClosedShape, Future[Done]] = {
        import GraphDSL.Implicits._
        GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
        import GraphDSL.Implicits._
        source.map(Message.queue(_, queueName)) ~> AckedFlow(toStringFlow) ~> AckedSink.ack
    //      source.map(Message.queue(_, queueName)).via(AckedFlow(toStringFlow)).to(AckedSink.ack)
        ClosedShape
    

    }}

    The compiler cannot resolve ~>

    So my question is:

    2 Answers  |  as of 8 years ago
        1
  •  1
  •   Stefano Bonetti    8 years ago

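    Keep the following definitions in mind when working with acked-stream.

    1. AckedSource[Out, Mat] is a wrapper for Source[AckTup[Out], Mat]
    2. AckedFlow[In, Out, Mat] is a wrapper for Flow[AckTup[In], AckTup[Out], Mat]
    3. AckedSink[In, Mat] is a wrapper for Sink[AckTup[In], Mat]
    4. AckTup[T] is an alias for (Promise[Unit], T)
    5. The standard flow combinators operate on the T part of the AckTup
    6. The .acked combinator completes the Promise[Unit] of an AckedFlow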
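
    To make items 4 to 6 concrete, here is a minimal sketch using only the Scala standard library. The AckTup alias is redefined locally for illustration rather than imported from acked-stream, and mapPayload and ack are hypothetical helpers that only mimic what the definitions above describe; they are not the library's API.

```scala
import scala.concurrent.Promise

object AckTupSketch {
  // Local stand-in for the alias described in item 4 (an assumption, not the library type).
  type AckTup[T] = (Promise[Unit], T)

  // Hypothetical helper: transform only the payload and pass the promise through untouched.
  def mapPayload[A, B](tup: AckTup[A])(f: A => B): AckTup[B] = {
    val (p, a) = tup
    (p, f(a))
  }

  // Hypothetical helper: complete the promise to signal that the element is done.
  def ack[T](tup: AckTup[T]): T = {
    val (p, t) = tup
    p.trySuccess(())
    t
  }

  def main(args: Array[String]): Unit = {
    val promise = Promise[Unit]()
    val in: AckTup[Array[Byte]] = (promise, "hello".getBytes("UTF-8"))
    val out = mapPayload(in)(bytes => new String(bytes, "UTF-8"))
    println(promise.isCompleted) // still pending: mapping did not touch the promise
    println(ack(out))
    println(promise.isCompleted) // completed by ack
  }
}
```

    The point of the sketch is that the promise travels alongside the payload through every stage and is only completed at the end, which is why the last stage matters for acknowledgement.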

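    The ~> edge operator is provided by GraphDSL.Implicits for the standard Akka shapes; it is not defined for the custom wrapper types from acked-stream, which is why the compiler cannot resolve it.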

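    You have two ways out:

    1. Define your own implicit ~> operator, along the lines of the ones in GraphDSL.Implicits.
    2. Unwrap the acked stages to obtain standard stages. You can access the wrapped stage using .wrappedRepr, which is available on AckedSource, AckedFlow and AckedSink.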
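
    As a rough illustration of why the second option works, the sketch below wires Partition and Merge with ~> using only standard Akka stages and plain String elements. It assumes Akka 2.6 or later (where an implicit ActorSystem supplies the materializer); the names PartitionMergeSketch and splitByLength are made up for this example, and no op-rabbit or acked-stream types are involved.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PartitionMergeSketch {
  // Route strings shorter than `limit` to port 0 and the rest to port 1, then merge both branches.
  def splitByLength(limit: Int): Flow[String, String, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val split = builder.add(Partition[String](2, s => if (s.length < limit) 0 else 1))
      val merge = builder.add(Merge[String](2))
      split.out(0) ~> Flow[String].map(s => s"short: $s") ~> merge.in(0)
      split.out(1) ~> Flow[String].map(s => s"long: $s") ~> merge.in(1)
      FlowShape(split.in, merge.out)
    })

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    val result = Source(List("a", "a much longer string"))
      .via(splitByLength(10))
      .runWith(Sink.seq)
    // Merge does not guarantee ordering between branches, so only the set of results is meaningful.
    println(Await.result(result, 3.seconds))
    system.terminate()
  }
}
```

    Once the acked stages are unwrapped with .wrappedRepr they are ordinary Source, Flow and Sink values carrying AckTup elements, so the same wiring applies to them.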
        2
  •  0
  •   Doug Anderson    8 years ago

    graph:    
                            |--> short --|
      rabbitMq --> before --|            |--> after
                            |--> long  --|
    

    val before: Flow[AckTup[Message], AckTup[String], NotUsed] = Flow[AckTup[Message]].map[AckTup[String]](tup => {
      val (p,m) = tup
      (p, new String(m.data))
    })
    
    val short: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
      val (p, s) = tup
      println(s"short: $s")
      tup
    })
    val long: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
      val (p, s) = tup
      println(s"long: $s")
      tup
    })
    val after: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
      val (p, s) = tup
      println(s"all $s")
      tup
    })
    
    def buildSplitGraph(source: AckedSource[String, SubscriptionRef]
                        , queueName: String
                        , splitLength: Int)(implicit actorSystem: ActorSystem):  Graph[ClosedShape, Future[Done]] = {
     GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
       // brings the ~> edge operator into scope
       import GraphDSL.Implicits._
       val toShort = 0
       val toLong = 1
    
       // junctions
       val split = builder.add(Partition[AckTup[String]](2, (tup: AckTup[String]) => {
                                                               val (p, s) = tup
                                                               if (s.length < splitLength) toShort else toLong
                                                             }
       ))
       val merge = builder.add(Merge[AckTup[String]](2))
    
       //graph
       val beforeSplit = source.map(Message.queue(_, queueName)).wrappedRepr ~> AckedFlow(before).wrappedRepr
       beforeSplit ~> split
       // must do short, then long since the split goes in that order
       split ~> AckedFlow(short).wrappedRepr ~> merge
       split ~> AckedFlow(long).wrappedRepr ~> merge
       // after the last AckedFlow, be sure to '.acked' so that the message will be removed from the queue
       merge ~> AckedFlow(after).acked ~> s
    
      ClosedShape
    }}
    

    As Stefano Bonetti said, the key is to use .wrappedRepr on each AckedFlow and then use the .acked combinator as the last step.