代码之家  ›  专栏  ›  技术社区  ›  Robus

python线程传递状态

  •  2
  • Robus  · 技术社区  · 15 年前

    基本上,我要做的是使用代理获取几个网站并处理数据。问题是,请求很少以令人信服的方式失败,设置套接字超时也没有太大帮助,因为它们通常不起作用。

    所以我所做的是:

    q = Queue()
    s = ['google.com','ebay.com',] # And so on
    for item in s:
        q.put(item)
    
    
    def worker():
            item = q.get()
            data = fetch(item) # This is the buggy part
            # Process the data, yadayada
    
    for i in range(workers):
        t = InterruptableThread(target=worker)
        t.start()
    
    
    # Somewhere else
    if WorkerHasLivedLongerThanTimeout:
        worker.terminate()
    

    (InterruptableThread class) 问题是,我只想杀死仍然卡在取件上的线程。另外,我希望项目返回队列。Ie:

    def worker():
            self.status = 0
            item = q.get()
            data = fetch(item) # This is the buggy part
            self.status = 1 # Don't kill me now, bro!
            # Process the data, yadayada
    
    # Somewhere else
    if WorkerHasLivedLongerThanTimeout and worker.status != 1:
        q.put(worker.item)
        worker.terminate()
    

    怎么能做到?

    1 回复  |  直到 15 年前
        1
  •  1
  •   intuited    15 年前

    编辑 :突发新闻;见下文·

    我最近决定做一些类似的事情,结果是 pqueue_fetcher 模块。它最终主要是一种学习努力:我学到了,除其他外,几乎可以肯定的是,使用像 twisted 而不是试图以任何可靠性杀死Python线程。

    也就是说,模块中有代码或多或少地回答了你的问题。它基本上由一个类组成,该类的对象可以被设置为从优先级队列中获取位置并将其放入 fetch 在对象实例化时提供的函数。如果在线程终止之前成功接收到位置的资源,则会将其转发到 results 排队;否则它们将返回到 locations 具有降级优先级的队列。成功由默认为 bool .

    一路上我创造了 terminable_thread 模块,它只是将我能找到的最成熟的代码变体打包为 InterruptableThread . 它还为64位机器添加了一个修复程序,我需要它来在我的Ubuntu设备上使用该代码。 terminable_thread 依赖于 pqueue_fetcher .

    可能我遇到的最大障碍是引发异步异常 可终止螺纹 可中断线程 你提到过,可能会有一些奇怪的结果。在测试套件中 取物器 , the 取来 通过调用的函数块 time.sleep . 我发现如果一根线 terminate() 当这样阻塞时, sleep 调用是嵌套try块中的最后一条(甚至不是最后一条)语句,执行将实际反弹到 except 条款 外面的 试块, 即使里面的那个有一个 除了 匹配引发的异常 . 我仍然有点不敢相信,但有一个测试案例 取物器 重新上演了这一幕。我相信“漏抽象”是这里的正确术语。

    我写了一个简单的解决方法,它只是做一些随机的事情(在本例中是从生成器中获取一个值)来分解代码中的“原子性”(不确定这是否是它的实际内容)。此解决方案可以通过 fission 参数 pqueue_fetcher.Fetcher . 它(即默认的)似乎可以工作,但肯定不会以任何我认为特别可靠或可移植的方式工作。

    因此,在发现这一有趣的数据之后,我的建议是避免使用这种技术(即调用 ctypes.pythonapi.PyThreadState_SetAsyncExc )总共。

    在任何情况下,如果需要保证接收到其整个数据集的任何请求(即对服务器的确认)都会转发到,那么这仍然不起作用。 结果 . 为了确保这一点,您必须确保最后一次网络事务和转发的位不会被中断,而不会保护整个检索操作不会被中断(因为这将防止超时工作)。为了做到这一点,您需要基本上重写检索操作(即套接字代码),以了解将要引发的异常情况。 terminable_thread.Thread.raise_exc .

    我还没有学习Twisted,但是作为首选的python异步网络框架,我希望它必须有一些优雅或至少可行的方式来处理这些细节。我希望它能提供一种并行的方法来实现从非网络源(如本地文件存储或数据库等)获取数据,因为我想构建一个应用程序,以一种中等不可知的方式从各种源收集数据。

    不管怎样,如果你仍然打算自己想出一个方法来管理线程,你也许可以从我的努力中学习。希望这有帮助。

    _·_·_·_·_·_·_·_·_·_·_·这只是在:

    我已经意识到,我认为已经稳定下来的测试实际上没有,并且给出了不一致的结果。这似乎与上述问题有关,但例外处理和 裂变 功能。我真的不知道这是怎么回事,也不打算在不久的将来调查,除非我最终需要这样做。