目前,我使用带有
服务器推出keepAlive消息,并回答来自客户端的请求。
现在我想让事情变得有趣一点,并增加处理的可能性
客户。
所以我看了一下:
https://github.com/playframework/play-scala-chatroom-example
这基本上是一个
n-Inlet ~> n-Outlet
我需要的是更复杂一点
作为服务器应该
-
仍向发送保留消息
连接的客户端和
-
全部的
所以这基本上只是我抽象思维方式的中间一步。
type AllowedWSMessage = String
val myActor = system.actorOf(Props{new myActor}, "myActor")
val myActorSink = Sink.actorRefWithAck(myActor, "init", "acknowledged", "completed")
import scala.concurrent.duration._
val tickingSource: Source[AllowedWSMessage, Cancellable] =
Source.tick(initialDelay = 1 second, interval = 10 seconds, tick = NotUsed)
.map(_ => "Staying Alive")
val serverMessageSource = Source
.queue[AllowedWSMessage](10, OverflowStrategy.backpressure)
.mapMaterializedValue { queue => myActor ! InitTunnel(queue)}
val serverSource: Source[AllowedWSMessage, Cancellable] = tickingSource.merge(serverMessageSource)
private val (clientSink, clientSource) =
{
// Don't log MergeHub$ProducerFailed as error if the client disconnects.
// recoverWithRetries -1 is essentially "recoverWith"
val source = MergeHub.source[AllowedWSMessage]
.log("source")
.recoverWithRetries(-1, { case _: Exception â Source.empty})
val sink: Sink[AllowedWSMessage, Source[AllowedWSMessage, NotUsed]] = BroadcastHub.sink[AllowedWSMessage]
source.via(serverSource).toMat(sink)(Keep.both).run()
}
(注意
source.via(serverSource)...
)
(Client -> WebSocket ->) MergeHub ~> myActor ~> BroadcastHub (-> WebSocket -> Client)
现在我想知道,做这件事的优雅方式是什么?