代码之家  ›  专栏  ›  技术社区  ›  bmitc

将消息发送到“asyncio”WebSocket服务器和从其发送到“异步”TCP客户端

  •  0
  • bmitc  · 技术社区  · 1 年前

    我心中有以下架构:

    enter image description here

    我需要的是:

    1. WebSocket服务器和TCP客户端必须是容错的,因为它们与外部应用程序之间的连接断开不会导致其他部分断开。例如,如果TCP服务器1发生故障,则TCP客户端1应在不影响WebSocket服务器或TCP客户端2的情况下处理此问题。

    2. WebSocket服务器和TCP客户端应该能够通过某种消息传递进行通信,因为它们中的每一个都应该包含自己的状态。我不知道这种消息传递是什么(队列、方法调用等)。

    我对这个想法很满意 asyncio 并对协同程序如何在事件循环中运行有一些了解。我正在努力解决的问题是如何在WebSocket服务器和TCP客户端之间提供分离。在的大多数示例中 异步 ,所有内容都捆绑在一起 异步 功能和 asyncio.gather 在末尾调用。对我来说,这是夫妻个人的责任,我不明白如何扩大这些 异步 解决(1)和(2)的示例。


    对我有帮助的是一个简单的例子,WebSocket服务器向TCP客户端来回发送消息,两者都使用 异步 。WebSocket服务器应使用 websockets package 。TCP客户端应使用 asyncio streams .

    一些示例代码扩展自文档示例:

    import asyncio
    from websockets.server import serve
    
    async def send_message_to_tcp_client(queue, message):
        queue.put_nowait(message)
        return queue.get()
    
    async def receive_websocket_message(websocket, queue):
        # This also should handle application state. For example,
        # a message may transition a state machine to a different
        # state which would then potentially trigger new messages
        # to the TCP client(s).
        async for message in websocket:
            response = await send_message_to_tcp_client(queue, message)
            await websocket.send(response)
    
    async def websocket_server():
        # I don't know if this queue passing as argument works
        async with serve(receive_websocket_message, "localhost", 8765, queue=queue):
            await asyncio.Future()  # run forever
    
    async def main():
        # Create queue to pass messages back and forth
        q = queue.Queue()
    
        await asyncio.gather(
            websocket_server(q),
            tcp_client(q)
        )
    
    async def tcp_client(queue):
        reader, writer = await asyncio.open_connection(
            '127.0.0.1', 8888)
    
        # I really need this loop to also handle some state
        # that represents state of the TCP server it is talking
        # to.
        while True:
            message = await queue.get()
            writer.write(message.encode())
            await writer.drain()
    
            data = await reader.read(100)
            queue.put_nowait(data)
    
        writer.close()
        await writer.wait_closed()
    
    asyncio.run(main())
    

    这段代码没有展示的(假设它甚至是一个有效的解决方案)是容错。应该如何分别处理与外部客户端和服务器的WebSocket和TCP通信中的断开连接和错误?它还混淆了如何确保WebSocket和TCP客户端之间的同步。应该先开始什么?

    0 回复  |  直到 1 年前