代码之家  ›  专栏  ›  技术社区  ›  user4063815

Akka MergeHub和BroadcastHub通过Actor通过websockets支持多个客户端

  •  0
  • user4063815  · 技术社区  · 8 年前

    目前,我使用带有

    • 1个客户

    服务器推出keepAlive消息,并回答来自客户端的请求。

    现在我想让事情变得有趣一点,并增加处理的可能性 客户。

    所以我看了一下:

    https://github.com/playframework/play-scala-chatroom-example

    这基本上是一个 n-Inlet ~> n-Outlet

    我需要的是更复杂一点 作为服务器应该

    1. 仍向发送保留消息 连接的客户端和
    2. 全部的

    所以这基本上只是我抽象思维方式的中间一步。

    type AllowedWSMessage = String
    
    val myActor = system.actorOf(Props{new myActor}, "myActor")
    val myActorSink = Sink.actorRefWithAck(myActor, "init", "acknowledged", "completed")
    import scala.concurrent.duration._
    
    val tickingSource: Source[AllowedWSMessage, Cancellable] =
    Source.tick(initialDelay = 1 second, interval = 10 seconds, tick = NotUsed)
      .map(_ => "Staying Alive")
    
    val serverMessageSource = Source
    .queue[AllowedWSMessage](10, OverflowStrategy.backpressure)
    .mapMaterializedValue { queue => myActor ! InitTunnel(queue)}
    
    val serverSource: Source[AllowedWSMessage, Cancellable] = tickingSource.merge(serverMessageSource)
    
    private val (clientSink, clientSource) =
    {
        // Don't log MergeHub$ProducerFailed as error if the client disconnects.
        // recoverWithRetries -1 is essentially "recoverWith"
        val source = MergeHub.source[AllowedWSMessage]
          .log("source")
          .recoverWithRetries(-1, { case _: Exception ⇒ Source.empty})
    
        val sink: Sink[AllowedWSMessage, Source[AllowedWSMessage, NotUsed]] = BroadcastHub.sink[AllowedWSMessage]
        source.via(serverSource).toMat(sink)(Keep.both).run()
      }
    

    (注意 source.via(serverSource)... )

    (Client -> WebSocket ->) MergeHub ~> myActor ~> BroadcastHub (-> WebSocket -> Client)
    

    现在我想知道,做这件事的优雅方式是什么?

    1 回复  |  直到 8 年前
        1
  •  0
  •   user7677469 user7677469    8 年前

    你有你的服务器源和接收器,你说它们已经工作了,所以我不会深入研究它们。

    val fanIn = MergeHub.source[AllowedWSMessage].to(myActorSink).run()
    val fanOut = serverSource.toMat(BroadcastHub.sink[AllowedWSMessage])(Keep.right).run()
    
    // Now, somewhere in a (route-)method where you handle the websocket connection
    
    Flow.fromSinkAndSource(fanIn, fanOut)
    

    这么简单,希望你脑袋里的结现在解开:)

    推荐文章