代码之家  ›  专栏  ›  技术社区  ›  YounghunJo

关于使用asyncio和wait/async-def关键字进行python异步编程的问题

  •  2
  • YounghunJo  · 技术社区  · 4 周前

    我目前正在通过查看示例代码来研究Python中的同步编程和使用asyncio的异步编程之间的区别。在这样做的时候,我有一个问题。

    由于我目前的工作是一名机器学习工程师,我使用为机器学习服务的服务器逻辑的类比来编写示例代码。以下是我想描述的场景:

    首先,存在一个称为预处理的函数,该函数必须始终首先执行。这是因为它为后面的两个函数(predict和apply_business_logic)预处理输入数据。

    其次,预处理函数的输出将用作predict和apply_business_logic函数的输入。预测函数简单地生成根据机器学习模型计算的预测,而apply_business_logic函数应用任意的业务逻辑。关键是这两个函数相互独立,使得异步执行比同步执行更有效率。

    最后,该逻辑包括计算预测和apply_business_logic函数输出的加权平均值,以确定最终结果,并返回该结果。

    如果我们假设逻辑结构为如上所述的三个步骤,那么同步执行它是非常简单的。

    1.同步运行

    import time
    
    
    def preprocess(x: list[int]):
        """
        dummy preprocess
        """
        return x
    
    
    def predict(x: list[int]) -> float:
        time.sleep(3)
        pred = sum(x) / len(x)
        print(">> finish `predict` function!")
        return pred
    
    
    def apply_business_logic(x: list[int]) -> int:
        res = sum(x) ** 2
        print(">> finish `apply_business_logic` function!")
        return res
    
    
    def main(x: list[int]) -> float:
        # sync function because it must be done before running `predict` and `apply_business_logic` function
        x = preprocess(x)
    
        # asynchronously running(..?)
        pred_result = predict(x)
        logic_result = apply_business_logic(x)
    
        # final response
        alpha = 0.7
        response = (alpha * pred_result) + ((1 - alpha) * logic_result)
        return response
    
    if __name__ == "__main__":
        x = list(range(100))
    
        res = main(x)
        print('>> final result:', res)
    

    2.异步运行(1)

    我无法确保以下代码将异步运行。。

    import asyncio
    
    
    def preprocess(x: list[int]):
        """
        dummy preprocess
        """
        return x
    
    
    async def predict(x: list[int]) -> float:
        await asyncio.sleep(delay=3)
        pred = sum(x) / len(x)
        print(">> finish `predict` function!")
        return pred
    
    
    async def apply_business_logic(x: list[int]) -> int:
        res = sum(x) ** 2
        print(">> finish `apply_business_logic` function!")
        return res
    
    
    async def main(x: list[int]) -> float:
        # sync function because it must be done before running `predict` and `apply_business_logic` function
        x = preprocess(x)
    
        # asynchronously running(..?)
        pred_result = await predict(x)
        logic_result = await apply_business_logic(x)
    
        # final response
        alpha = 0.7
        response = (alpha * pred_result) + ((1 - alpha) * logic_result)
        return response
    
    if __name__ == "__main__":
        x = list(range(100))
    
        loop = asyncio.get_event_loop()
        res = loop.run_until_complete(main(x))
        loop.close()
        print('>> final result:', res)
    

    3.异步运行(2)

    这段代码必须异步运行,我检查了它。

    import asyncio
    
    def preprocess(x: list[int]):
        """
        dummy preprocess
        """
        return x
    
    
    async def predict(x: list[int]) -> float:
        await asyncio.sleep(delay=3)  
        pred = sum(x) / len(x)
        print(">> finish `predict` function!")
        return pred
    
    
    async def apply_business_logic(x: list[int]) -> int:
        res = sum(x) ** 2
        print(">> finish `apply_business_logic` function!")
        return res
    
    
    async def main(x: list[int]) -> float:
        # sync function because it must be done before running `predict` and `apply_business_logic` function
        x = preprocess(x)
    
        tasks = await asyncio.gather(
            predict(x),
            apply_business_logic(x)
        )
        pred_result, logic_result = tasks[0], tasks[1]
    
        # final response
        alpha = 0.7
        response = (alpha * pred_result) + ((1-alpha) * logic_result)
        return response
    
    if __name__ == "__main__":
        x = list(range(100))
        resp = asyncio.run(main(x))
        print("resp >>", resp)
    

    我不知道这两种方法之间的区别( 1.synchronous running 2.asynchronous running(1) ). 尤其是第二种方法( 2.异步运行(1) )是第一次打印 print(">> finish 预测 function!") 尽管使用异步语法( await, async def 关键字..)。所以,第二种方法不是异步的??

    1 回复  |  直到 4 周前
        1
  •  2
  •   AKX    4 周前

    回答我的意见: async 不会使同步Python函数自动异步; 异步 真的是一种合作多任务处理的形式。

    下面是一个简化的例子 f 执行异步工作(以 asyncio.sleep f2 进行同步工作(以 time.sleep ).

    import asyncio
    import time
    
    
    async def f(name: str) -> None:
        for x in range(3):
            print(f"{name} will sleep")
            await asyncio.sleep(0.5)
            print(f"{name} woke up!")
    
    
    async def f2(name: str) -> None:
        for x in range(3):
            print(f"{name} will sleep (2)")
            time.sleep(0.5)
            print(f"{name} woke up (2)!")
    
    
    async def main() -> float:
        await asyncio.gather(
            f("Hernekeitto"),
            f("Viina"),
            f("Teline"),
        )
        print("----")
        await asyncio.gather(
            f2("Johannes"),
            f2("Appelsiini"),
            f2("Kuutio"),
        )
    
    
    if __name__ == "__main__":
        asyncio.run(main())
    

    这会打印出以下内容:

    Hernekeitto will sleep
    Viina will sleep
    Teline will sleep
    Hernekeitto woke up!
    Hernekeitto will sleep
    Viina woke up!
    Viina will sleep
    Teline woke up!
    Teline will sleep
    Hernekeitto woke up!
    Hernekeitto will sleep
    Viina woke up!
    Viina will sleep
    Teline woke up!
    Teline will sleep
    Hernekeitto woke up!
    Viina woke up!
    Teline woke up!
    ----
    Johannes will sleep (2)
    Johannes woke up (2)!
    Johannes will sleep (2)
    Johannes woke up (2)!
    Johannes will sleep (2)
    Johannes woke up (2)!
    Appelsiini will sleep (2)
    Appelsiini woke up (2)!
    Appelsiini will sleep (2)
    Appelsiini woke up (2)!
    Appelsiini will sleep (2)
    Appelsiini woke up (2)!
    Kuutio will sleep (2)
    Kuutio woke up (2)!
    Kuutio will sleep (2)
    Kuutio woke up (2)!
    Kuutio will sleep (2)
    Kuutio woke up (2)!
    

    正如您所看到的,在第一种情况下,三个异步函数调用能够并行地完成它们的工作(但总有一种模式 woke up! will sleep ,因为没有 await 在两个打印之间(当它们循环运行时)。

    在第二种情况下,函数是串行运行的,因为没有 等候 在里面 f2 这将允许其他异步协同程序运行。

    如评论中所述,您可以使用 loop.run_in_executor() 在中运行同步函数 concurrent.futures.Executor (通常为 ThreadPoolExecutor ),所以它在不同的线程(或进程)中运行,结果可能是 等候 预计起飞时间。