我对使用akka http库创建服务器有点迷茫。我需要建立的沟通如下:
-
-
有时,客户机向服务器发送命令,服务器评估/委托该命令并回答客户机
-
从服务器到所有客户端都有持续的广播消息
-
我的服务器需要管理通过websocket连接的多个“会话”
path("socket") {
handleWebSocketMessages(listen())
}
这里是
listen()
方法:
// stores offers to broadcast to all clients
private var offers: List[TextMessage => Unit] = List()
def listen(): Flow[Message, Message, NotUsed] = {
val inbound: Sink[Message, Any] = Sink.foreach(m => /* handle the message */) // (*)
val outbound: Source[Message, SourceQueueWithComplete[Message]] =
Source.queue[Message](16, OverflowStrategy.fail)
Flow.fromSinkAndSourceMat(inbound, outbound)((_, outboundMat) => {
offers ::= outboundMat.offer
NotUsed
})
}
def sendText(text: String): Unit = {
for (connection <- offers) connection(TextMessage.Strict(text))
}
通过这种方法,我可以注册多个客户端并使用
sendText(text: String)
方法但是,有一个大问题:在评估特定客户机的命令后,如何只回答它。(见
(*)
另一件让我烦恼的事是
offers
为了详细说明,我基本上需要能够实现如下所示的方法:
def onMessageReceived(m: Message, answer: TextMessage => Unit): Unit = {
val response: TextMessage = handleMessage(m)
answer(response)
}
但是我不知道在我的websocket流中在哪里调用这个方法。