
Multiprocessing with a double loop in Python

  •  -1
  • zemarkhos  ·  3 years ago

    So I have a piece of code that spins up 4 Selenium Chrome drivers and scrapes data from an element on a web page. The code can be simplified to the following:

    import json
    import math
    import multiprocessing as mp
    from selenium import webdriver
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.ui import WebDriverWait
    
    class scraper():
        def __init__(self,list_of_urls, process_num):
            self.urls = list_of_urls
            self.process_num = process_num
    
        def scrape_urls(self):
            driver = webdriver.Chrome(driver_dir)  # driver_dir: path to chromedriver, defined elsewhere
            data = []
            for url in self.urls:
                driver.get(url)
                element = WebDriverWait(driver, timeout=7).until(lambda d: d.find_element(by=By.CLASS_NAME, value="InterestingData"))
                data.append(element.text)
                print("Scraper # ", self.process_num," got data from: ",url)
            return data
    
    if __name__ == '__main__':
        with open('array_of_urls', 'r') as infile:
            urls = json.load(infile)
            number_of_processes=4
            length_of_urls = len(urls)
            partition_into = math.ceil(length_of_urls/number_of_processes)
            scrapers = []
            start = 0
            end = start + partition_into
            for num in range(number_of_processes):
                new_scraper = scraper(urls[start:end],num)
                scrapers.append(new_scraper)
                start = end
                end = start + partition_into
                if end > length_of_urls:
                    end = length_of_urls
    
            with mp.Pool(processes=number_of_processes) as pool:
                async_results = []
                for num in range(number_of_processes):
                    async_results.append(pool.apply_async(scrapers[num].scrape_urls))
                pool.close()
                pool.join()
                # Retrieve the scraped data; .get() also re-raises any exception
                # raised in a worker process instead of silently discarding it.
                result_array = [res.get() for res in async_results]
    

    The problem I'm running into is that after 5-10 minutes one of the scrapers just stalls, and the only thing that wakes it up is manually refreshing the page in its browser. If I leave it running for an hour or so, three of the four have stopped and only one is still going. They don't raise errors or print anything, they just stop running. I've tried it on two different laptops and both have the same problem. I've also tried doing this with 4 separate mp.Process() instances running scrape_urls, and that does the same thing. Has anyone else run into this problem, or am I doing something wrong?
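    For reference, the mp.Process() attempt mentioned above might have looked roughly like the sketch below. This is a hypothetical reconstruction rather than the original code: the run_scraper helper and the Queue used to collect results are assumptions, and it reuses the scraper class defined in the question.

    import json
    import math
    import multiprocessing as mp

    # Assumes the scraper class from the question is defined in this module.

    def run_scraper(scraper_obj, result_queue):
        # Each process runs one scraper instance and pushes its list of
        # results onto a shared queue.
        result_queue.put(scraper_obj.scrape_urls())

    if __name__ == '__main__':
        with open('array_of_urls', 'r') as infile:
            urls = json.load(infile)
        number_of_processes = 4
        chunk = math.ceil(len(urls) / number_of_processes)
        scrapers = [scraper(urls[i:i + chunk], n)
                    for n, i in enumerate(range(0, len(urls), chunk))]
        result_queue = mp.Queue()
        processes = [mp.Process(target=run_scraper, args=(s, result_queue)) for s in scrapers]
        for p in processes:
            p.start()
        # Drain the queue before joining so a full queue cannot block the workers.
        results = [result_queue.get() for _ in processes]
        for p in processes:
            p.join()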

    0 replies  |  3 years ago
        1
  •  2
  •   Booboo    3 years ago

    First of all, Selenium already creates a process of its own, so it is far better to use multithreading rather than multiprocessing, since each thread will be starting a browser process anyway. Also, in scrape_urls, after your driver = webdriver.Chrome(driver_dir) statement the rest of the function should be enclosed in a try/finally statement where the finally block contains driver.quit(), to ensure that the driver process is terminated whether or not an exception occurs. Right now you are leaving all of the driver processes running.
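    Applied to the scrape_urls method from the question, that cleanup would look roughly like this (a minimal sketch keeping the original names and imports, including the driver_dir variable from the question):

        def scrape_urls(self):
            driver = webdriver.Chrome(driver_dir)
            try:
                data = []
                for url in self.urls:
                    driver.get(url)
                    element = WebDriverWait(driver, timeout=7).until(
                        lambda d: d.find_element(by=By.CLASS_NAME, value="InterestingData"))
                    data.append(element.text)
                    print("Scraper # ", self.process_num, " got data from: ", url)
                return data
            finally:
                # Runs whether or not an exception occurred, so the browser
                # process is always terminated.
                driver.quit()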

    You might also consider using the following technique, which creates a thread pool of size 4 (or fewer, depending on how many URLs there are to process), where each thread in the pool automatically reuses the driver that has been allocated to it, kept in thread-local storage. You may wish to change the options used to create the driver (currently "headless" mode):

    import json
    from selenium import webdriver
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.ui import WebDriverWait
    from multiprocessing.pool import ThreadPool
    import threading
    import gc
    
    
    threadLocal = threading.local()
    
    class Driver:
        def __init__(self):
            options = webdriver.ChromeOptions()
            options.add_argument("--headless")
            options.add_experimental_option('excludeSwitches', ['enable-logging'])
            self.driver = webdriver.Chrome(options=options)
    
        def __del__(self):
            self.driver.quit() # clean up driver when we are cleaned up
            print('The driver has been "quitted".')
    
        @classmethod
        def create_driver(cls):
            the_driver = getattr(threadLocal, 'the_driver', None)
            if the_driver is None:
                print('Creating new driver.')
                the_driver = cls()
                # Keep the Driver instance in thread-local storage so this
                # thread reuses the same browser for every URL it processes.
                threadLocal.the_driver = the_driver
            driver = the_driver.driver
            # Drop the local reference so that threadLocal holds the only
            # long-lived reference; deleting threadLocal later allows __del__
            # (and therefore driver.quit()) to run.
            the_driver = None
            return driver
    
    def scraper(url):
        """
        This now scrapes a single URL.
        """
        driver = Driver.create_driver()
        driver.get(url)
        element = WebDriverWait(driver, timeout=7).until(lambda d: d.find_element(by=By.CLASS_NAME, value="InterestingData"))
        print("got data from: ", url)
        return element.text
    
    with open('array_of_urls', 'r') as infile:
        urls = json.load(infile)
    number_of_processes = min(4, len(urls))
    with ThreadPool(processes=number_of_processes) as pool:
        result_array = pool.map(scraper, urls)
    
        # Must ensure drivers are quitted before threads are destroyed:
        del threadLocal
        # This should ensure that the __del__ method is run on class Driver:
        gc.collect()
    
        pool.close()
        pool.join()