Как выполнить модульное тестирование задачи Celery?

Ответы:

61

Можно тестировать задачи синхронно, используя любую имеющуюся библиотеку unittest. Я обычно провожу 2 разных тестовых сессии при работе с сельдереем. Первый (как я предлагаю ниже) является полностью синхронным и должен гарантировать, что алгоритм выполняет то, что он должен делать. Второй сеанс использует всю систему (включая брокера) и удостоверяется, что у меня нет проблем с сериализацией или каких-либо других проблем с распределением или связью.

Так:

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y

И ваш тест:

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)

Надеюсь, это поможет!

FlaPer87
источник
1
Это работает, за исключением задач, которые используют HttpDispatchTask - docs.celeryproject.org/en/latest/userguide/remote-tasks.html, где мне нужно установить celery.conf.CELERY_ALWAYS_EAGER = True, но даже с установкой celery.conf.CELERY_IMPORTS = ('celery.task.http') тест не проходит с NotRegistered: celery.task.http.HttpDispatchTask
davidmytton
Странно, вы уверены, что у вас нет проблем с импортом? Этот тест работает (обратите внимание, что я имитирую ответ, поэтому он возвращает то, что ожидает сельдерей). Кроме того, модули, определенные в CELERY_IMPORTS, будут импортированы во время инициализации воркеров , чтобы избежать этого, я предлагаю вам позвонить celery.loader.import_default_modules().
FlaPer87,
Я также предлагаю вам посмотреть здесь . Он издевается над http-запросом. Не знаю, поможет ли это, я думаю, вы хотите протестировать работающую службу, не так ли?
FlaPer87,
52

Я использую это:

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
    ...

Документы: http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager

CELERY_ALWAYS_EAGER позволяет запускать задачу синхронно, и вам не нужен сервер сельдерея.

Геттли
источник
1
Думаю, это устарело - я понял ImportError: No module named celeryconfig.
Daenyth
7
Я считаю, что выше предполагается, что модуль celeryconfig.pyсуществует в одном пакете. См. Docs.celeryproject.org/en/latest/getting-started/… .
Камил Синди
1
Я знаю, что он старый, но можете ли вы предоставить полный пример того, как запускать задачи addиз вопроса OP в TestCaseклассе?
Kruupös
@ MaxChrétien, извините, я не могу привести полный пример, так как я больше не использую сельдерей. Вы можете отредактировать мой вопрос, если у вас достаточно очков репутации. Если у вас недостаточно информации, дайте мне знать, что я должен скопировать + вставить в этот ответ.
Guettli
1
@ miken32 спасибо. Поскольку последний ответ каким-то образом решает проблему, с которой я хотел помочь, я просто оставил комментарий, который в официальной документации для 4.0 не рекомендуется использовать CELERY_TASK_ALWAYS_EAGERдля модульного тестирования.
Krassowski
33

Зависит от того, что именно вы хотите тестировать.

  • Протестируйте код задачи напрямую. Не вызывайте «task.delay (...)», просто вызывайте «task (...)» из своих модульных тестов.
  • Используйте CELERY_ALWAYS_EAGER . Это вызовет немедленный вызов ваших задач в момент, когда вы скажете «task.delay (...)», так что вы сможете протестировать весь путь (но не асинхронное поведение).
слейси
источник
24

модульный тест

import unittest

from myproject.myapp import celeryapp

class TestMyCeleryWorker(unittest.TestCase):

  def setUp(self):
      celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)

py.test приспособления

# conftest.py
from myproject.myapp import celeryapp

@pytest.fixture(scope='module')
def celery_app(request):
    celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
    return celeryapp

# test_tasks.py
def test_some_task(celery_app):
    ...

Приложение: заставьте send_task уважать

from celery import current_app

def send_task(name, args=(), kwargs={}, **opts):
    # https://github.com/celery/celery/issues/581
    task = current_app.tasks[name]
    return task.apply(args, kwargs, **opts)

current_app.send_task = send_task
Камил Синди
источник
23

Для тех, кто употребляет Celery 4, это:

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)

Поскольку имена настроек были изменены и их необходимо обновить, если вы решите обновить, см.

https://docs.celeryproject.org/en/latest/history/whatsnew-4.0.html?highlight=what%20is%20new#lowercase-setting-names

окрутный
источник
11
Согласно официальной документации , использование task_always_eager (ранее CELERY_ALWAYS_EAGER) не подходит для модульного тестирования. Вместо этого они предлагают другие, отличные способы модульного тестирования вашего приложения Celery.
krassowski
4
Я просто добавлю, что причина, по которой вам не нужны активные задачи в модульных тестах, заключается в том, что в этом случае вы не тестируете, например, сериализацию параметров, которая произойдет, когда вы используете код в производстве.
черт
15

Начиная с Celery 3.0 , один из способов установки CELERY_ALWAYS_EAGERв Django :

from django.test import TestCase, override_settings

from .foo import foo_celery_task

class MyTest(TestCase):

    @override_settings(CELERY_ALWAYS_EAGER=True)
    def test_foo(self):
        self.assertTrue(foo_celery_task.delay())
Аарон Лелевье
источник
7

Начиная с Celery v4.0 , приспособления py.test предназначены для запуска работника сельдерея только для теста и закрываются по завершении:

def test_myfunc_is_executed(celery_session_worker):
    # celery_session_worker: <Worker: gen93553@gnpill.local (running)>
    assert myfunc.delay().wait(3)

Среди других приспособлений, описанных на http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test , вы можете изменить параметры сельдерея по умолчанию, переопределив celery_configприспособление следующим образом:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'accept_content': ['json', 'pickle'],
        'result_serializer': 'pickle',
    }

По умолчанию тестовый работник использует брокер в памяти и бэкэнд результатов. Нет необходимости использовать локальный Redis или RabbitMQ, если не тестируете определенные функции.

Alanjds
источник
Уважаемый, отрицательный, не могли бы вы поделиться, почему это плохой ответ? Искренне спасибо.
alanjds
2
У меня не получилось, просто зависает набор тестов. Не могли бы вы предоставить еще контекст? (Хотя я еще не голосовал;)).
duality_ 02
В моем случае мне пришлось явно установить приспособление celey_config для использования брокера памяти и бэкэнда кеша + памяти
Санзогензо
5

ссылка с использованием pytest.

def test_add(celery_worker):
    mytask.delay()

если вы используете колбу, установите конфигурацию приложения

    CELERY_BROKER_URL = 'memory://'
    CELERY_RESULT_BACKEND = 'cache+memory://'

И в conftest.py

@pytest.fixture
def app():
    yield app   # Your actual Flask application

@pytest.fixture
def celery_app(app):
    from celery.contrib.testing import tasks   # need it
    yield celery_app    # Your actual Flask-Celery application
Йоге
источник
2

В моем случае (и я предполагаю, что многие другие) все, что я хотел, это проверить внутреннюю логику задачи с помощью pytest.

TL; DR; закончил тем, что высмеивал все ( ВАРИАНТ 2 )


Пример использования :

proj/tasks.py

@shared_task(bind=True)
def add_task(self, a, b):
    return a+b;

tests/test_tasks.py

from proj import add_task

def test_add():
    assert add_task(1, 2) == 3, '1 + 2 should equal 3'

но, поскольку shared_taskдекоратор выполняет большую часть внутренней логики сельдерея, на самом деле это не модульные тесты.

Итак, у меня было 2 варианта:

ВАРИАНТ 1: Отдельная внутренняя логика

proj/tasks_logic.py

def internal_add(a, b):
    return a + b;

proj/tasks.py

from .tasks_logic import internal_add

@shared_task(bind=True)
def add_task(self, a, b):
    return internal_add(a, b);

Это выглядит очень странно, и кроме того, что делает его менее читаемым, требуется вручную извлекать и передавать атрибуты, которые являются частью запроса, например, task_idв случае, если он вам нужен, что делает логику менее чистой.

ВАРИАНТ 2: насмехается
над внутренностями сельдерея

tests/__init__.py

# noinspection PyUnresolvedReferences
from celery import shared_task

from mock import patch


def mock_signature(**kwargs):
    return {}


def mocked_shared_task(*decorator_args, **decorator_kwargs):
    def mocked_shared_decorator(func):
        func.signature = func.si = func.s = mock_signature
        return func

    return mocked_shared_decorator

patch('celery.shared_task', mocked_shared_task).start()

что затем позволяет мне издеваться над объектом запроса (опять же, если вам нужны вещи из запроса, такие как идентификатор или счетчик повторных попыток.

tests/test_tasks.py

from proj import add_task

class MockedRequest:
    def __init__(self, id=None):
        self.id = id or 1


class MockedTask:
    def __init__(self, id=None):
        self.request = MockedRequest(id=id)


def test_add():
    mocked_task = MockedTask(id=3)
    assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'

Это решение гораздо более ручное, но оно дает мне контроль, который мне нужен для фактического модульного тестирования, без повторения и без потери области действия сельдерея.

Даниэль Дубовски
источник