我有一个非常简单的akka-websocket服务器,它将文件中的行推送到连接的客户机,每行间隔为400毫秒。除了Web服务器似乎在广播消息之前缓冲了大约一分钟之外,一切都可以正常工作。
因此,当一个客户机连接时,我在服务器端看到每400毫秒一行被读取并推送到
Sink
但是在客户机端,我一分钟内什么也没有得到,然后大约150条消息(相当于一分钟的消息)。
有我忽略的场景吗?
object WebsocketServer extends App {
implicit val actorSystem = ActorSystem("WebsocketServer")
implicit val materializer = ActorMaterializer()
implicit val executionContext = actorSystem.dispatcher
val file = Paths.get("websocket-server/src/main/resources/EURUSD.txt")
val fileSource =
FileIO.fromPath(file)
.via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
val delayedSource: Source[Strict, Future[IOResult]] =
fileSource
.map { line =>
Thread.sleep(400)
println(line.utf8String)
TextMessage(line.utf8String)
}
def route = path("") {
extractUpgradeToWebSocket { upgrade =>
complete(upgrade.handleMessagesWithSinkSource(
Sink.ignore,
delayedSource)
)
}
}
val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
bindingFuture.onComplete {
case Success(binding) â
println(s"Server is listening on ws://localhost:8080")
case Failure(e) â
println(s"Binding failed with ${e.getMessage}")
actorSystem.terminate()
}
}