代码之家  ›  专栏  ›  技术社区  ›  Fede E.

Akka HTTP客户端和Akka actor性能调优

  •  0
  • Fede E.  · 技术社区  · 7 年前

    我们从使用Camel HTTP4转移到Akka HTTP,虽然我们现在能够更好地控制错误,但是考虑到Akka HTTP(客户端)中所有可调整的参数,要获得更好的性能变得非常困难。

    我们有一个actor,它接收消息,向外部服务发出httpget请求(可以轻松地管理1500多个RPS),然后用HTTP响应体作为字符串进行响应。

    我们的HTTP请求是通过singleRequest发出的:

    val httpResponseFuture: Future[HttpResponse] = http.singleRequest(HttpRequest(uri = uri))
    
    val tokenizationResponse = for {
      response <- httpResponseFuture
      body <- Unmarshal(response.entity).to[String]
    } yield transformResponse(response.status, body, path, response.headers)
    

    然后是产生最佳结果的设置(查看这些数字并没有显示出任何实际的改进:

    akka {
    
        actor.deployment {
          /HttpClient {
            router = balancing-pool
            nr-of-instances = 7
          }
        }
    
        http {
          host-connection-pool {
            max-connections = 30
            max-retries = 5
            max-open-requests = 8192
            pipelining-limit = 200
            idle-timeout = 30 s
          }
        }
    
    }
    

    我们尝试过调整池的大小、actor实例以及主机连接池下的所有其他参数,但没有更好的结果。

    欢迎任何建议!

    1 回复  |  直到 7 年前
        1
  •  2
  •   Ramón J Romero y Vigil    7 年前

    不要混用&匹配并发性

    Actor 等待回应:

    //what your code may look like now
    
    object Message
    
    val queryActorRef : ActorRef = ???
    
    val responseBody : Future[String] = (queryActorRef ? Message).mapTo[String]
    

    演员 在这种情况下,将保护有限的资源。但是底层的http连接池为您处理资源利用问题。删除Actor中介将允许您单独使用Futures:

    val entityTimeout : FiniteDuration = 10.seconds
    
    val responseBodyWithoutAnActor : Future[String] = 
        http
          .singleRequest(HttpRequest(uri = uri))
          .flatMap(response => response.entity.toStrict(timeout))
          .map(_.data.utf8String)
    

    如果发送给参与者的“消息”有潜在的来源,例如 Iterable ,则可以改用流媒体:

    type Message = ???
    
    val messagesSource : Iterable[Message] = ???
    
    val uri : String = ???
    
    val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]](uri)
    
    val entityParallelism = 10
    
    Source
      .apply(messagesSource)
      .via(poolClientFlow)
      .mapAsync(entityParallelism)(resp.entity.toStrict(entityTimeout).data.utf8String)
      .runForeach { responseBody : String =>
        //whatever you do with the bodies
      }
    
    推荐文章