代码之家  ›  专栏  ›  技术社区  ›  alexandernst

芹菜任务未完全从队列中删除

  •  0
  • alexandernst  · 技术社区  · 6 年前

    我有一个芹菜任务,称自己为 do_stuff.apply_async(queue="foo") ). 以前我跑过 app.control.add_consumer("foo", reply=True)

    一段时间后,我想停止该队列中的所有任务以及从中启动的所有正在运行的任务 do_stuff .

    所以我运行这个代码:

    app.control.cancel_consumer("foo", reply=True)
    
    i = app.control.inspect()
    for queue in [i.active, i.scheduled, i.reserved]:
        for worker_name, worker_tasks in queue().items():
            for task in worker_tasks:
                args = ast.literal_eval(task["args"])
                if "do_stuff" in task["name"] and args[0] == crawler.name:
                    app.control.revoke(task["id"], terminate=True)
    

    这“管用”了。它确实停止了所有正在运行的任务 做些什么

    问题是如果我跑 app.control.add\消费者(“foo”,reply=True) 同样,在不运行任何其他操作的情况下,新任务开始运行。这意味着芹菜/redis,不知何故,设法把任务放在某个地方。

    为什么会这样?那些“隐藏”的任务保存在哪里?我怎样才能去掉它们?

    1 回复  |  直到 6 年前
        1
  •  1
  •   alexandernst    6 年前

    回答我自己的问题:发生这种情况是因为当我让工人不从队列中消费时(通过调用 cancel_consumer

    我找到了一种(通过编程)刷新队列的方法:

    from celery.bin.celery import CeleryCommand
    cmd = CeleryCommand()
    super(CeleryCommand, cmd).execute_from_commandline([
        '',
        'purge',
        '-f',
        '-Q', queue_name,
        '-A', 'main'
    ])