我正在尝试实现一个通过FastAPI提供服务的GraphQL应用程序。graphql查询模式混合了
async
和
sync
解析器,因为我正试图远离GraphQL2。尝试使用fastAPI、starlete-graphen3、石墨烯和graphql内核。当我尝试运行以下查询时:
query {{
thread(id: "123") {{
id
}}
}}
我会遇到一些奇怪的错误,比如:
TypeError: Cannot return null for non-nullable field Thread.id.
。此外,当远程调试应用程序时,调试器从不进入解析程序函数。如果我通过尝试将一些异步解析器切换到同步来进行测试,那么解析器似乎会出现故障,从而导致其他奇怪的错误。
class Query(graphene.ObjectType):
me = graphene.Field(User)
thread = graphene.Field(Thread, thread_id=graphene.String(required=True, name="id"))
...
@staticmethod
async def resolve_thread(parent, info: GrapheneInfo, thread_id):
return await info.context.thread_loader.load(thread_id)
@staticmethod
def resolve_some_other_field(parent, info: GrapheneInfo, field_id):
...
...
这个
Thread
对象
class Thread(graphene.ObjectType):
id = graphene.ID(required=True)
topic = graphene.String(required=True)
post = graphene.Field(lambda: Post, required=True)
@staticmethod
async def resolve_thread(thread: ThreadModel, info: GrapheneInfo, **kwargs):
return await info.context.post_loader.load(thread.post_id)
...
这个
ThreadLoader
班
from typing import Optional
from aiodataloader import DataLoader
...
class ThreadLoader(DataLoader[str, Optional[ThreadModel]]):
thread_service: ThreadService
def __init__(self, thread_service: ThreadService):
super(ThreadLoader, self).__init__()
self.thread_service = thread_service
async def batch_load_fn(self, keys: list[str]) -> list[Optional[ThreadModel]]:
# Thread service function is not async
threads = self.thread_service.get_threads(keys)
这个
graphqlApp
初始化:
def schema():
# Set up the schema to our root level queries and mutators
return Schema(
query=Query,
types=[],
)
...
def setup_graph_ql_app(get_context: Callable[[], Context]) -> GraphQLApp:
return GraphQLApp(
schema=schema(),
on_get=get_graphiql_handler(),
context_value=_get_context_callable(get_context),
middleware=[
...
],
)
我将此添加到我的fastAPI应用程序中,如下所示:
fast_api_app = FastAPI()
graphql_app: GraphQLApp = setup_graph_ql_app(get_context=context_callable)
app.add_route("/v1/graphql", graphql_app)
我定义的依赖关系是:
graphene = "^3.2"
starlette-graphene3 = "^0.6.0"
graphql-core = "^3.2.0"
fastapi = "^0.109.0"
uvloop = "^0.19.0"
asyncio = "^3.4.3"
aiodataloader = "^0.4.0"
查看此处的文档:
https://docs.graphene-python.org/_/downloads/sqlalchemy/en/latest/pdf/
和
https://docs.graphene-python.org/en/latest/execution/dataloader/#dataloader
似乎与上面的类相匹配。是我遗漏了什么,还是我做得不对?