代码之家  ›  专栏  ›  技术社区  ›  Hatshepsut

多次回调后触发延迟列表

  •  0
  • Hatshepsut  · 技术社区  · 7 年前

    A. twisted.internet.defer.DeferredList 是否:

    我将一组延迟合并到一个回调中。

    我跟踪他们的回访延迟列表,并 全部完成后回调,列出(成功、结果) 元组,“success”是一个布尔值。

    请注意,在将延迟的放入 延迟列表。例如,您可以在 通过将错误返回添加到延迟的邮件 之后 放置 他们在延迟列表中,因为延迟列表不会接受错误。 (虽然更方便的方法是设置 ConsumerErrors标志)

    def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0, consumeErrors=0): (source)
        overrides twisted.internet.defer.Deferred.__init__
        Initialize a DeferredList.
        Parameters  deferredList    The list of deferreds to track. (type: list of Deferreds )
        fireOnOneCallback   (keyword param) a flag indicating that only one callback needs to be fired for me to call my callback
        fireOnOneErrback    (keyword param) a flag indicating that only one errback needs to be fired for me to call my errback
        consumeErrors   (keyword param) a flag indicating that any errors raised in the original deferreds should be consumed by this DeferredList. This is useful to prevent spurious warnings being logged.
    

    具体而言:

    fireOnOneCallback (关键字param)指示只有一个 需要启动回调才能调用回调

    我在寻找这样的行为 fireOnOneCallback=True ,而不是开火 n 回调。我试过这么做,但已经变得一团糟了。我相信有更好的方法。

    def _get_fired_index(deferred_list):
        for index, (success, value) in enumerate(deferred_list):
            if success:
                return index
        raise ValueError('No deferreds were fired.')
    
    
    def _fire_on_other_callback(already_fired_index, deferred_list, callback, ):
        dlist_except_first_fired = (
            deferred_list[:already_fired_index]
            + deferred_list[already_fired_index + 1:]
        )
        dlist2 = DeferredList(dlist_except_first_fired, fireOnOneCallback=True)
        dlist2.addCallback(callback, deferred_list)
    
    
    def _fire_on_two_callbacks(deferreds, callback, errback):
        dlist1 = DeferredList(deferreds, fireOnOneCallback=True)
        dlist1.addCallback(_get_fired_index)
        dlist1.addCallback(_fire_on_other_callback, deferreds, callback, errback)
    
    2 回复  |  直到 7 年前
        1
  •  2
  •   Jean-Paul Calderone    7 年前

    这里有一种可能的方法。

    from __future__ import print_function
    
    import attr
    from twisted.internet.defer import Deferred
    
    def fireOnN(n, ds):
        acc = _Accumulator(n)
        for index, d in enumerate(ds):
            d.addCallback(acc.one_result, index)
        return acc.n_results
    
    @attr.s
    class _Accumulator(object):
        n = attr.ib()
        so_far = attr.ib(default=attr.Factory(dict))
        done = attr.ib(default=False)
        n_results = attr.ib(default=attr.Factory(Deferred))
    
        def one_result(self, result, index):
            if self.done:
                return result
            self.so_far[index] = result
            if len(self.so_far) == self.n:
                self.done = True
                so_far = self.so_far
                self.so_far = None
                self.n_results.callback(so_far)
    
    dx = list(Deferred().addCallback(print, i) for i in range(3))
    done = fireOnN(2, dx)
    done.addCallback(print, "done")
    
    for i, d in enumerate(dx):
        d.callback("result {}".format(i))
    

    请注意,此实现不处理错误,可能还有其他缺点(如保留 n_results 参考)。然而,基本思想是合理的:从回调中积累状态,直到达到所需的条件,然后触发另一个延迟状态。

    DeferredList 这只会给这个问题带来不必要的复杂性,因为它的无关特性和界面不是为解决这个问题而设计的。

        2
  •  1
  •   Jack Robison    7 年前

    下面是另一种使用 DeferredSemaphore 处理潜在的竞争条件。一旦 n 被推迟的人中有人被解雇了,其余的人被取消了。

    from twisted.internet import defer
    
    
    def fireAfterNthCallback(deferreds, n):
        if not n or n > len(deferreds):
            raise ValueError
    
        results = {}
        finished_deferred = defer.Deferred()
        sem = defer.DeferredSemaphore(1)
    
        def wrap_sem(result, index):
            return sem.run(callback_result, result, index)
    
        def cancel_remaining():
            finished = [deferreds[index] for index in results.keys()]
            for d in finished:
                deferreds.remove(d)
            for d in deferreds:
                d.addErrback(lambda err: err.trap(defer.CancelledError))
                d.cancel()
    
        def callback_result(result, index):
            results[index] = result
            if len(results) >= n:
                cancel_remaining()
                finished_deferred.callback(results.values())
            return result
    
        for deferred_index, deferred in enumerate(deferreds):
            deferred.addCallback(wrap_sem, deferred_index)
    
        return finished_deferred