代码之家  ›  专栏  ›  技术社区  ›  Florian Baierl

使用akka http回答特定客户机,并支持广播

  •  0
  • Florian Baierl  · 技术社区  · 7 年前

    我对使用akka http库创建服务器有点迷茫。我需要建立的沟通如下:

    • 有时,客户机向服务器发送命令,服务器评估/委托该命令并回答客户机
    • 从服务器到所有客户端都有持续的广播消息

    • 我的服务器需要管理通过websocket连接的多个“会话”

    path("socket") {
      handleWebSocketMessages(listen())
    }
    

    这里是 listen() 方法:

    // stores offers to broadcast to all clients
    private var offers: List[TextMessage => Unit] = List()
    
    def listen(): Flow[Message, Message, NotUsed] = {
      val inbound: Sink[Message, Any] = Sink.foreach(m => /* handle the message */) // (*)
      val outbound: Source[Message, SourceQueueWithComplete[Message]] =
        Source.queue[Message](16, OverflowStrategy.fail)
    
      Flow.fromSinkAndSourceMat(inbound, outbound)((_, outboundMat) => {
        offers ::= outboundMat.offer
        NotUsed
      })
    }
    
    def sendText(text: String): Unit = {
      for (connection <- offers) connection(TextMessage.Strict(text))
    }
    

    通过这种方法,我可以注册多个客户端并使用 sendText(text: String) 方法但是,有一个大问题:在评估特定客户机的命令后,如何只回答它。(见 (*)

    另一件让我烦恼的事是 offers

    为了详细说明,我基本上需要能够实现如下所示的方法:

    def onMessageReceived(m: Message, answer: TextMessage => Unit): Unit = {
      val response: TextMessage = handleMessage(m)
      answer(response)
    }
    

    但是我不知道在我的websocket流中在哪里调用这个方法。

    1 回复  |  直到 7 年前
        1
  •  0
  •   Florian Baierl    7 年前

    var actors: List[ActorRef] = Nil
    
    private def wsFlow(implicit materializer: ActorMaterializer): Flow[ws.Message, ws.Message, NotUsed] = {
        val (actor, source) = Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
          .toMat(BroadcastHub.sink[String])(Keep.both)
          .run()
    
        actors = actor :: actors
    
        val wsHandler: Flow[ws.Message, ws.Message, NotUsed] =
          Flow[ws.Message]
            .merge(source)
            .map {
              case TextMessage.Strict(tm) => handleMessage(actor, tm)
              case _ => TextMessage.Strict("Ignored message!")
            }
        wsHandler
      }
    
      def broadcast(msg: String): Unit = {
        actors.foreach(_ ! TextMessage.Strict(msg))
      }
    
    推荐文章