Модульное тестирование с помощью django-celery?

82

Я пытаюсь придумать методологию тестирования для нашего проекта django-celery . Я прочитал примечания в документации , но это не дало мне хорошего представления о том, что на самом деле делать. Я не беспокоюсь о тестировании задач в реальных демонах, а только о функциональности моего кода. В основном мне интересно:

  1. Как мы можем обойтись task.delay()во время теста (я пробовал настройку, CELERY_ALWAYS_EAGER = Trueно без разницы)?
  2. Как мы можем использовать рекомендуемые тестовые настройки (если это лучший способ) без фактического изменения нашего settings.py?
  3. Можем ли мы по-прежнему использовать manage.py testили должны использовать пользовательский раннер?

В целом, любые советы или подсказки по тестированию с сельдереем были бы очень полезны.

Джейсон Уэбб
источник
1
что значит CELERY_ALWAYS_EAGERбез разницы?
спрашиваетol 03
Я все еще получаю сообщения об отсутствии связи с rabbitmq.
Джейсон Уэбб
У вас есть обратная связь? Я предполагаю, что что-то другое, кроме .delayпопытки установить соединение.
спрашиваетol 03
11
BROKER_BACKEND=memoryВ этом случае может помочь настройка .
спрашиваетol 03
Спросите, вы были правы. BROKER_BACKEND=memoryпочинил это. Если вы поставите это как ответ, я отмечу, что это правильно.
Джейсон Уэбб

Ответы:

43

Попробуйте установить:

BROKER_BACKEND = 'memory'

(Спасибо за комментарий asksol .)

оплачиваемый ботаник
источник
8
Я считаю, что в этом больше нет необходимости, когда установлен CELERY_ALWAYS_EAGER.
mlissner
3
Вы нашли решение для сельдерея 4?
Дэвид Шуман
72

Мне нравится использовать декоратор override_settings в тестах, для завершения которых нужны результаты сельдерея.

from django.test import TestCase
from django.test.utils import override_settings
from myapp.tasks import mytask

class AddTestCase(TestCase):

    @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
                       CELERY_ALWAYS_EAGER=True,
                       BROKER_BACKEND='memory')
    def test_mytask(self):
        result = mytask.delay()
        self.assertTrue(result.successful())

Если вы хотите применить это ко всем тестам, вы можете использовать средство запуска тестов сельдерея, как описано на http://docs.celeryproject.org/en/2.5/django/unit-testing.html, которое в основном устанавливает те же настройки, кроме ( BROKER_BACKEND = 'memory').

В настройках:

TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'

Посмотрите на исходный код CeleryTestSuiteRunner, и становится ясно, что происходит.

Джошуа
источник
1
Это не сработало с сельдереем 4, даже с переименованием полей отсюда
шади
Работает на сельдерее 3.1. У меня просто мои тестовые примеры Celery наследуются от родительского класса с этим декоратором. Таким образом, он нужен только в одном месте, и вам не нужно втягиваться djcelery.
kontextify
1
Это отлично работает на Celery 4.4. и Django 2.2. Лучший подход к запуску модульных тестов, с которым я сталкивался до сих пор.
Эрик Калкокен
18

Вот отрывок из моего базового класса тестирования, в котором этот apply_asyncметод заменяется заглушкой и записываются вызовы к нему (в том числе Task.delay.) Это немного грубо, но в течение последних нескольких месяцев мне удалось удовлетворить мои потребности.

from django.test import TestCase
from celery.task.base import Task
# For recent versions, Task has been moved to celery.task.app:
# from celery.app.task import Task
# See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html

class CeleryTestCaseBase(TestCase):

    def setUp(self):
        super(CeleryTestCaseBase, self).setUp()
        self.applied_tasks = []

        self.task_apply_async_orig = Task.apply_async

        @classmethod
        def new_apply_async(task_class, args=None, kwargs=None, **options):
            self.handle_apply_async(task_class, args, kwargs, **options)

        # monkey patch the regular apply_sync with our method
        Task.apply_async = new_apply_async

    def tearDown(self):
        super(CeleryTestCaseBase, self).tearDown()

        # Reset the monkey patch to the original method
        Task.apply_async = self.task_apply_async_orig

    def handle_apply_async(self, task_class, args=None, kwargs=None, **options):
        self.applied_tasks.append((task_class, tuple(args), kwargs))

    def assert_task_sent(self, task_class, *args, **kwargs):
        was_sent = any(task_class == task[0] and args == task[1] and kwargs == task[2]
                       for task in self.applied_tasks)
        self.assertTrue(was_sent, 'Task not called w/class %s and args %s' % (task_class, args))

    def assert_task_not_sent(self, task_class):
        was_sent = any(task_class == task[0] for task in self.applied_tasks)
        self.assertFalse(was_sent, 'Task was not expected to be called, but was.  Applied tasks: %s' %                 self.applied_tasks)

Вот пример того, как вы могли бы использовать его в своих тестовых случаях:

mymodule.py

from my_tasks import SomeTask

def run_some_task(should_run):
    if should_run:
        SomeTask.delay(1, some_kwarg=2)

test_mymodule.py

class RunSomeTaskTest(CeleryTestCaseBase):
    def test_should_run(self):
        run_some_task(should_run=True)
        self.assert_task_sent(SomeTask, 1, some_kwarg=2)

    def test_should_not_run(self):
        run_some_task(should_run=False)
        self.assert_task_not_sent(SomeTask)
Сэм Долан
источник
4

поскольку я все еще вижу это в результатах поиска, настройки отменяются

TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'

работал у меня в соответствии с документами Celery

dorkforce
источник
1

Для всех, кто попадает сюда в 2019 году: ознакомьтесь с этой статьей, в которой рассматриваются различные стратегии, включая синхронный вызов задач.

Сибирский
источник
1

Вот что я сделал

Внутри myapp.tasks.py у меня есть:

from celery import shared_task

@shared_task()
def add(a, b):
    return a + b

Внутри myapp.test_tasks.py у меня есть:

from django.test import TestCase, override_settings
from myapp.tasks import add


class TasksTestCase(TestCase):

    def setUp(self):
        ...

    @override_settings(CELERY_TASK_ALWAYS_EAGER=True,CELERY_TASK_EAGER_PROPOGATES=True)
    def test_create_sections(self):
        result= add.delay(1,2)
        assert result.successful() == True
        assert result.get() == 3
Марко Фратталлоне
источник