据我所知,(1)每次调用write时都需要调用drain。(2) 如果不是,我猜write将阻塞循环线程
两者都不正确,但混淆是可以理解的。路途
write()
-
打电话给
写()
只需将数据隐藏到缓冲区,让事件循环在以后实际将其写入,而无需程序进一步干预。就应用程序而言,数据在后台以对方能够接收到的速度写入。换句话说,每个
写()
drain()
.
-
从未
阻止事件循环。
写()
无论你需要在哪里,即使是从一个函数
async def
-但实际上这是一个重大的问题
属于
memory leak
在你的手上。
排水管()
修复了这个问题:如果写入缓冲区太大,等待它会暂停协同路由,并在完成后再次恢复
os.write()
在后台执行的操作成功,缓冲区缩小。
你不需要等待
之后
每一个
写,但您确实需要偶尔等待它,通常在循环的迭代之间
写()
被调用。例如:
while True:
response = await peer1.readline()
peer2.write(b'<response>')
peer2.write(response)
peer2.write(b'</response>')
await peer2.drain()
排水管()
如果挂起的未写入数据量很小,则立即返回。如果数据超过高阈值,
将挂起调用协同路由,直到挂起的未写入数据量降至低阈值以下。暂停将导致协同程序停止从中读取
peer1
,这将反过来导致对等方减慢向我们发送数据的速率。这种反馈称为背压。
缓冲应该在写函数内部处理,应用程序不应该在意。
差不多就是这样
现在就可以工作了——它可以处理缓冲,让应用程序不管是好是坏。也看到
this answer
更多信息。
针对问题的编辑部分:
再次阅读答案和链接,我认为函数是这样工作的。
写()
-应用程序必须做的唯一一件事就是让事件循环运行足够长的时间,以写出所有内容。
一个更正确的
write
drain
可能是这样的:
class ToyWriter:
def __init__(self):
self._buf = bytearray()
self._empty = asyncio.Event(True)
def write(self, data):
self._buf.extend(data)
loop.add_writer(self._fd, self._do_write)
self._empty.clear()
def _do_write(self):
# Automatically invoked by the event loop when the
# file descriptor is writable, regardless of whether
# anyone calls drain()
while self._buf:
try:
nwritten = os.write(self._fd, self._buf)
except OSError as e:
if e.errno == errno.EWOULDBLOCK:
return # continue once we're writable again
raise
self._buf = self._buf[nwritten:]
self._empty.set()
loop.remove_writer(self._fd, self._do_write)
async def drain(self):
if len(self._buf) > 64*1024:
await self._empty.wait()
最后一点是
另一个
打电话的好理由
-实际上注意到对方因为写信失败而消失了。