代码之家  ›  专栏  ›  技术社区  ›  Jason Webb

用django芹菜进行单元测试?

  •  78
  • Jason Webb  · 技术社区  · 14 年前

    我正在尝试为我们的 django-celery 项目。我读过 documentation ,但这并没有给我一个好主意。我不担心在实际的守护进程中测试任务,只担心 代码。我主要想知道:

    1. 我们如何绕过 task.delay() 在测试中(我试着设置 CELERY_ALWAYS_EAGER = True 但没什么区别?
    2. 我们如何使用推荐的测试设置(如果这是最好的方法),而不实际更改我们的设置.py?
    3. 我们还能用吗 manage.py test

    总的来说,任何提示或芹菜测试技巧都会非常有用。

    5 回复  |  直到 14 年前
        1
  •  42
  •   Community CDub    8 年前

    尝试设置:

    BROKER_BACKEND = 'memory'
    

    (感谢 asksol 的评论。)

        2
  •  68
  •   Martin Geisler    9 年前

    from django.test import TestCase
    from django.test.utils import override_settings
    from myapp.tasks import mytask
    
    class AddTestCase(TestCase):
    
        @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
                           CELERY_ALWAYS_EAGER=True,
                           BROKER_BACKEND='memory')
        def test_mytask(self):
            result = mytask.delay()
            self.assertTrue(result.successful())
    

    如果要将此应用于所有测试,可以使用芹菜测试运行器,如中所述 http://docs.celeryproject.org/en/2.5/django/unit-testing.html 基本上设置了相同的设置,除了( BROKER_BACKEND = 'memory' ).

    TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'
    

    看看芹菜SuiteRunner的来源,很清楚发生了什么。

        3
  •  16
  •   Benjamin Sam Dolan    11 年前

    下面是我的测试基类的一个摘录 apply_async 方法并记录对它的调用(包括 Task.delay )有点恶心,但在过去的几个月里我一直在用它来满足我的需要。

    from django.test import TestCase
    from celery.task.base import Task
    # For recent versions, Task has been moved to celery.task.app:
    # from celery.app.task import Task
    # See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html
    
    class CeleryTestCaseBase(TestCase):
    
        def setUp(self):
            super(CeleryTestCaseBase, self).setUp()
            self.applied_tasks = []
    
            self.task_apply_async_orig = Task.apply_async
    
            @classmethod
            def new_apply_async(task_class, args=None, kwargs=None, **options):
                self.handle_apply_async(task_class, args, kwargs, **options)
    
            # monkey patch the regular apply_sync with our method
            Task.apply_async = new_apply_async
    
        def tearDown(self):
            super(CeleryTestCaseBase, self).tearDown()
    
            # Reset the monkey patch to the original method
            Task.apply_async = self.task_apply_async_orig
    
        def handle_apply_async(self, task_class, args=None, kwargs=None, **options):
            self.applied_tasks.append((task_class, tuple(args), kwargs))
    
        def assert_task_sent(self, task_class, *args, **kwargs):
            was_sent = any(task_class == task[0] and args == task[1] and kwargs == task[2]
                           for task in self.applied_tasks)
            self.assertTrue(was_sent, 'Task not called w/class %s and args %s' % (task_class, args))
    
        def assert_task_not_sent(self, task_class):
            was_sent = any(task_class == task[0] for task in self.applied_tasks)
            self.assertFalse(was_sent, 'Task was not expected to be called, but was.  Applied tasks: %s' %                 self.applied_tasks)
    

    下面是一个“从头开始”的例子,说明如何在测试用例中使用它:

    mymodule.py

    from my_tasks import SomeTask
    
    def run_some_task(should_run):
        if should_run:
            SomeTask.delay(1, some_kwarg=2)
    

    test_mymodule.py

    class RunSomeTaskTest(CeleryTestCaseBase):
        def test_should_run(self):
            run_some_task(should_run=True)
            self.assert_task_sent(SomeTask, 1, some_kwarg=2)
    
        def test_should_not_run(self):
            run_some_task(should_run=False)
            self.assert_task_not_sent(SomeTask)
    
        4
  •  4
  •   dorkforce    10 年前

    因为我仍然在搜索结果中看到这个,设置覆盖

    TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'
    

    为我工作 Celery Docs

        5
  •  0
  •   Sibirsky    6 年前

    对于2019年来这里的每个人:结帐 this article

        6
  •  0
  •   Marco Frattallone    6 年前

    我就是这么做的

    在myapp.tasks.py中,我有:

    from celery import shared_task
    
    @shared_task()
    def add(a, b):
        return a + b
    

    from django.test import TestCase, override_settings
    from myapp.tasks import add
    
    
    class TasksTestCase(TestCase):
    
        def setUp(self):
            ...
    
        @override_settings(CELERY_TASK_ALWAYS_EAGER=True,CELERY_TASK_EAGER_PROPOGATES=True)
        def test_create_sections(self):
            result= add.delay(1,2)
            assert result.successful() == True
            assert result.get() == 3