代码之家  ›  专栏  ›  技术社区  ›  Alex B

Celery async result.collect()为空[],我做错了什么?

  •  0
  • Alex B  · 技术社区  · 1 年前

    我实施了 celery 文档中的结果示例,但我没有得到相同的结果。

    我在同一网络中有两个码头工人:我的 celery_1 码头工人,负责管理芹菜工人和 redis_1 docker运行最新 redis 形象

    我跑步 芹菜 docker和我有这个文件 tasks.py :

    from celery import group
    from tuna.celery import app
    
    @app.task(trail=True)
    def A(how_many):
        return group(B.s(i) for i in range(how_many))()
    
    @app.task(trail=True)
    def B(i):
        return pow2.delay(i)
    
    @app.task(trail=True)
    def pow2(i):
        return i ** 2
    

    以下是我在celey_1 docker中启动芹菜的方式:

    celery -A tuna.celery_app.celery worker -l info -E
    

    我可以看到它正常工作并正确注册任务:

    celery_1  |  -------------- celery@fc9a5ae8ab17 v5.3.4 (emerald-rush)
    celery_1  | --- ***** -----
    celery_1  | -- ******* ---- Linux-5.4.0-150-generic-x86_64-with-glibc2.29 2024-01-10 17:28:52
    celery_1  | - *** --- * ---
    celery_1  | - ** ---------- [config]
    celery_1  | - ** ---------- .> app:         celery_app:0x7f4f829d98e0
    celery_1  | - ** ---------- .> transport:   redis://mituna_redis:6379//
    celery_1  | - ** ---------- .> results:     redis://mituna_redis:6379/
    celery_1  | - *** --- * --- .> concurrency: 72 (prefork)
    celery_1  | -- ******* ---- .> task events: ON
    celery_1  | --- ***** -----
    celery_1  |  -------------- [queues]
    celery_1  |                 .> celery           exchange=celery(direct) key=celery
    celery_1  |
    celery_1  |
    celery_1  | [tasks]
    celery_1  |   . tuna.celery_app.celery.A
    celery_1  |   . tuna.celery_app.celery.B
    celery_1  |   . tuna.celery_app.celery.async_call
    celery_1  |   . tuna.celery_app.celery.group_tasks
    celery_1  |   . tuna.celery_app.celery.pow2
    
    

    我拥有的几乎所有不使用的功能 result.collect() 工作如预期,我能够看到结果。但我正试图以非阻塞的方式收集这些结果 collect() ,但它对我不起作用。

    这是应用程序设置:

    app = Celery('celery_app',
                 broker_url=f"redis://mituna_redis:6379//",
                 result_backend=f"redis://mituna_redis:6379/",
                 include=['tuna.celery_app.celery',]) #note I also left the include part out - tasks are still regiestered correctly, I dont think this is needed
    
    app.autodiscover_tasks()
    app.conf.result_backend_transport_options = {'retry_policy': {'timeout': 5.0}}
    logger = get_task_logger(__name__)
    
    

    我使用redis:latest作为我的图像和芹菜5.3.4

    所以在 芹菜 docker我启动我的python3解释器,并编写与文档中相同的代码。 https://docs.celeryq.dev/en/stable/reference/celery.result.html

    Python 3.8.10 (default, May 26 2023, 14:05:08)
    [GCC 9.4.0] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    >>> from tuna.celery_app.celery import A
    >>> from celery.result import ResultBase
    >>> result = A.delay(10)
    >>> [v for v in result.collect()
    ...  if not isinstance(v, (ResultBase, tuple))]
    []
    
    

    正如你所看到的, result.collect() 翻译里空荡荡的回来了。

    我在芹菜工人身上看到了收到的任务和正确计算的结果:

    celery_1  | [2024-01-10 15:01:14,312: INFO/MainProcess] Task tuna.celery_app.celery.pow2[78223919-cc49-4166-8546-6e4a48345478] received
    celery_1  | [2024-01-10 15:01:14,312: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[77c7e134-1922-44db-9d55-e3f6b018fb89] succeeded in 0.00052880764008s: 1
    celery_1  | [2024-01-10 15:01:14,313: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[78223919-cc49-4166-8546-6e4a48345478] succeeded in 0.00044824361801s: 0
    celery_1  | [2024-01-10 15:01:14,314: INFO/MainProcess] Task tuna.celery_app.celery.pow2[1e6e530d-1383-4fca-8d90-1da1b892ad44] received
    celery_1  | [2024-01-10 15:01:14,316: INFO/MainProcess] Task tuna.celery_app.celery.pow2[bbeef76f-e20f-4f92-9efb-d15eb37f8f8d] received
    celery_1  | [2024-01-10 15:01:14,317: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[1e6e530d-1383-4fca-8d90-1da1b892ad44] succeeded in 0.00066126394272s: 36
    celery_1  | [2024-01-10 15:01:14,317: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[bbeef76f-e20f-4f92-9efb-d15eb37f8f8d] succeeded in 0.0004749994278s: 49
    celery_1  | [2024-01-10 15:01:14,319: INFO/MainProcess] Task tuna.celery_app.celery.pow2[84068265-de95-4d23-ba9f-cfa8006f65cd] received
    celery_1  | [2024-01-10 15:01:14,321: INFO/MainProcess] Task tuna.celery_app.celery.pow2[851bd08b-59eb-40b4-8ab9-e40e622ad086] received
    celery_1  | [2024-01-10 15:01:14,322: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[84068265-de95-4d23-ba9f-cfa8006f65cd] succeeded in 0.00038264656067s: 9
    celery_1  | [2024-01-10 15:01:14,322: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[851bd08b-59eb-40b4-8ab9-e40e622ad086] succeeded in 0.00030431699753s: 4
    celery_1  | [2024-01-10 15:01:14,323: INFO/MainProcess] Task tuna.celery_app.celery.pow2[d6926a01-fdfe-454f-966e-a347484e6574] received
    celery_1  | [2024-01-10 15:01:14,326: INFO/MainProcess] Task tuna.celery_app.celery.pow2[861452de-746f-4d22-bd88-29f407bf599d] received
    celery_1  | [2024-01-10 15:01:14,327: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[d6926a01-fdfe-454f-966e-a347484e6574] succeeded in 0.00024588489532s: 16
    celery_1  | [2024-01-10 15:01:14,327: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[861452de-746f-4d22-bd88-29f407bf599d] succeeded in 0.00058472061157s: 64
    celery_1  | [2024-01-10 15:01:14,328: INFO/MainProcess] Task tuna.celery_app.celery.pow2[753ab025-05bf-4cab-a2e5-0e4a8f004f30] received
    celery_1  | [2024-01-10 15:01:14,330: INFO/MainProcess] Task tuna.celery_app.celery.pow2[f8143821-a9f4-45ba-b22e-c949d6ffe20c] received
    celery_1  | [2024-01-10 15:01:14,332: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[753ab025-05bf-4cab-a2e5-0e4a8f004f30] succeeded in 0.00030650091171s: 81
    celery_1  | [2024-01-10 15:01:14,332: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[f8143821-a9f4-45ba-b22e-c949d6ffe20c] succeeded in 0.00077548265457s: 25
    celery_1  | [2024-01-10 15:03:20,991: INFO/MainProcess] Task tuna.celery_app.celery.A[43912ac2-a04f-457b-8f59-d5d8af4bcf0b] received
    celery_1  | [2024-01-10 15:
    

    当我以阻挡的方式做这件事时,效果就像一种魅力

    >>> from tuna.celery_app.celery import pow2
    >>> g_result = group(pow2.s(2), pow2.s(4))
    >>> res = g_result()
    >>> res.get()
    [4, 16]
    
    

    为什么result.collect()不保存结果?根据文档,我应该得到这个列表:[0,1,4,9,16,25,36,49,64,81],但我的列表是空的。

    0 回复  |  直到 1 年前