|
|
1
Chris Brian
2 年前
根据
FastAPI's documentation
:
当您用normal声明路径操作函数时
def
相反
属于
async def
,它在外部线程池中运行
那就是
await
ed
,而不是直接调用(因为它会阻止
服务器)。
同样,如前所述
here
:
如果您使用的是与
(数据库、API、文件系统等),并且没有
支持使用
等候
,(目前大多数人都是这样
数据库库),然后将路径操作函数声明为
正常情况下
def
。
如果您的应用程序(不知何故)不必与
任何其他内容并等待其响应,使用
异步def
。
如果你只是不知道,使用正常
def
。
笔记
:你可以混合
def
和
异步def
在您的路径操作中,根据您的需要,并使用最佳
选项。FastAPI会用它们做正确的事情。
无论如何,在上述任何一种情况下,FastAPI
仍然有效
异步
并且速度极快。
但是,按照上面的步骤,它将能够做一些
性能优化。
因此
def
端点(在异步编程的上下文中,用
def
被调用
同步的
函数)在外部线程池(即
等候
因此,FastAPI仍然可以工作
异步
),或者换句话说,服务器处理请求
同时
鉴于
异步def
在中运行的端点
event loop
在主(单个)线程上,即服务器处理请求
按顺序
,
只要没有
await
调用(通常)此类端点/路由内的非阻塞I/O绑定操作,例如
等待
用于(1)通过网络发送来自客户端的数据,(2)要读取的磁盘中文件的内容,(3)要完成的数据库操作等
here
),在这种情况下,服务器将处理请求
同时
/
异步
。
笔记
同样的概念不仅适用于FastAPI端点,而且适用于
StreamingResponse
's generator function
(参见
StreamingResponse
类实现),以及
Background Tasks
(参见
BackgroundTask
课堂实施);因此,在阅读完这个答案之后,您应该能够决定是否应该定义一个FastAPI端点,
流式处理响应
的生成器,或具有的后台任务函数
def
或
异步def
。
关键字
等候
(仅在
异步def
函数)将函数控制传递回
事件循环
。换句话说,它暂停了对周围环境的执行
coroutine
(即,协程对象是调用
异步def
函数),并且告诉
事件循环
让其他东西运行,直到
等候
ed任务完成。
笔记
这只是因为您可以使用定义自定义函数
异步def
然后
等候
如果该自定义函数包含对
time.sleep()
、CPU绑定任务、非异步I/O库或任何其他与异步Python代码不兼容的阻塞调用。例如,在FastAPI中,当使用
async
方法
UploadFile
例如
await file.read()
和
await file.write()
,FastAPI/Starlette,在幕后,实际上运行
methods of File objects
在外部线程池中(使用
异步
run_in_threadpool()
函数)和
等候
是的;否则,此类方法/操作将阻止
事件循环
。您可以通过查看
implementation of the
UploadFile
class
。
异步代码
async
and
await
is many times summarised as using coroutines
。
推论
是合作的(或
cooperatively multitasked
),意味着“在任何给定的时间,一个带有协程的程序只运行它的一个协程,而这个正在运行的协程只有在它明确请求挂起时才会挂起它的执行”(请参阅
here
和
here
了解更多关于协同程序的信息)。如中所述
this article
:
具体来说,无论何时执行当前运行的协同程序
达到
等候
表达式,则协同程序可能被挂起,并且
如果
已在上挂起,此后返回了一个值。悬架也可以
当
async for
块从
异步迭代器,或者当
async with
块已输入或
已退出,因为这些操作使用
等候
在引擎盖下面。
但是,如果在
异步def
函数/端点
堵塞主螺纹
(即
事件循环
)。因此,诸如
time.sleep()
在
异步def
端点将阻塞整个服务器(如您的问题中提供的示例)。因此,如果您的端点不会
异步
电话,您只需
def
相反,它将在外部线程池中运行
等候
ed,如前所述(更多解决方案将在以下章节中给出)。示例:
@app.get("/ping")
def ping(request: Request):
#print(request.client)
print("Hello")
time.sleep(5)
print("bye")
return "pong"
否则,如果必须在端点内部执行的函数
异步
您必须执行的功能
等候
,您应该使用定义端点
异步def
。为了演示这一点,下面的示例使用
asyncio.sleep()
函数(来自
asyncio
库),其提供非阻塞睡眠操作。这个
await asyncio.sleep()
方法将暂停周围协程的执行(直到睡眠操作完成),从而允许事件循环中的其他任务运行。给出了类似的例子
here
和
here
也
import asyncio
@app.get("/ping")
async def ping(request: Request):
#print(request.client)
print("Hello")
await asyncio.sleep(5)
print("bye")
return "pong"
二者都
如果两个请求大约在同一时间到达,则上面的路径操作功能将按照问题中提到的相同顺序将指定的消息打印到屏幕上,即:
Hello
Hello
bye
bye
重要提示
当您第二次(第三次,依此类推)调用端点时,请记住从
与浏览器主会话隔离的选项卡
;否则,后续请求(即第一个请求之后的请求)将被浏览器阻止(在
客户端
),因为在发送下一个请求之前,浏览器将等待服务器对上一个请求的响应。您可以使用确认
print(request.client)
在端点内部,您可以在其中看到
hostname
和
port
所有传入请求的数量相同(如果请求是从同一浏览器窗口/会话中打开的选项卡启动的),因此,这些请求将按顺序处理,因为浏览器首先会按顺序发送这些请求。到
解决
您可以:
-
重新加载相同的选项卡(与正在运行的选项卡相同),或者
-
在隐身窗口中打开一个新选项卡,或者
-
使用其他浏览器/客户端发送请求,或者
-
使用
httpx
库到
make asynchronous HTTP requests
,以及
awaitable
asyncio.gather()
,它允许同时执行多个异步操作,然后在
相同的
将awaitables(tasks)传递给该函数的顺序(查看
this answer
了解更多详细信息)。
实例
:
import httpx
import asyncio
URLS = ['http://127.0.0.1:8000/ping'] * 2
async def send(url, client):
return await client.get(url, timeout=10)
async def main():
async with httpx.AsyncClient() as client:
tasks = [send(url, client) for url in URLS]
responses = await asyncio.gather(*tasks)
print(*[r.json() for r in responses], sep='\n')
asyncio.run(main())
如果您必须调用不同的端点,这些端点可能需要不同的时间来处理请求,并且您希望在从服务器返回响应后立即在客户端打印出来,而不是等待
asyncio.gather()
要收集所有任务的结果并按任务传递给的相同顺序打印出来
send()
您可以替换
send()
上面示例的函数与下面显示的函数:
async def send(url, client):
res = await client.get(url, timeout=10)
print(res.json())
return res
Async
/
等候
以及阻止I/O绑定或CPU绑定操作
如果需要使用
异步def
(你可能需要
等候
对于端点内的协同程序),但也有一些
同步的
阻止I/O绑定或CPU绑定操作(长时间运行的计算任务)
事件循环
(本质上是整个服务器),并且不会让其他请求通过,例如:
@app.post("/ping")
async def ping(file: UploadFile = File(...)):
print("Hello")
try:
contents = await file.read()
res = cpu_bound_task(contents) # this will block the event loop
finally:
await file.close()
print("bye")
return "pong"
那么:
-
您应该检查是否可以将端点的定义更改为正常
def
而不是
异步def
例如,如果端点中唯一需要等待的方法是读取文件内容的方法(如您在下面的注释部分所述),则可以将端点参数的类型声明为
bytes
(即。,
file: bytes = File()
)因此,FastAPI将为您读取该文件,您将收到如下内容
字节
。因此,无需使用
等待文件.read()
。请注意,上述方法应适用于小文件,因为enitre文件内容将存储在内存中(请参阅
documentation on
File
Parameters
);因此,如果您的系统没有足够的RAM来容纳累积的数据(例如,如果您有8GB的RAM,则无法加载50GB的文件),则您的应用程序可能会崩溃。或者,您可以拨打
.read()
方法
SpooledTemporaryFile
直接(可以通过
.file
的属性
上传文件
对象),这样您就不必
等候
这个
.read()
方法,因为您现在可以用normal声明您的端点
def
,每个请求将在
独立螺纹
(示例如下)。有关如何上传
文件
,以及Starlette/FastAPI如何使用
后台文件临时文件
在幕后,请看一下
this answer
和
this answer
。
@app.post("/ping")
def ping(file: UploadFile = File(...)):
print("Hello")
try:
contents = file.file.read()
res = cpu_bound_task(contents)
finally:
file.file.close()
print("bye")
return "pong"
-
使用FastAPI(Starlette)
run_in_threadpool()
函数
concurrency
moduleas@tiangolo建议
here
“将在单独的线程中运行函数,以确保主线程(运行协同程序的地方)不会被阻塞”(请参阅
here
)。如@tiangolo所述
here
,”
run_in_threadpool
是一个不可重写的函数,第一个参数是一个正常函数,下一个参数直接传递给该函数。它同时支持序列参数和关键字参数”。
from fastapi.concurrency import run_in_threadpool
res = await run_in_threadpool(cpu_bound_task, contents)
-
或者,使用
异步
的
loop.run_in_executor()
获得跑步后
事件循环
使用
asyncio.get_running_loop()
运行任务,在这种情况下,您可以
等候
以便它完成并返回结果,然后再转到下一行代码。通过
None
作为
遗嘱执行人
参数,将使用默认的执行器;就是
ThreadPoolExecutor
:
import asyncio
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, cpu_bound_task, contents)
或者,如果你愿意
pass keyword arguments
相反,您可以使用
lambda
表达式(例如。,
lambda: cpu_bound_task(some_arg=contents)
),或者,优选地,
functools.partial()
,在文档中特别建议
loop.run_in_executor()
:
import asyncio
from functools import partial
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))
您也可以在自定义中运行任务
线程池
。例如:
import asyncio
import concurrent.futures
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
res = await loop.run_in_executor(pool, cpu_bound_task, contents)
在Python 3.9+中,您还可以使用
asyncio.to_thread()
在单独的线程中异步运行同步函数,该线程本质上使用
await loop.run_in_executor(None, func_call)
在引擎盖下,可以在
implementation of
asyncio.to_thread()
这个
to_thread()
函数采用要执行的阻塞函数的名称,以及该函数的任何参数(*args和/或**kwargs),然后返回一个可以
等候
ed.示例:
import asyncio
res = await asyncio.to_thread(cpu_bound_task, contents)
-
线程池
将成功阻止
事件循环
阻止,但不会给你
性能改进
你对跑步的期望
并行代码
;尤其是在需要表演的时候
CPU-bound
操作,例如所描述的操作
here
(例如,音频或图像处理、机器学习等)。因此,最好
在单独的进程中运行CPU绑定的任务
使用
ProcessPoolExecutor
,如下所示。同样,您可以与
异步
为了
等候
它完成其工作并返回结果。如所述
here
在Windows上,保护代码的主循环以避免子流程的递归生成等非常重要。基本上,您的代码必须
if __name__ == '__main__':
。
import concurrent.futures
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
res = await loop.run_in_executor(pool, cpu_bound_task, contents)
-
使用
更多
workers
例如
uvicorn main:app --workers 4
(如果您正在使用
Gunicorn as a process manager with Uvicorn workers
,请看一下
this answer
)。
注:
每个工人
"has its own things, variables and memory"
。这意味着
global
变量/对象等不会在进程/工作者之间共享。在这种情况下,您应该考虑使用数据库存储或键值存储(缓存),如所述
here
和
here
。此外,请注意
“如果您在代码中消耗了大量内存,
每个过程
将消耗相当数量的内存”
。
-
如果你需要表演
重背景计算
而且您不一定需要它由同一个进程运行(例如,您不需要共享内存、变量等),您可能会从使用其他更大的工具中受益
Celery
,如中所述
FastAPI's documentation
。
|