代码之家  ›  专栏  ›  技术社区  ›  Ander

如何使用芹菜和姜戈将任务路由到不同的队列

  •  2
  • Ander  · 技术社区  · 7 年前

    我正在使用以下堆栈:

    • Python 3.6
    • 芹菜V4.2.1 (经纪人: RabByMQ V3.06 )
    • Django V2.0.4 .

    根据 Celery's documentation ,在不同队列上运行计划任务应与为上的任务定义相应的队列一样容易。 芹菜路线 尽管如此,所有任务似乎都是在芹菜的默认队列上执行的。

    这是上的配置 我的应用程序/设置.py :

    CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
    CELERY_ROUTES = {
     'app1.tasks.*': {'queue': 'queue1'},
     'app2.tasks.*': {'queue': 'queue2'},
    }
    CELERY_BEAT_SCHEDULE = {
        'app1_test': {
            'task': 'app1.tasks.app1_test',
            'schedule': 15,
        },
        'app2_test': {
            'task': 'app2.tasks.app2_test',
            'schedule': 15,
        },
    
    }
    

    这些任务只是测试路由的简单脚本:

    文件 APP1/TASKS.Py :

    from my_app.celery import app
    import time
    
    
    @app.task()
    def app1_test():
        print('I am app1_test task!')
        time.sleep(10)
    

    文件 APP2/TASKS.Py :

    from my_app.celery import app
    import time
    
    
    @app.task()
    def app2_test():
        print('I am app2_test task!')
        time.sleep(10)
    

    当我用所有需要的队列运行芹菜时:

    celery -A my_app worker -B -l info -Q celery,queue1,queue2
    

    rabbitmq将显示只有默认队列“ 芹菜 “正在运行任务:

    sudo rabbitmqctl list_queues
    # Tasks executed by each queue:
    #  - celery 2
    #  - queue1 0
    #  - queue2 0
    

    有人知道如何解决这种意想不到的行为吗?

    当做,

    2 回复  |  直到 6 年前
        1
  •  5
  •   Ander    6 年前

    我已经开始工作了,这里没有什么值得注意的:

    根据 Celery's 4.2.0 documentation , 芹菜路线 应该是用于定义队列路由的变量,但它仅对我使用 芹菜任务路线 相反。任务路由似乎独立于芹菜节拍,因此这仅适用于手动计划的任务:

    app1_test.delay()
    app2_test.delay()
    

    app1_test.apply_async()
    app2_test.apply_async()
    

    为了使它与芹菜节拍一起工作,我们只需要显式地定义队列。文件的最终设置 我的应用程序/设置.py 如下:

    CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
    CELERY_TASK_ROUTES = {
     'app1.tasks.*': {'queue': 'queue1'},
     'app2.tasks.*': {'queue': 'queue2'},
    }
    CELERY_BEAT_SCHEDULE = {
        'app1_test': {
            'task': 'app1.tasks.app1_test',
            'schedule': 15,
            'options': {'queue': 'queue1'}
        },
        'app2_test': {
            'task': 'app2.tasks.app2_test',
            'schedule': 15,
            'options': {'queue': 'queue2'}
        },
    
    }
    

    我希望这能为其他开发人员节省一些时间。

        2
  •  4
  •   JPG    7 年前

    添加 queue decorator的参数可以帮助您,

    @app.task(queue='queue1')
    def app1_test():
        print('I am app1_test task!')
        time.sleep(10)
    推荐文章