代码之家  ›  专栏  ›  技术社区  ›  Rene2322

在多处理或多线程应用程序中保留cpu时间

  •  2
  • Rene2322  · 技术社区  · 7 年前

    我正在进行一个项目,使用树莓Pi 3进行一些环境控制,在一个连续循环中有许多简单的重复事件。RP3对于这份工作来说太过胜任了,但它让我更专注于其他事情。

    应用程序的特点:

    1. 应用程序应从十几个传感器(温度、湿度、pH值、ORP等)收集传感器数据(可变间隔n秒)。
    2. 控制器根据时间和这些传感器数据计算输出(开关、阀门和PWM驱动器)。
    3. 几乎没有事件需要按顺序运行。
    4. 某些事件属于“安全”类,触发时应立即运行(故障安全传感器、紧急按钮)。
    5. 大多数事件以秒为间隔重复运行(每秒,直到每30秒)。
    6. 某些事件会触发动作,在1到120秒内激活继电器。
    7. 有些事件使用基于时间的值。该值需要一天计算几次,并且相当占用CPU(使用一些间接的插值公式,因此具有可变的运行时间)。
    8. 显示环境状态(连续循环)

    我对VB很熟悉(不是专业)。NET,但决定在Python 3.6中执行此项目。 在过去的几个月里,我读了很多关于设计模式、线程、进程、事件、并行处理等主题的书。

    根据我的阅读,我认为Asyncio与Executor中的一些任务相结合可以完成这项工作。

    大多数任务/事件都不是时间关键型的。控制器输出可以使用“最新”传感器数据。 另一方面,有些任务在一定时间内激活继电器。我想知道如何对这些任务进行编程,而不让另一个“耗时”任务在一段时间内阻塞处理器(例如,CO2阀打开)。这对我的环境可能是灾难性的。

    因此,我需要一些建议。

    请参阅下面的代码。我不确定在Python中是否正确使用了Asyncio函数。 为了可读性,我将把各种任务的内容存储在单独的模块中。

    import asyncio
    import concurrent.futures
    import datetime
    import time
    import random
    import math
    
    # define a task...
    async def firstTask():
        while True:
            await asyncio.sleep(1)
            print("First task executed")
    
    # define another task...        
    async def secondTask():
        while True:
            await asyncio.sleep(5)
            print("Second Worker Executed")
    
    # define/simulate heavy CPU-bound task
    def heavy_load():
        while True:
            print('Heavy_load started')
            i = 0
            for i in range(50000000):
                f = math.sqrt(i)*math.sqrt(i)
            print('Heavy_load finished')
            time.sleep(4)
    
    def main():
    
        # Create a process pool (for CPU bound tasks).
        processpool = concurrent.futures.ProcessPoolExecutor()
    
        #  Create a thread pool (for I/O bound tasks).
        threadpool = concurrent.futures.ThreadPoolExecutor()
    
        loop = asyncio.get_event_loop()
    
        try:
            # Add all tasks. (Correct use?)
            asyncio.ensure_future(firstTask())
            asyncio.ensure_future(secondTask())
            loop.run_in_executor(processpool, heavy_load)
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            print("Loop will be ended")
            loop.close()    
    
    if __name__ == '__main__':
        main()
    
    2 回复  |  直到 7 年前
        1
  •  1
  •   user4815162342    7 年前

    大多数任务/事件都不是时间关键型的。控制器输出可以使用“最新”传感器数据。另一方面,有些任务在一定时间内激活继电器。我想知道如何对这些任务进行编程,而不让另一个“耗时”任务在一段时间内阻塞处理器(例如,CO2阀打开)。这对我的环境可能是灾难性的。

    请允许我强调,Python是 不是实时语言 ,异步IO不是实时组件。它们既不支持实时执行的基础设施(Python是垃圾收集的,通常在分时系统上运行),也没有在这样的环境中进行实际测试。因此,我强烈建议不要在任何可能出现失误的情况下使用它们 对您的环境造成灾难性影响 .

    这样一来,您的代码就有了一个问题:当 heavy_load 计算不会阻塞事件循环,也不会完成,也不会提供有关其进度的信息。背后的想法 run_in_executor 您正在运行的计算最终将停止,并且事件循环希望得到有关它的通知。的惯用用法 run\u in\u执行器 可能是这样的:

    def do_heavy_calc(param):
        print('Heavy_load started')
        f = 0
        for i in range(50000000):
            f += math.sqrt(i)*math.sqrt(i)
        return f
    
    def heavy_calc(param):
        loop = asyncio.get_event_loop()
        return loop.run_in_executor(processpool, do_heavy_calc)
    

    表达式 heavy_calc(...) 不仅在不阻塞事件循环的情况下运行,而且 等待中的 . 这意味着异步代码可以等待其结果,也不会阻塞其他协同路由:

    async def sum_params(p1, p2):
        s1 = await heavy_calc(p1)
        s2 = await heavy_calc(p2)
        return s1 + s2
    

    上面一个接一个地运行两个计算。也可以并行进行:

    async def sum_params_parallel(p1, p2):
        s1, s2 = await asyncio.gather(heavy_calc(p1), heavy_calc(p2))
        return s1 + s2
    

    另一个可以改进的是设置代码:

    asyncio.ensure_future(firstTask())
    asyncio.ensure_future(secondTask())
    loop.run_in_executor(processpool, heavy_load)
    loop.run_forever()
    

    使命感 asyncio.ensure_future 然后,从不等待结果有点像asyncio反模式。未等待的任务引发的异常会被默默地吞噬,这几乎肯定不是您想要的。有时人们只是忘记了写作 await ,这就是asyncio在循环被破坏时抱怨未等待的挂起任务的原因。

    很好的编码实践是安排等待的每个任务 某人 ,或立即使用 等候 gather 将其与其他任务相结合,或稍后再进行。例如,如果任务需要在后台运行,可以将其存储在某个位置 等候 或者在应用程序生命周期结束时取消它。如果是你,我会结合 聚集 具有 loop.run_until_complete :

    everything = asyncio.gather(firstTask(), secondTask(),
                               loop.run_in_executor(processpool, heavy_load))
    loop.run_until_complete(everything)
    
        2
  •  0
  •   bazza    7 年前

    某些事件属于“安全”类,触发时应立即运行(故障安全传感器、紧急按钮)。

    那么,我强烈建议您不要依赖软件来实现此功能。切断电源的紧急停止按钮通常是这样做的。如果你有这样的软件,并且你真的要处理一个威胁生命的情况,那么你将面临一大堆麻烦——几乎可以肯定,你必须遵守大量的法规。