代码之家  ›  专栏  ›  技术社区  ›  SIM

如何在线程池中延迟执行?

  •  1
  • SIM  · 技术社区  · 7 年前

    我用python编写了一个脚本 multiprocessing.pool.ThreadPool 同时处理多个请求,并执行可靠的抓取过程。解析器正在完美地完成它的工作。

    正如我在几个脚本中注意到的,当使用 多重处理 ,我也希望在下面的脚本中延迟。

    然而,这正是我陷入困境的地方,我找不到合适的位置来拖延时间。

    这是迄今为止我的剧本:

    import requests
    from urllib.parse import urljoin
    from bs4 import BeautifulSoup
    from multiprocessing.pool import ThreadPool
    
    url = "http://srar.com/roster/index.php?agent_search=a"
    
    def get_links(link):
        completelinks = []
        res = requests.get(link)  
        soup = BeautifulSoup(res.text,'lxml')
        for items in soup.select("table.border tr"):
            if not items.select("td a[href^='index.php?agent']"):continue
            data = [urljoin(link,item.get("href")) for item in items.select("td a[href^='index.php?agent']")]
            completelinks.extend(data)
        return completelinks
    
    def get_info(nlink):
        req = requests.get(nlink)
        sauce = BeautifulSoup(req.text,"lxml")
        for tr in sauce.select("table[style$='1px;'] tr")[1:]:
            table = [td.get_text(strip=True) for td in tr.select("td")]
            print(table)
    
    if __name__ == '__main__':
        ThreadPool(20).map(get_info, get_links(url))
    

    再一次:我只需要知道我的脚本中的正确位置来延迟。

    1 回复  |  直到 7 年前
        1
  •  1
  •   Darkonaut    7 年前

    在你的倍数之间延迟 requests.get() 呼叫,位于 get_info ,你必须扩大 获得信息 带有延迟参数,它可以作为 time.sleep() 打电话。由于所有工作线程都是一次性启动的,因此每次调用都必须累积延迟。意思是,当你想在 请求。 调用的时间为0.5秒,传递到pool方法的延迟列表如下所示[0.0、0.5、1.0、1.5、2.0、2.5…]。

    因为不需要改变 获得信息 本身,我在下面的示例中使用了一个decorator来扩展 获得信息 带有延迟参数和 time.sleep(delay) 打电话。请注意,我将把延迟时间与 获得信息 pool.starmap 打电话。

    import logging
    from multiprocessing.pool import ThreadPool
    from functools import wraps
    
    def delayed(func):
        @wraps(func)
        def wrapper(delay, *args, **kwargs):
            time.sleep(delay)  # <--
            return func(*args, **kwargs)
        return wrapper
    
    @delayed
    def get_info(nlink):
        info = nlink + '_info'
        logger.info(msg=info)
        return info
    
    
    def get_links(n):
        return [f'link{i}' for i in range(n)]
    
    
    def init_logging(level=logging.DEBUG):
        fmt = '[%(asctime)s %(levelname)-8s %(threadName)s' \
              ' %(funcName)s()] --- %(message)s'
        logging.basicConfig(format=fmt, level=level)
    
    
    if __name__ == '__main__':
    
        DELAY = 0.5
    
        init_logging()
        logger = logging.getLogger(__name__)
    
        links = get_links(10) # ['link0', 'link1', 'link2', ...]
        delays = (x * DELAY for x in range(0, len(links)))
        arguments = zip(delays, links) # (0.0, 'link0'), (0.5, 'link1'), ...
    
        with ThreadPool(10) as pool:
            result = pool.starmap(get_info, arguments)
            print(result)
    

    实例输出:

    [2018-10-03 22:04:14,221 INFO     Thread-8 get_info()] --- link0_info
    [2018-10-03 22:04:14,721 INFO     Thread-5 get_info()] --- link1_info
    [2018-10-03 22:04:15,221 INFO     Thread-3 get_info()] --- link2_info
    [2018-10-03 22:04:15,722 INFO     Thread-4 get_info()] --- link3_info
    [2018-10-03 22:04:16,223 INFO     Thread-1 get_info()] --- link4_info
    [2018-10-03 22:04:16,723 INFO     Thread-6 get_info()] --- link5_info
    [2018-10-03 22:04:17,224 INFO     Thread-7 get_info()] --- link6_info
    [2018-10-03 22:04:17,723 INFO     Thread-10 get_info()] --- link7_info
    [2018-10-03 22:04:18,225 INFO     Thread-9 get_info()] --- link8_info
    [2018-10-03 22:04:18,722 INFO     Thread-2 get_info()] --- link9_info
    ['link0_info', 'link1_info', 'link2_info', 'link3_info', 'link4_info', 
    'link5_info', 'link6_info', 'link7_info', 'link8_info', 'link9_info']