代码之家  ›  专栏  ›  技术社区  ›  agrav

Celery:安排一个新任务在现有任务完成后运行

  •  0
  • agrav  · 技术社区  · 1 年前

    假设我有以下芹菜任务:

    # tasks.py
    @app.task
    def taskA():
        time.sleep(10)
        return "result"
    
    @app.task
    def taskB(result):
        print("got result")
    

    在我的项目中,我经常开始新的 taskA 任务如下:

    res = taskA.apply_async()
    keep_for_later(res.id)
    

    然后,有时在执行程序的过程中,由于某些情况,我需要运行 taskB 任务之后 taskA 任务已完成。

    taskA_id = get_task_id()
    if condition_is_met():
        res = app.AsyncResult(taskA_id)
        # How can I tell celery to run a `taskB` task at earliest when this `taskA` task has finished,
        # and with the output from taskA as input to taskB ?
    

    我正在管理一个单独的芹菜工人,如下所示:

    celery -A tasks worker
    

    我需要这两个任务在独立于主程序的辅助程序上异步运行。 在Ubuntu上使用芹菜v5.3.6。

    请注意,我高度简化了这段代码,可能不会按原样运行,但希望问题变得足够清楚。

    编辑:

    我发现以下内容有效:

    # tasks.py
    @app.task
    def taskA():
        time.sleep(10)
        return "result"
    
    @app.task
    def taskB(task_id):
        res = celery_app.AsyncResult(task_id)
        with allow_join_result():
            taskA_result = res.get()
        # do stuff with taskA_result
    

    然后,稍后:

    taskA_id = get_task_id()
    if condition_is_met():
        taskB.apply_async((taskA_id,))
    
    0 回复  |  直到 1 年前