代码之家  ›  专栏  ›  技术社区  ›  gkatzioura

Akka流/HTTP:从响应获取原始请求

  •  2
  • gkatzioura  · 技术社区  · 7 年前

    我有一个Akka Streams源,它通过一个流并发布一个HTTP请求:

    source.map(toRequest)
      .via(Http().outgoingConnection(host))
      .map(toMessage) 
    

    假设 toRequest 方法将字符串映射到 HttpRequest ,以及 toMessage 方法映射 HttpResponse 到下游处理所需的消息类。假设消息类需要包含一些原始信息。

    有可能拿到原件吗 HttpRequest HttpResponse(高温超导响应) ?如果没有,有没有办法保留一些原始信息?

    2 回复  |  直到 7 年前
        1
  •  3
  •   Jeffrey Chung    7 年前

    一种方法是使用 Future -based variant 客户端API和保存要向下游传播的信息的自定义case类。例如:

    case class Message(request: HttpRequest, response: HttpResponse)
    
    source
      .map(toRequest)
      .mapAsync(parallelism = 3) { request => // adjust the level of parallelism as needed
        Http().singleRequest(request).map(response => Message(request, response))
      }
      // continue processing; at this point you have a Source[Message, _]
    
        2
  •  1
  •   Emiliano Martinez    7 年前

    您可以使用图形api绕过HttpRequest。一个例子是:

    object Main extends App {
    
      implicit val as = ActorSystem("Test")
      implicit val m = ActorMaterializer()
      implicit val ec = as.dispatcher
    
      def src1 = Source.fromIterator(() => List(1, 2, 3).toIterator)
    
      val srcGraph = Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
        import GraphDSL.Implicits._
    
        // Create one flow to prepend the 'Number' string to the input integer
        def flow1 = Flow[Int].map { el => s"Number $el"  }
    
        // Create a broadcast stage 
        val broadcast = builder.add(Broadcast[Int](2))
        val zip       = builder.add(Zip[Int, String]())
    
        src1 ~> broadcast.in
    
        // The 0 port sends the int to the zip stage directly
        broadcast.out(0) ~>          zip.in0
        broadcast.out(1) ~> flow1 ~> zip.in1
    
        SourceShape(zip.out)
      })
    
      Source.fromGraph(srcGraph).runForeach(println(_))
    }
    

    graph api提供了许多选项来执行类似的操作。