我心中有以下架构:
我需要的是:
-
WebSocket服务器和TCP客户端必须是容错的,因为它们与外部应用程序之间的连接断开不会导致其他部分断开。例如,如果TCP服务器1发生故障,则TCP客户端1应在不影响WebSocket服务器或TCP客户端2的情况下处理此问题。
-
WebSocket服务器和TCP客户端应该能够通过某种消息传递进行通信,因为它们中的每一个都应该包含自己的状态。我不知道这种消息传递是什么(队列、方法调用等)。
我对这个想法很满意
asyncio
并对协同程序如何在事件循环中运行有一些了解。我正在努力解决的问题是如何在WebSocket服务器和TCP客户端之间提供分离。在的大多数示例中
异步
,所有内容都捆绑在一起
异步
功能和
asyncio.gather
在末尾调用。对我来说,这是夫妻个人的责任,我不明白如何扩大这些
异步
解决(1)和(2)的示例。
对我有帮助的是一个简单的例子,WebSocket服务器向TCP客户端来回发送消息,两者都使用
异步
。WebSocket服务器应使用
websockets
package
。TCP客户端应使用
asyncio
streams
.
一些示例代码扩展自文档示例:
import asyncio
from websockets.server import serve
async def send_message_to_tcp_client(queue, message):
queue.put_nowait(message)
return queue.get()
async def receive_websocket_message(websocket, queue):
# This also should handle application state. For example,
# a message may transition a state machine to a different
# state which would then potentially trigger new messages
# to the TCP client(s).
async for message in websocket:
response = await send_message_to_tcp_client(queue, message)
await websocket.send(response)
async def websocket_server():
# I don't know if this queue passing as argument works
async with serve(receive_websocket_message, "localhost", 8765, queue=queue):
await asyncio.Future() # run forever
async def main():
# Create queue to pass messages back and forth
q = queue.Queue()
await asyncio.gather(
websocket_server(q),
tcp_client(q)
)
async def tcp_client(queue):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
# I really need this loop to also handle some state
# that represents state of the TCP server it is talking
# to.
while True:
message = await queue.get()
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
queue.put_nowait(data)
writer.close()
await writer.wait_closed()
asyncio.run(main())
这段代码没有展示的(假设它甚至是一个有效的解决方案)是容错。应该如何分别处理与外部客户端和服务器的WebSocket和TCP通信中的断开连接和错误?它还混淆了如何确保WebSocket和TCP客户端之间的同步。应该先开始什么?