不要混用&匹配并发性
Actor
等待回应:
//what your code may look like now
object Message
val queryActorRef : ActorRef = ???
val responseBody : Future[String] = (queryActorRef ? Message).mapTo[String]
演员
在这种情况下,将保护有限的资源。但是底层的http连接池为您处理资源利用问题。删除Actor中介将允许您单独使用Futures:
val entityTimeout : FiniteDuration = 10.seconds
val responseBodyWithoutAnActor : Future[String] =
http
.singleRequest(HttpRequest(uri = uri))
.flatMap(response => response.entity.toStrict(timeout))
.map(_.data.utf8String)
如果发送给参与者的“消息”有潜在的来源,例如
Iterable
,则可以改用流媒体:
type Message = ???
val messagesSource : Iterable[Message] = ???
val uri : String = ???
val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]](uri)
val entityParallelism = 10
Source
.apply(messagesSource)
.via(poolClientFlow)
.mapAsync(entityParallelism)(resp.entity.toStrict(entityTimeout).data.utf8String)
.runForeach { responseBody : String =>
//whatever you do with the bodies
}