我有一个芹菜任务,称自己为
do_stuff.apply_async(queue="foo")
). 以前我跑过
app.control.add_consumer("foo", reply=True)
一段时间后,我想停止该队列中的所有任务以及从中启动的所有正在运行的任务
do_stuff
.
所以我运行这个代码:
app.control.cancel_consumer("foo", reply=True)
i = app.control.inspect()
for queue in [i.active, i.scheduled, i.reserved]:
for worker_name, worker_tasks in queue().items():
for task in worker_tasks:
args = ast.literal_eval(task["args"])
if "do_stuff" in task["name"] and args[0] == crawler.name:
app.control.revoke(task["id"], terminate=True)
这“管用”了。它确实停止了所有正在运行的任务
做些什么
问题是如果我跑
app.control.add\消费者(“foo”,reply=True)
同样,在不运行任何其他操作的情况下,新任务开始运行。这意味着芹菜/redis,不知何故,设法把任务放在某个地方。
为什么会这样?那些“隐藏”的任务保存在哪里?我怎样才能去掉它们?