假设我有以下芹菜任务:
# tasks.py
@app.task
def taskA():
time.sleep(10)
return "result"
@app.task
def taskB(result):
print("got result")
在我的项目中,我经常开始新的
taskA
任务如下:
res = taskA.apply_async()
keep_for_later(res.id)
然后,有时在执行程序的过程中,由于某些情况,我需要运行
taskB
任务之后
taskA
任务已完成。
taskA_id = get_task_id()
if condition_is_met():
res = app.AsyncResult(taskA_id)
# How can I tell celery to run a `taskB` task at earliest when this `taskA` task has finished,
# and with the output from taskA as input to taskB ?
我正在管理一个单独的芹菜工人,如下所示:
celery -A tasks worker
我需要这两个任务在独立于主程序的辅助程序上异步运行。
在Ubuntu上使用芹菜v5.3.6。
请注意,我高度简化了这段代码,可能不会按原样运行,但希望问题变得足够清楚。
编辑:
我发现以下内容有效:
# tasks.py
@app.task
def taskA():
time.sleep(10)
return "result"
@app.task
def taskB(task_id):
res = celery_app.AsyncResult(task_id)
with allow_join_result():
taskA_result = res.get()
# do stuff with taskA_result
然后,稍后:
taskA_id = get_task_id()
if condition_is_met():
taskB.apply_async((taskA_id,))