我试着用芹菜加姜果,但没有走多远。我也用django芹菜打。我似乎没有发现其他模块中的共享任务。
项目/项目/设置.py:
INSTALLED_APPS = [
"django_celery_beat",
"proj",
"proj.myapp",
]
CELERY_BROKER_URL = "amqp://user:password@localhost:5672/vhost"
项目/项目/初始计划:
from proj.celery import app as celery_app
__all__ = ('celery_app',)
import os
from celery import Celery
from django.conf import settings
if not settings.configured:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
项目/应用/任务.py:
from celery import shared_task
@shared_task
def add(x, y):
return x + y
有什么想法为什么?
编辑:
啊!!
原来“faker”库(我在愚蠢的测试任务中使用的)并没有安装在我的环境中。不过,芹菜的
autodiscover_tasks
函数无声地失败了-所以我不知道错误是什么。
所以基本上,这是程序员的愚蠢。