代码之家  ›  专栏  ›  技术社区  ›  Tom Lous

如何禁用Akka WebSocket服务器上消息的缓冲?

  •  1
  • Tom Lous  · 技术社区  · 6 年前

    我有一个非常简单的akka-websocket服务器,它将文件中的行推送到连接的客户机,每行间隔为400毫秒。除了Web服务器似乎在广播消息之前缓冲了大约一分钟之外,一切都可以正常工作。

    因此,当一个客户机连接时,我在服务器端看到每400毫秒一行被读取并推送到 Sink 但是在客户机端,我一分钟内什么也没有得到,然后大约150条消息(相当于一分钟的消息)。

    有我忽略的场景吗?

    object WebsocketServer extends App {
      implicit val actorSystem = ActorSystem("WebsocketServer")
      implicit val materializer = ActorMaterializer()
      implicit val executionContext = actorSystem.dispatcher
    
      val file = Paths.get("websocket-server/src/main/resources/EURUSD.txt")
      val fileSource =
        FileIO.fromPath(file)
          .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
    
      val delayedSource: Source[Strict, Future[IOResult]] =
        fileSource
          .map { line =>
            Thread.sleep(400)
            println(line.utf8String)
            TextMessage(line.utf8String)
          }
    
      def route = path("") {
        extractUpgradeToWebSocket { upgrade =>
          complete(upgrade.handleMessagesWithSinkSource(
            Sink.ignore,
            delayedSource)
          )
        }
      }
    
      val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
    
      bindingFuture.onComplete {
        case Success(binding) ⇒
          println(s"Server is listening on ws://localhost:8080")
        case Failure(e) ⇒
          println(s"Binding failed with ${e.getMessage}")
          actorSystem.terminate()
      }
    }
    
    1 回复  |  直到 6 年前
        1
  •  1
  •   Tom Lous    6 年前

    所以这个方法 Thread.sleep(400) 是错误的。我应该用 .throttle 来源机械师:

    val delayedSource: Source[Strict, Future[IOResult]] =
        fileSource
          .throttle(elements = 1, per = 400.millis)
          .map { line =>
            println(line.utf8String)
            TextMessage(line.utf8String)
          }
    

    这解决了问题。