代码之家  ›  专栏  ›  技术社区  ›  balki

为什么要显式调用asyncio.StreamWriter.drain?

  •  5
  • balki  · 技术社区  · 7 年前

    doc :

    Write data to the stream.
    
    This method is not subject to flow control. Calls to write() should be followed by drain().
    

    共程排水管()

    Wait until it is appropriate to resume writing to the stream. Example:
    
    writer.write(data)
    await writer.drain()
    

    据我了解,,

    • 你需要打电话 drain 每一次 write
    • 如果不是,我想, 将阻止循环线程

    不用排水?我能想到两个案例

    1. close 立即
    2. 在消息完成之前,您必须缓冲一些数据。

    第一个是一个特例,我认为我们可以有一个不同的API。缓冲应该在写函数内部处理,应用程序不应该在意。


    让我换一种说法。这样做的缺点是什么?python3.8版本有效地做到了这一点吗?

    async def awrite(writer, data):
        writer.write(data)
        await writer.drain()
    

    注: doc明确规定如下:

    当没有什么可以等待的时候 drain()


    再次阅读答案和链接,我认为函数是这样工作的。 :检查接受的答案以获得更准确的版本。

    def write(data):
        remaining = socket.try_write(data)
        if remaining:
            _pendingbuffer.append(remaining) # Buffer will keep growing if other side is slow and we have a lot of data
    
    async def drain():
        if len(_pendingbuffer) < BUF_LIMIT:
            return
        await wait_until_other_side_is_up_to_speed()
        assert len(_pendingbuffer) < BUF_LIMIT
    
    async def awrite(writer, data):
        writer.write(data)
        await writer.drain()        
    

    那么什么时候使用什么:

    1. awrite
    2. 将数据流传输到大量客户端时(例如,一些实时流或大型文件)。如果数据在每个连接的缓冲区中重复,它肯定会溢出RAM。在本例中,编写一个循环,在每次迭代和调用时获取一块数据 . 如果文件很大, loop.sendfile 如果有更好的。
    1 回复  |  直到 5 年前
        1
  •  17
  •   user4815162342    5 年前

    据我所知,(1)每次调用write时都需要调用drain。(2) 如果不是,我猜write将阻塞循环线程

    两者都不正确,但混淆是可以理解的。路途 write()

    • 打电话给 写() 只需将数据隐藏到缓冲区,让事件循环在以后实际将其写入,而无需程序进一步干预。就应用程序而言,数据在后台以对方能够接收到的速度写入。换句话说,每个 写() drain() .

    • 从未 阻止事件循环。

    写() 无论你需要在哪里,即使是从一个函数 async def -但实际上这是一个重大的问题 属于 memory leak 在你的手上。 排水管() 修复了这个问题:如果写入缓冲区太大,等待它会暂停协同路由,并在完成后再次恢复 os.write() 在后台执行的操作成功,缓冲区缩小。

    你不需要等待 之后 每一个 写,但您确实需要偶尔等待它,通常在循环的迭代之间 写() 被调用。例如:

    while True:
        response = await peer1.readline()
        peer2.write(b'<response>')
        peer2.write(response)
        peer2.write(b'</response>')
        await peer2.drain()
    

    排水管() 如果挂起的未写入数据量很小,则立即返回。如果数据超过高阈值, 将挂起调用协同路由,直到挂起的未写入数据量降至低阈值以下。暂停将导致协同程序停止从中读取 peer1 ,这将反过来导致对等方减慢向我们发送数据的速率。这种反馈称为背压。

    缓冲应该在写函数内部处理,应用程序不应该在意。

    差不多就是这样 现在就可以工作了——它可以处理缓冲,让应用程序不管是好是坏。也看到 this answer 更多信息。


    针对问题的编辑部分:

    再次阅读答案和链接,我认为函数是这样工作的。

    写() -应用程序必须做的唯一一件事就是让事件循环运行足够长的时间,以写出所有内容。

    一个更正确的 write drain 可能是这样的:

    class ToyWriter:
        def __init__(self):
            self._buf = bytearray()
            self._empty = asyncio.Event(True)
    
        def write(self, data):
            self._buf.extend(data)
            loop.add_writer(self._fd, self._do_write)
            self._empty.clear()
    
        def _do_write(self):
            # Automatically invoked by the event loop when the
            # file descriptor is writable, regardless of whether
            # anyone calls drain()
            while self._buf:
                try:
                    nwritten = os.write(self._fd, self._buf)
                except OSError as e:
                    if e.errno == errno.EWOULDBLOCK:
                        return  # continue once we're writable again
                    raise
                self._buf = self._buf[nwritten:]
            self._empty.set()
            loop.remove_writer(self._fd, self._do_write)
    
        async def drain(self):
            if len(self._buf) > 64*1024:
                await self._empty.wait()
    

    最后一点是 另一个 打电话的好理由 -实际上注意到对方因为写信失败而消失了。