代码之家  ›  专栏  ›  技术社区  ›  buhtz

在启动所有任务之前恢复异步任务

  •  0
  • buhtz  · 技术社区  · 7 年前

    在这里的示例代码中,首先启动所有异步IO任务。之后,如果IO操作完成,任务将继续。

    输出如下所示,您可以在其中看到6条结果消息 前6条开始消息。

    -- Starting https://jamanetwork.com/rss/site_3/67.xml...
    -- Starting https://www.b-i-t-online.de/bitrss.xml...
    -- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
    -- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
    -- Starting https://jamanetwork.com/rss/site_3/67.xml...
    -- Starting https://www.b-i-t-online.de/bitrss.xml...
    28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
    28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
    1938204 size for https://www.b-i-t-online.de/bitrss.xml
    1938204 size for https://www.b-i-t-online.de/bitrss.xml
    38697 size for https://jamanetwork.com/rss/site_3/67.xml
    38697 size for https://jamanetwork.com/rss/site_3/67.xml
    FINISHED with 6 results from 6 tasks.
    

    但在我的情况下,我所期望的,以及能加速这件事的,是这样的

    -- Starting https://jamanetwork.com/rss/site_3/67.xml...
    -- Starting https://www.b-i-t-online.de/bitrss.xml...
    -- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
    1938204 size for https://www.b-i-t-online.de/bitrss.xml
    -- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
    28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
    28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
    -- Starting https://jamanetwork.com/rss/site_3/67.xml...
    38697 size for https://jamanetwork.com/rss/site_3/67.xml
    -- Starting https://www.b-i-t-online.de/bitrss.xml...
    28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
    28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
    1938204 size for https://www.b-i-t-online.de/bitrss.xml
    38697 size for https://jamanetwork.com/rss/site_3/67.xml
    FINISHED with 6 results from 6 tasks.
    

    在我的真实代码中,我有数百个这样的下载任务。通常情况下,一些下载在所有下载开始之前就完成了。

    asyncio

    下面是一个简单的工作示例:

    #!/usr/bin/env python3
    import random
    import urllib.request
    import asyncio
    from concurrent.futures import ThreadPoolExecutor
    
    executor = ThreadPoolExecutor()
    loop = asyncio.get_event_loop()
    urls = ['https://www.b-i-t-online.de/bitrss.xml',
            'https://jamanetwork.com/rss/site_3/67.xml',
            'http://twitrss.me/twitter_user_to_rss/?user=cochranecollab']
    
    async def parse_one_url(u):
        print('-- Starting {}...'.format(u))
        r = await loop.run_in_executor(executor,
                                       urllib.request.urlopen, u)
        r = '{} size for {}'.format(len(r.read()), u)
        print(r)
    
    async def do_async_parsing():
        tasks = [
            parse_one_url(u)
            for u in urls
                ]
    
        completed, pending = await asyncio.wait(tasks)
        results = [task.result() for task in completed]
    
        print('FINISHED with {} results from {} tasks.'
              .format(len(results), len(tasks)))
    
    if __name__ == '__main__':
        # blow up the urls
        urls = urls * 2
        random.shuffle(urls)
        try:
            #loop.set_debug(True)
            loop.run_until_complete(do_async_parsing())
        finally:
            loop.close()
    

    附带问题 :不是吗 对我来说没用?只使用多线程不是更容易吗?

    0 回复  |  直到 7 年前
        1
  •  1
  •   user4815162342    7 年前

    在我的真实代码中,我有数百个这样的下载任务。通常情况下,一些下载在所有下载开始之前就完成了。

    asyncio.wait . 刚开始执行一个协同程序几乎是免费的,所以这一部分没有理由受到任何限制。但是,实际提交给的任务 ThreadPoolExecutor 上限为池中的工作线程数,默认值为CPU数的5倍,但可配置。如果URL的数量超过工作进程的数量,则应该获得所需的行为。(但要实际观察它,您需要将日志打印移动到由执行器管理的函数中。)

    请注意,同步调用 r.read() 还必须驻留在执行器运行的函数内,否则它将阻塞整个事件循环。代码的更正部分如下所示:

    def urlopen(u):
        print('-- Starting {}...'.format(u))
        r = urllib.request.urlopen(u)  # blocking call
        content = r.read()             # another blocking call
        print('{} size for {}'.format(len(content), u))
    
    async def parse_one_url(u):
        await loop.run_in_executor(executor, urlopen, u)
    

    aiohttp . 然后,您将获得asyncio的好处,例如工作取消和可扩展到大量任务。在这种设置中,您可以通过将检索简单地包装到 asyncio.Semaphore .

    concurrent.futures 同步功能如下 wait() as_completed 等待他们完成。