
Running "unique" tasks with Celery

  •  45
  • Luper Rouch François Lagunas  · Tech Community  · 14 years ago

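    I use Celery to update the RSS feeds of my news aggregation site. I use one @task per feed, and things seem to work well.

    Currently I store the task results and check their status, like this: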

    import socket
    from datetime import timedelta
    from celery.decorators import task, periodic_task
    from aggregator.models import Feed
    
    
    _results = {}
    
    
    @periodic_task(run_every=timedelta(minutes=1))
    def fetch_articles():
        for feed in Feed.objects.all():
            if feed.pk in _results:
                if not _results[feed.pk].ready():
                    # The task is not finished yet
                    continue
            _results[feed.pk] = update_feed.delay(feed)
    
    
    @task()
    def update_feed(feed):
        try:
            feed.fetch_articles()
        except socket.error as exc:
            update_feed.retry(args=[feed], exc=exc)
    

    5 replies  |  until 7 years ago
        1
  •  31
  •   mmoya Mohammed Azharuddin Shaikh    11 years ago
        2
  •  44
  •   Peter Kilczuk    12 years ago

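    import functools

    from django.core.cache import cache  # assumes Django's cache framework


    def single_instance_task(timeout):
        """Skip the task if another instance already holds the lock."""
        def task_exc(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                lock_id = "celery-single-instance-" + func.__name__
                # cache.add only succeeds if the key does not exist yet
                acquire_lock = lambda: cache.add(lock_id, "true", timeout)
                release_lock = lambda: cache.delete(lock_id)
                if acquire_lock():
                    try:
                        return func(*args, **kwargs)
                    finally:
                        release_lock()
            return wrapper
        return task_exc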
    

    Then use it like this...

    @periodic_task(run_every=timedelta(minutes=1))
    @single_instance_task(60*10)
    def fetch_articles():
        ...  # yada yada...
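
    The decorator above builds one lock key per function name, so it serialises the whole task. For the per-feed tasks in the question, the key would also need to include the task arguments. The following is a minimal sketch of that idea, not a drop-in implementation: `InMemoryCache` is a hypothetical stand-in that only mimics the `add`/`delete` semantics of a shared cache so the example runs without Django, and `single_instance_per_args` is an illustrative name, not part of Celery.

```python
import functools
import time


class InMemoryCache:
    """Hypothetical stand-in mimicking a cache's add/delete semantics."""

    def __init__(self):
        self._data = {}

    def add(self, key, value, timeout):
        # Store only if the key is absent or its previous entry has expired.
        now = time.monotonic()
        entry = self._data.get(key)
        if entry is not None and entry[1] > now:
            return False
        self._data[key] = (value, now + timeout)
        return True

    def delete(self, key):
        self._data.pop(key, None)


cache = InMemoryCache()


def single_instance_per_args(timeout):
    """Skip a call while another call with the same arguments holds the lock."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Assumes the arguments have a stable repr (e.g. a feed primary key).
            lock_id = "celery-single-instance-%s-%r-%r" % (
                func.__name__, args, sorted(kwargs.items()))
            if not cache.add(lock_id, "true", timeout):
                return None  # another run is in progress; skip this one
            try:
                return func(*args, **kwargs)
            finally:
                cache.delete(lock_id)
        return wrapper
    return decorator


calls = []


@single_instance_per_args(60 * 10)
def update_feed(feed_pk):
    calls.append(feed_pk)
    if feed_pk == 1:
        update_feed(1)  # same feed while the lock is held: skipped
        update_feed(2)  # different feed: runs
    return feed_pk


update_feed(1)
```

    In a real deployment the in-memory class would have to be replaced by a cache shared between workers (memcached or Redis, for example), since a per-process dictionary cannot coordinate separate worker processes.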
    
        3
  •  15
  •   vdboor    9 years ago

    Using https://pypi.python.org/pypi/celery_once seems to do the job really well, including reporting errors and checking uniqueness against selected parameters.

    from celery_once import QueueOnce
    from myapp.celery import app
    from time import sleep
    
    @app.task(base=QueueOnce, once=dict(keys=('customer_id',)))
    def start_billing(customer_id, year, month):
        sleep(30)
        return "Done!"
    

    You only need the following settings in your project:

    ONCE_REDIS_URL = 'redis://localhost:6379/0'
    ONCE_DEFAULT_TIMEOUT = 60 * 60  # remove lock after 1 hour in case it was stale
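
    The `keys` option above restricts uniqueness to some of the task's arguments. The sketch below only illustrates that idea; it is not celery_once's implementation, and `make_lock_key` is a hypothetical helper. It binds the call arguments to the function signature and builds a lock key from the selected names, so positional and keyword calls map to the same key.

```python
import inspect


def make_lock_key(func, keys, args, kwargs):
    """Build a lock key from the selected argument names of one call."""
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    selected = ["%s-%r" % (name, bound.arguments[name]) for name in sorted(keys)]
    return "lock_%s_%s" % (func.__name__, "_".join(selected))


def start_billing(customer_id, year, month):
    return "Done!"


# Same customer, different year/month: the keys are equal, so the calls collide.
key_a = make_lock_key(start_billing, ("customer_id",), (42, 2016, 1), {})
key_b = make_lock_key(start_billing, ("customer_id",), (42,), {"year": 2017, "month": 5})
# Different customer: a different key, so the calls may run in parallel.
key_c = make_lock_key(start_billing, ("customer_id",), (43, 2016, 1), {})
```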
    
        4
  •  8
  •   jbkkd    9 years ago

    If you are looking for an example that does not use Django, then try this example (note: it uses Redis instead, which I was already using).

    import redis
    
    REDIS_CLIENT = redis.Redis()
    
    def only_one(function=None, key="", timeout=None):
        """Enforce only one celery task at a time."""
    
        def _dec(run_func):
            """Decorator."""
    
            def _caller(*args, **kwargs):
                """Caller."""
                ret_value = None
                have_lock = False
                lock = REDIS_CLIENT.lock(key, timeout=timeout)
                try:
                    have_lock = lock.acquire(blocking=False)
                    if have_lock:
                        ret_value = run_func(*args, **kwargs)
                finally:
                    if have_lock:
                        lock.release()
    
                return ret_value
    
            return _caller
    
        return _dec(function) if function is not None else _dec
    
        5
  •  0
  •   user12397901    11 years ago

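    This solution works for Celery running on a single host with concurrency greater than 1. Other kinds of file-based locks (ones without dependencies such as Redis) do not work when concurrency is greater than 1.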

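    from datetime import datetime, timedelta
    from fcntl import flock, LOCK_EX, LOCK_NB
    from hashlib import md5
    from os.path import join
    from time import sleep

    from celery.task import PeriodicTask  # legacy class-based API of older Celery releases


    class Lock(object):
        def __init__(self, filename):
            self.f = open(filename, 'w')

        def __enter__(self):
            try:
                # Non-blocking exclusive lock; fails if another process holds it
                flock(self.f.fileno(), LOCK_EX | LOCK_NB)
                return True
            except IOError:
                pass
            return False

        def __exit__(self, *args):
            # Closing the file also releases the lock
            self.f.close()


    class SinglePeriodicTask(PeriodicTask):
        abstract = True
        run_every = timedelta(seconds=1)

        def __call__(self, *args, **kwargs):
            # One lock file per task name
            lock_filename = join('/tmp',
                                 md5(self.name.encode('utf-8')).hexdigest())
            with Lock(lock_filename) as is_locked:
                if is_locked:
                    super(SinglePeriodicTask, self).__call__(*args, **kwargs)
                else:
                    print('already working')


    class SearchTask(SinglePeriodicTask):
        restart_delay = timedelta(seconds=60)

        def run(self, *args, **kwargs):
            print(self.name, 'start', datetime.now())
            sleep(5)
            print(self.name, 'end', datetime.now())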
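
    The `Lock` class above relies on `flock` with `LOCK_EX | LOCK_NB`, which fails immediately instead of waiting when the file is already locked. The following is a small sketch of that behaviour on its own, assuming a Unix host (the `fcntl` module is not available on Windows); `try_lock` and the temporary file name are illustrative choices, not part of the answer.

```python
import os
import tempfile
from fcntl import flock, LOCK_EX, LOCK_NB


def try_lock(path):
    """Return an open file holding an exclusive lock on path, or None if it is taken."""
    f = open(path, "w")
    try:
        flock(f.fileno(), LOCK_EX | LOCK_NB)
    except OSError:
        # The lock is held through another open file; give up without blocking.
        f.close()
        return None
    return f


path = os.path.join(tempfile.gettempdir(), "single-task-demo-%d.lock" % os.getpid())
first = try_lock(path)    # acquires the lock
second = try_lock(path)   # separate open of the same file: lock is already held
first.close()             # closing the file releases the lock
third = try_lock(path)    # the lock can now be taken again
third.close()
os.remove(path)
```

    Because the lock lives in the kernel of one machine, this approach only coordinates workers on the same host, which matches the restriction stated in the answer.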