代码之家  ›  专栏  ›  技术社区  ›  wotanii

当我不存储芹菜任务的任务ID时,我如何从redis后端获得它们的结果/失败?

  •  0
  • wotanii  · 技术社区  · 4 年前

    在我的web应用程序中,我使用celence启动后台作业,而不存储它们的id。有些任务是周期性的,有些任务是由用户交互触发的。芹菜任务只是完成它们的任务,最终用户将在浏览器中看到更新的数据。当任务最近失败时,我想通知所有登录的管理员用户(因为他们通常是触发最近失败的人)。所以他们至少知道出了什么事。

    我发现的相关芹菜方法要么需要有效的任务id(例如。 celery.result.AsyncResult )或者他们只有关于活动任务的信息,而没有关于已完成/失败任务的信息(例如。 celery.app.control.Inspect )。

    我使用了一个flask前端,一个用于芹菜的redis后端,还有一个用于持久数据的常规DB。

    在这种情况下,我将如何收集有关最近完成或失败的芹菜任务的信息?

    我尝试过的:

    # I setup celery with 
    my_celery_project = Celery(__name__,
                    backend='redis://localhost:1234/0',
                    broker='redis://localhost:1234/0')
    
    # later in the view I want to collect status information:
    
    i = my_celery_project.control.inspect()
    
    i.active() # this one exists, but I don't care about it
    i.failed() # this is what I want, but it doesn't exist
    i.results() # this also doesn't exist
    
    
    # getting the result directly also doesn't work, since they require an id, which i don't have
    res = AsyncResult(id_i_don_have,app=app)
    

    这应该是可能的,因为我可以用 redis-cli --scan 然后做 my_task.AsyncResult('id_from_redis').status 以检查结果。类似于flower的东西也可以工作,但我认为,在网络应用程序的无状态性质下,这不会很好地工作。


    这不是这些问题的重复,因为它们没有假设redis后端。此外,它们已经过时4年多了:

    这不是这些问题的重复,因为我的redis后端实际上正在工作:

    这不是这些问题的重复,因为它与我的问题正好相反。他们关心旧的结果,而我明确只关心最近的结果: How to read celery results from redis result backend

    0 回复  |  直到 4 年前
        1
  •  2
  •   ACE Fly    4 年前

    您应该使用信号,如下所示:

    from celery import signals
    
    @signals.task_failure.connect
    def exception_handle(sender, task_id, exception, **kwargs):
        if isinstance(exception, redis.exceptions.LockError):
            loggert.warning(f"{sender.__qualname__}[{task_id}] can't get lock")
            return
        loggert.exception(f"{sender.__qualname__}[{task_id}] args={kwargs['args']} kwargs={kwargs['kwargs']} Exception:\n")
    
    @signals.after_setup_logger.connect
    def celery_log(logger, **kwargs):
        check_console(logger, **kwargs)
    
    
    @signals.after_setup_task_logger.connect
    def task_log(logger, **kwargs):
        # todo: add your loggre handle herre...
        check_console(logger, **kwargs)
    
    
    @signals.worker_ready.connect
    def clean_lock(**kwargs):
        loggert.info('worker_ready')
    
    
    @signals.worker_init.connect
    def hook_prefork(sender, **kwargs):
        ...
    
    def check_console(logger, format, **kwargs):
        if not list(filter(lambda x: type(x) is logging.StreamHandler, logger.handlers)):
            console = logging.StreamHandler()
            console.setFormatter(logging.Formatter(format))
            console.setLevel(logging.INFO)
            logger.addHandler(console)
    
        2
  •  1
  •   wotanii    4 年前

    最后,我的解决方案是直接从后端获取ID,然后通过我的芹菜实例将它们转换为Object:

    
      task_results: List[AsyncResult] = []
      for key in my_celery_project.backend.client.scan_iter("celery-task-meta-*"):
        task_id = str(key).split("celery-task-meta-", 1)[1].replace("'", "")
        task_results.append(self.celery.AsyncResult(task_id))
      return task_results
    

    然后我用 async_result.ready() 过滤掉我感兴趣的那些。

    附带说明:现在我也打电话 async_result.forget() 清理以前没有做过的旧任务。