代码之家  ›  专栏  ›  技术社区  ›  krad

如何正确运行异步后台任务?

  •  0
  • krad  · 技术社区  · 1 年前

    我有以下代码:

    import time
    from fastapi import FastAPI, Request
        
    app = FastAPI()
        
    @app.get("/ping")
    async def ping(request: Request):
            print("Hello")
            time.sleep(5)
            print("bye")
            return {"ping": "pong!"}
    

    如果我在localhost上运行代码,例如。, http://localhost:8501/ping -在同一浏览器窗口的不同选项卡中,我得到:

    Hello
    bye
    Hello
    bye
    

    而不是:

    Hello
    Hello
    bye
    bye
    

    我读过关于使用 httpx ,但是,我仍然无法实现真正的并行化。怎么了?

    0 回复  |  直到 3 年前
        1
  •  69
  •   Chris Brian    2 年前

    根据 FastAPI's documentation :

    当您用normal声明路径操作函数时 def 相反 属于 async def ,它在外部线程池中运行 那就是 await ed ,而不是直接调用(因为它会阻止 服务器)。

    同样,如前所述 here :

    如果您使用的是与 (数据库、API、文件系统等),并且没有 支持使用 等候 ,(目前大多数人都是这样 数据库库),然后将路径操作函数声明为 正常情况下 def

    如果您的应用程序(不知何故)不必与 任何其他内容并等待其响应,使用 异步def

    如果你只是不知道,使用正常 def

    笔记 :你可以混合 def 异步def 在您的路径操作中,根据您的需要,并使用最佳 选项。FastAPI会用它们做正确的事情。

    无论如何,在上述任何一种情况下,FastAPI 仍然有效 异步 并且速度极快。

    但是,按照上面的步骤,它将能够做一些 性能优化。

    因此 def 端点(在异步编程的上下文中,用 def 被调用 同步的 函数)在外部线程池(即 等候 因此,FastAPI仍然可以工作 异步 ),或者换句话说,服务器处理请求 同时 鉴于 异步def 在中运行的端点 event loop 在主(单个)线程上,即服务器处理请求 按顺序 , 只要没有 await 调用(通常)此类端点/路由内的非阻塞I/O绑定操作,例如 等待 用于(1)通过网络发送来自客户端的数据,(2)要读取的磁盘中文件的内容,(3)要完成的数据库操作等 here ),在这种情况下,服务器将处理请求 同时 / 异步 笔记 同样的概念不仅适用于FastAPI端点,而且适用于 StreamingResponse 's generator function (参见 StreamingResponse 类实现),以及 Background Tasks (参见 BackgroundTask 课堂实施);因此,在阅读完这个答案之后,您应该能够决定是否应该定义一个FastAPI端点, 流式处理响应 的生成器,或具有的后台任务函数 def 异步def

    关键字 等候 (仅在 异步def 函数)将函数控制传递回 事件循环 。换句话说,它暂停了对周围环境的执行 coroutine (即,协程对象是调用 异步def 函数),并且告诉 事件循环 让其他东西运行,直到 等候 ed任务完成。 笔记 这只是因为您可以使用定义自定义函数 异步def 然后 等候 如果该自定义函数包含对 time.sleep() 、CPU绑定任务、非异步I/O库或任何其他与异步Python代码不兼容的阻塞调用。例如,在FastAPI中,当使用 async 方法 UploadFile 例如 await file.read() await file.write() ,FastAPI/Starlette,在幕后,实际上运行 methods of File objects 在外部线程池中(使用 异步 run_in_threadpool() 函数)和 等候 是的;否则,此类方法/操作将阻止 事件循环 。您可以通过查看 implementation of the UploadFile class

    异步代码 async and await is many times summarised as using coroutines 推论 是合作的(或 cooperatively multitasked ),意味着“在任何给定的时间,一个带有协程的程序只运行它的一个协程,而这个正在运行的协程只有在它明确请求挂起时才会挂起它的执行”(请参阅 here here 了解更多关于协同程序的信息)。如中所述 this article :

    具体来说,无论何时执行当前运行的协同程序 达到 等候 表达式,则协同程序可能被挂起,并且 如果 已在上挂起,此后返回了一个值。悬架也可以 当 async for 块从 异步迭代器,或者当 async with 块已输入或 已退出,因为这些操作使用 等候 在引擎盖下面。

    但是,如果在 异步def 函数/端点 堵塞主螺纹 (即 事件循环 )。因此,诸如 time.sleep() 异步def 端点将阻塞整个服务器(如您的问题中提供的示例)。因此,如果您的端点不会 异步 电话,您只需 def 相反,它将在外部线程池中运行 等候 ed,如前所述(更多解决方案将在以下章节中给出)。示例:

    @app.get("/ping")
    def ping(request: Request):
        #print(request.client)
        print("Hello")
        time.sleep(5)
        print("bye")
        return "pong"
    

    否则,如果必须在端点内部执行的函数 异步 您必须执行的功能 等候 ,您应该使用定义端点 异步def 。为了演示这一点,下面的示例使用 asyncio.sleep() 函数(来自 asyncio 库),其提供非阻塞睡眠操作。这个 await asyncio.sleep() 方法将暂停周围协程的执行(直到睡眠操作完成),从而允许事件循环中的其他任务运行。给出了类似的例子 here here

    import asyncio
     
    @app.get("/ping")
    async def ping(request: Request):
        #print(request.client)
        print("Hello")
        await asyncio.sleep(5)
        print("bye")
        return "pong"
    

    二者都 如果两个请求大约在同一时间到达,则上面的路径操作功能将按照问题中提到的相同顺序将指定的消息打印到屏幕上,即:

    Hello
    Hello
    bye
    bye
    

    重要提示

    当您第二次(第三次,依此类推)调用端点时,请记住从 与浏览器主会话隔离的选项卡 ;否则,后续请求(即第一个请求之后的请求)将被浏览器阻止(在 客户端 ),因为在发送下一个请求之前,浏览器将等待服务器对上一个请求的响应。您可以使用确认 print(request.client) 在端点内部,您可以在其中看到 hostname port 所有传入请求的数量相同(如果请求是从同一浏览器窗口/会话中打开的选项卡启动的),因此,这些请求将按顺序处理,因为浏览器首先会按顺序发送这些请求。到 解决 您可以:

    1. 重新加载相同的选项卡(与正在运行的选项卡相同),或者

    2. 在隐身窗口中打开一个新选项卡,或者

    3. 使用其他浏览器/客户端发送请求,或者

    4. 使用 httpx 库到 make asynchronous HTTP requests ,以及 awaitable asyncio.gather() ,它允许同时执行多个异步操作,然后在 相同的 将awaitables(tasks)传递给该函数的顺序(查看 this answer 了解更多详细信息)。

      实例 :

      import httpx
      import asyncio
      
      URLS = ['http://127.0.0.1:8000/ping'] * 2
      
      async def send(url, client):
          return await client.get(url, timeout=10)
      
      async def main():
          async with httpx.AsyncClient() as client:
              tasks = [send(url, client) for url in URLS]
              responses = await asyncio.gather(*tasks)
              print(*[r.json() for r in responses], sep='\n')
      
      asyncio.run(main())
      

      如果您必须调用不同的端点,这些端点可能需要不同的时间来处理请求,并且您希望在从服务器返回响应后立即在客户端打印出来,而不是等待 asyncio.gather() 要收集所有任务的结果并按任务传递给的相同顺序打印出来 send() 您可以替换 send() 上面示例的函数与下面显示的函数:

      async def send(url, client):
          res = await client.get(url, timeout=10)
          print(res.json())
          return res
      

    Async / 等候 以及阻止I/O绑定或CPU绑定操作

    如果需要使用 异步def (你可能需要 等候 对于端点内的协同程序),但也有一些 同步的 阻止I/O绑定或CPU绑定操作(长时间运行的计算任务) 事件循环 (本质上是整个服务器),并且不会让其他请求通过,例如:

    @app.post("/ping")
    async def ping(file: UploadFile = File(...)):
        print("Hello")
        try:
            contents = await file.read()
            res = cpu_bound_task(contents)  # this will block the event loop
        finally:
            await file.close()
        print("bye")
        return "pong"
    

    那么:

    1. 您应该检查是否可以将端点的定义更改为正常 def 而不是 异步def 例如,如果端点中唯一需要等待的方法是读取文件内容的方法(如您在下面的注释部分所述),则可以将端点参数的类型声明为 bytes (即。, file: bytes = File() )因此,FastAPI将为您读取该文件,您将收到如下内容 字节 。因此,无需使用 等待文件.read() 。请注意,上述方法应适用于小文件,因为enitre文件内容将存储在内存中(请参阅 documentation on File Parameters );因此,如果您的系统没有足够的RAM来容纳累积的数据(例如,如果您有8GB的RAM,则无法加载50GB的文件),则您的应用程序可能会崩溃。或者,您可以拨打 .read() 方法 SpooledTemporaryFile 直接(可以通过 .file 的属性 上传文件 对象),这样您就不必 等候 这个 .read() 方法,因为您现在可以用normal声明您的端点 def ,每个请求将在 独立螺纹 (示例如下)。有关如何上传 文件 ,以及Starlette/FastAPI如何使用 后台文件临时文件 在幕后,请看一下 this answer this answer

      @app.post("/ping")
      def ping(file: UploadFile = File(...)):
          print("Hello")
          try:
              contents = file.file.read()
              res = cpu_bound_task(contents)
          finally:
              file.file.close()
          print("bye")
          return "pong"
      
    2. 使用FastAPI(Starlette) run_in_threadpool() 函数 concurrency moduleas@tiangolo建议 here “将在单独的线程中运行函数,以确保主线程(运行协同程序的地方)不会被阻塞”(请参阅 here )。如@tiangolo所述 here ,” run_in_threadpool 是一个不可重写的函数,第一个参数是一个正常函数,下一个参数直接传递给该函数。它同时支持序列参数和关键字参数”。

      from fastapi.concurrency import run_in_threadpool
      
      res = await run_in_threadpool(cpu_bound_task, contents)
      
    3. 或者,使用 异步 loop.run_in_executor() 获得跑步后 事件循环 使用 asyncio.get_running_loop() 运行任务,在这种情况下,您可以 等候 以便它完成并返回结果,然后再转到下一行代码。通过 None 作为 遗嘱执行人 参数,将使用默认的执行器;就是 ThreadPoolExecutor :

      import asyncio
      
      loop = asyncio.get_running_loop()
      res = await loop.run_in_executor(None, cpu_bound_task, contents)
      

      或者,如果你愿意 pass keyword arguments 相反,您可以使用 lambda 表达式(例如。, lambda: cpu_bound_task(some_arg=contents) ),或者,优选地, functools.partial() ,在文档中特别建议 loop.run_in_executor() :

      import asyncio
      from functools import partial
      
      loop = asyncio.get_running_loop()
      res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))
      

      您也可以在自定义中运行任务 线程池 。例如:

      import asyncio
      import concurrent.futures
      
      loop = asyncio.get_running_loop()
      with concurrent.futures.ThreadPoolExecutor() as pool:
          res = await loop.run_in_executor(pool, cpu_bound_task, contents)
      

      在Python 3.9+中,您还可以使用 asyncio.to_thread() 在单独的线程中异步运行同步函数,该线程本质上使用 await loop.run_in_executor(None, func_call) 在引擎盖下,可以在 implementation of asyncio.to_thread() 这个 to_thread() 函数采用要执行的阻塞函数的名称,以及该函数的任何参数(*args和/或**kwargs),然后返回一个可以 等候 ed.示例:

      import asyncio
      
      res = await asyncio.to_thread(cpu_bound_task, contents)
      
    4. 线程池 将成功阻止 事件循环 阻止,但不会给你 性能改进 你对跑步的期望 并行代码 ;尤其是在需要表演的时候 CPU-bound 操作,例如所描述的操作 here (例如,音频或图像处理、机器学习等)。因此,最好 在单独的进程中运行CPU绑定的任务 使用 ProcessPoolExecutor ,如下所示。同样,您可以与 异步 为了 等候 它完成其工作并返回结果。如所述 here 在Windows上,保护代码的主循环以避免子流程的递归生成等非常重要。基本上,您的代码必须 if __name__ == '__main__':

      import concurrent.futures
      
      loop = asyncio.get_running_loop()
      with concurrent.futures.ProcessPoolExecutor() as pool:
          res = await loop.run_in_executor(pool, cpu_bound_task, contents) 
      
    5. 使用 更多 workers 例如 uvicorn main:app --workers 4 (如果您正在使用 Gunicorn as a process manager with Uvicorn workers ,请看一下 this answer )。 注: 每个工人 "has its own things, variables and memory" 。这意味着 global 变量/对象等不会在进程/工作者之间共享。在这种情况下,您应该考虑使用数据库存储或键值存储(缓存),如所述 here here 。此外,请注意 “如果您在代码中消耗了大量内存, 每个过程 将消耗相当数量的内存”

    6. 如果你需要表演 重背景计算 而且您不一定需要它由同一个进程运行(例如,您不需要共享内存、变量等),您可能会从使用其他更大的工具中受益 Celery ,如中所述 FastAPI's documentation

        2
  •  2
  •   halfer    3 年前

    问:
    “…怎么了?”

    A:
    FastAPI文档明确表示框架使用进程内任务(继承自 Starlette )。

    这本身就意味着,所有这些任务都会竞争(不时)接收Python解释器GIL锁,这实际上是一个威胁MUTEX的全局解释器锁- [SERIAL] -ises任意数量的Python解释器进程内线程
    作为 一个和- 只有一个作品 -而其他人都在等着 。。。

    在细粒度上,您可以看到结果——如果为第二个(从第二个FireFox选项卡手动启动)到达的http请求生成另一个处理程序所花费的时间实际上比睡眠所花费的要长,则GIL锁交错的结果 ~ 100 [ms] 时间循环 约100[ms] 在下一轮GIL锁释放之前,获取轮盘赌发生)Python解释器的内部工作没有显示更多细节,您可以使用更多细节(取决于O/S类型或版本) here 查看更多 in-thread LoD,在正在执行的异步修饰代码中是这样的:

    import time
    import threading
    from   fastapi import FastAPI, Request
    
    TEMPLATE = "INF[{0:_>20d}]: t_id( {1: >20d} ):: {2:}"
    
    print( TEMPLATE.format( time.perf_counter_ns(),
                            threading.get_ident(),
                           "Python Interpreter __main__ was started ..."
                            )
    ...
    @app.get("/ping")
    async def ping( request: Request ):
            """                                __doc__
            [DOC-ME]
            ping( Request ):  a mock-up AS-IS function to yield
                              a CLI/GUI self-evidence of the order-of-execution
            RETURNS:          a JSON-alike decorated dict
    
            [TEST-ME]         ...
            """
            print( TEMPLATE.format( time.perf_counter_ns(),
                                    threading.get_ident(),
                                   "Hello..."
                                    )
            #------------------------------------------------- actual blocking work
            time.sleep( 5 )
            #------------------------------------------------- actual blocking work
            print( TEMPLATE.format( time.perf_counter_ns(),
                                    threading.get_ident(),
                                   "...bye"
                                    )
            return { "ping": "pong!" }
    

    最后,但同样重要的是,不要犹豫,阅读更多关于所有 other sharks 基于线程的代码可能会受到。。。甚至导致。。。在窗帘后面。。。

    广告备忘录

    混合了GIL锁、基于线程的池、异步装饰器、阻塞和事件处理——这无疑是不确定性的混合;HWY2HELL;o)