我实施了
celery
文档中的结果示例,但我没有得到相同的结果。
我在同一网络中有两个码头工人:我的
celery_1
码头工人,负责管理芹菜工人和
redis_1
docker运行最新
redis
形象
我跑步
芹菜
docker和我有这个文件
tasks.py
:
from celery import group
from tuna.celery import app
@app.task(trail=True)
def A(how_many):
return group(B.s(i) for i in range(how_many))()
@app.task(trail=True)
def B(i):
return pow2.delay(i)
@app.task(trail=True)
def pow2(i):
return i ** 2
以下是我在celey_1 docker中启动芹菜的方式:
celery -A tuna.celery_app.celery worker -l info -E
我可以看到它正常工作并正确注册任务:
celery_1 | -------------- celery@fc9a5ae8ab17 v5.3.4 (emerald-rush)
celery_1 | --- ***** -----
celery_1 | -- ******* ---- Linux-5.4.0-150-generic-x86_64-with-glibc2.29 2024-01-10 17:28:52
celery_1 | - *** --- * ---
celery_1 | - ** ---------- [config]
celery_1 | - ** ---------- .> app: celery_app:0x7f4f829d98e0
celery_1 | - ** ---------- .> transport: redis://mituna_redis:6379//
celery_1 | - ** ---------- .> results: redis://mituna_redis:6379/
celery_1 | - *** --- * --- .> concurrency: 72 (prefork)
celery_1 | -- ******* ---- .> task events: ON
celery_1 | --- ***** -----
celery_1 | -------------- [queues]
celery_1 | .> celery exchange=celery(direct) key=celery
celery_1 |
celery_1 |
celery_1 | [tasks]
celery_1 | . tuna.celery_app.celery.A
celery_1 | . tuna.celery_app.celery.B
celery_1 | . tuna.celery_app.celery.async_call
celery_1 | . tuna.celery_app.celery.group_tasks
celery_1 | . tuna.celery_app.celery.pow2
我拥有的几乎所有不使用的功能
result.collect()
工作如预期,我能够看到结果。但我正试图以非阻塞的方式收集这些结果
collect()
,但它对我不起作用。
这是应用程序设置:
app = Celery('celery_app',
broker_url=f"redis://mituna_redis:6379//",
result_backend=f"redis://mituna_redis:6379/",
include=['tuna.celery_app.celery',])
app.autodiscover_tasks()
app.conf.result_backend_transport_options = {'retry_policy': {'timeout': 5.0}}
logger = get_task_logger(__name__)
我使用redis:latest作为我的图像和芹菜5.3.4
所以在
芹菜
docker我启动我的python3解释器,并编写与文档中相同的代码。
https://docs.celeryq.dev/en/stable/reference/celery.result.html
Python 3.8.10 (default, May 26 2023, 14:05:08)
[GCC 9.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from tuna.celery_app.celery import A
>>> from celery.result import ResultBase
>>> result = A.delay(10)
>>> [v for v in result.collect()
... if not isinstance(v, (ResultBase, tuple))]
[]
正如你所看到的,
result.collect()
翻译里空荡荡的回来了。
我在芹菜工人身上看到了收到的任务和正确计算的结果:
celery_1 | [2024-01-10 15:01:14,312: INFO/MainProcess] Task tuna.celery_app.celery.pow2[78223919-cc49-4166-8546-6e4a48345478] received
celery_1 | [2024-01-10 15:01:14,312: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[77c7e134-1922-44db-9d55-e3f6b018fb89] succeeded in 0.00052880764008s: 1
celery_1 | [2024-01-10 15:01:14,313: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[78223919-cc49-4166-8546-6e4a48345478] succeeded in 0.00044824361801s: 0
celery_1 | [2024-01-10 15:01:14,314: INFO/MainProcess] Task tuna.celery_app.celery.pow2[1e6e530d-1383-4fca-8d90-1da1b892ad44] received
celery_1 | [2024-01-10 15:01:14,316: INFO/MainProcess] Task tuna.celery_app.celery.pow2[bbeef76f-e20f-4f92-9efb-d15eb37f8f8d] received
celery_1 | [2024-01-10 15:01:14,317: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[1e6e530d-1383-4fca-8d90-1da1b892ad44] succeeded in 0.00066126394272s: 36
celery_1 | [2024-01-10 15:01:14,317: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[bbeef76f-e20f-4f92-9efb-d15eb37f8f8d] succeeded in 0.0004749994278s: 49
celery_1 | [2024-01-10 15:01:14,319: INFO/MainProcess] Task tuna.celery_app.celery.pow2[84068265-de95-4d23-ba9f-cfa8006f65cd] received
celery_1 | [2024-01-10 15:01:14,321: INFO/MainProcess] Task tuna.celery_app.celery.pow2[851bd08b-59eb-40b4-8ab9-e40e622ad086] received
celery_1 | [2024-01-10 15:01:14,322: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[84068265-de95-4d23-ba9f-cfa8006f65cd] succeeded in 0.00038264656067s: 9
celery_1 | [2024-01-10 15:01:14,322: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[851bd08b-59eb-40b4-8ab9-e40e622ad086] succeeded in 0.00030431699753s: 4
celery_1 | [2024-01-10 15:01:14,323: INFO/MainProcess] Task tuna.celery_app.celery.pow2[d6926a01-fdfe-454f-966e-a347484e6574] received
celery_1 | [2024-01-10 15:01:14,326: INFO/MainProcess] Task tuna.celery_app.celery.pow2[861452de-746f-4d22-bd88-29f407bf599d] received
celery_1 | [2024-01-10 15:01:14,327: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[d6926a01-fdfe-454f-966e-a347484e6574] succeeded in 0.00024588489532s: 16
celery_1 | [2024-01-10 15:01:14,327: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[861452de-746f-4d22-bd88-29f407bf599d] succeeded in 0.00058472061157s: 64
celery_1 | [2024-01-10 15:01:14,328: INFO/MainProcess] Task tuna.celery_app.celery.pow2[753ab025-05bf-4cab-a2e5-0e4a8f004f30] received
celery_1 | [2024-01-10 15:01:14,330: INFO/MainProcess] Task tuna.celery_app.celery.pow2[f8143821-a9f4-45ba-b22e-c949d6ffe20c] received
celery_1 | [2024-01-10 15:01:14,332: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[753ab025-05bf-4cab-a2e5-0e4a8f004f30] succeeded in 0.00030650091171s: 81
celery_1 | [2024-01-10 15:01:14,332: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[f8143821-a9f4-45ba-b22e-c949d6ffe20c] succeeded in 0.00077548265457s: 25
celery_1 | [2024-01-10 15:03:20,991: INFO/MainProcess] Task tuna.celery_app.celery.A[43912ac2-a04f-457b-8f59-d5d8af4bcf0b] received
celery_1 | [2024-01-10 15:
当我以阻挡的方式做这件事时,效果就像一种魅力
>>> from tuna.celery_app.celery import pow2
>>> g_result = group(pow2.s(2), pow2.s(4))
>>> res = g_result()
>>> res.get()
[4, 16]
为什么result.collect()不保存结果?根据文档,我应该得到这个列表:[0,1,4,9,16,25,36,49,64,81],但我的列表是空的。