代码之家  ›  专栏  ›  技术社区  ›  Tristo

与其他线程或进程共享asyncio.Queue

  •  1
  • Tristo  · 技术社区  · 6 年前

    processing_frame ).

    这就是所谓的方法 analyze_frame )从共享列表中获取一个项 asyncio.Queue()

    import cv2
    import datetime
    import argparse
    import os
    import asyncio
    
    #   Making CLI
    if not os.path.exists("frames"):
        os.makedirs("frames")
    
    t0 = datetime.datetime.now()
    ap = argparse.ArgumentParser()
    ap.add_argument("-v", "--video", required=True,
                    help="path to our file")
    args = vars(ap.parse_args())
    
    threshold = .2
    death_count = 0
    was_found = False
    template = cv2.imread('youdied.png')
    vidcap = cv2.VideoCapture(args["video"])
    
    loop = asyncio.get_event_loop()
    frames_to_analyze = asyncio.Queue()
    
    
    def main():
        length = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
        tasks = []
        for _ in range(int(length / 50)):
            tasks.append(loop.create_task(read_frame(50, frames_to_analyze)))
            tasks.append(loop.create_task(analyze_frame(threshold, template, frames_to_analyze)))
        final_task = asyncio.gather(*tasks)
        loop.run_until_complete(final_task)
    
        dt = datetime.datetime.now() - t0
        print("App exiting, total time: {:,.2f} sec.".format(dt.total_seconds()))
    
        print(f"Deaths registered: {death_count}")
    
    
    async def read_frame(frames, frames_to_analyze):
        global vidcap
        for _ in range(frames-1):
            vidcap.grab()
    
        else:
            current_frame = vidcap.read()[1]
        print("Read 50 frames")
        await frames_to_analyze.put(current_frame)
    
    
    async def analyze_frame(threshold, template, frames_to_analyze):
        global vidcap
        global was_found
        global death_count
        frame = await frames_to_analyze.get()
        is_found = processing_frame(frame)
        if was_found and not is_found:
            death_count += 1
            await writing_to_file(death_count, frame)
        was_found = is_found
    
    
    def processing_frame(frame):
        res = cv2.matchTemplate(frame, template, cv2.TM_CCOEFF_NORMED)
        max_val = cv2.minMaxLoc(res)[1]
        is_found = max_val >= threshold
        print(is_found)
        return is_found
    
    
    async def writing_to_file(death_count, frame):
        cv2.imwrite(f"frames/frame{death_count}.jpg", frame)
    
    if __name__ == '__main__':
        main()
    

    我试过使用 unsync 但没有多大成功
    我会得到一些类似的东西

    使用self.\r锁定:
    PermissionError:[WinError 5]访问被拒绝

    1 回复  |  直到 6 年前
        1
  •  1
  •   user4815162342    6 年前

    如果 processing_frame 是一个阻塞函数,您应该使用 await loop.run_in_executor(None, processing_frame, frame) . 这将把函数提交到线程池,并允许事件循环继续执行其他操作,直到调用函数完成。

    这同样适用于诸如 cv2.imwrite writing_to_file async def . 这是因为它不等待任何东西,所以一旦开始执行,它将继续执行到最后,而不会暂停。在这种情况下,我们可以首先把它作为一个正常的函数,使它清楚地显示正在发生的事情。