Отменить уже выполняющуюся задачу с помощью Celery?

96

Я читал документ и искал, но, похоже, не нашел прямого ответа:

Можете ли вы отменить уже выполняющуюся задачу? (так как задача запущена, занимает некоторое время, и ее нужно отменить на полпути)

Я нашел это из документа в FAQ по сельдерею

>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

Но мне неясно, отменит ли это задачи в очереди или убьет запущенный процесс на рабочем месте. Спасибо за свет, который вы пролили!

dcoffey3296
источник

Ответы:

185

revoke отменяет выполнение задачи. Если задача отменена, рабочие игнорируют задачу и не выполняют ее. Если вы не используете постоянные отмены, ваша задача может быть выполнена после перезапуска воркера.

http://docs.celeryproject.org/en/latest/userguide/workers.html#worker-persistent-revokes

revoke имеет параметр завершения, который по умолчанию равен False . Если вам нужно убить выполняющуюся задачу, вам нужно установить для terminate значение True .

>>> from celery.task.control import revoke
>>> revoke(task_id, terminate=True)

http://docs.celeryproject.org/en/latest/userguide/workers.html#revoke-revoking-tasks

Мгер
источник
3
Это именно то объяснение, которое я искал, спасибо!
dcoffey3296
1
Это работает в распределенном окружении? Я имею в виду, если у меня есть рабочие на нескольких машинах, которые выполняют задачи. Следит ли сельдерей, на каком компьютере выполняется задача?
ksrini
1
Оно делает. Общение с работниками происходит через брокера.
mher
5
result.revoke (terminate = True) должен делать то же самое, что и revoke (task_id, terminate = True)
CamHart
10
Кроме того, согласно недавним документам Celery, использование параметра terminate - «последнее средство для администраторов». Вы рискуете завершить другую задачу, которая недавно была запущена для этого работника.
kouk
38

В Celery 3.1 изменен API отзыва задач .

Согласно FAQ по сельдерею , вы должны использовать result.revoke:

>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

или если у вас есть только идентификатор задачи:

>>> from proj.celery import app
>>> app.control.revoke(task_id)
Рокаллит
источник
25

@ 0x00mh ответ правильный, однако недавние документы о сельдерее говорят, что использование этой terminateопции является « крайней мерой для администраторов », потому что вы можете случайно прервать другую задачу, которая начала выполняться в это время. Возможно, лучшее решение - объединение terminate=Trueс signal='SIGUSR1'(что вызывает исключение SoftTimeLimitExceeded в задаче).

коук
источник
2
Это решение мне очень понравилось. Когда SoftTimeLimitExceededвозникает в моей задаче, вызывается моя настраиваемая логика очистки (реализованная через try/ except/ finally). На мой взгляд, это намного лучше, чем то, что AbortableTaskпредлагает ( docs.celeryproject.org/en/latest/reference/… ). В последнем случае вам понадобится серверная часть результатов базы данных, и вам придется вручную и неоднократно проверять статус текущей задачи, чтобы увидеть, не было ли она прервана.
Дэвид Шнайдер
3
Насколько это лучше, насколько я понимаю, если есть какая-либо другая задача, подхваченная процессом, она все равно будет остановлена, просто будет выбрано другое исключение.
Марксин
Если я использую, worker_prefetch_multiplier = 1поскольку у меня всего несколько длительных задач, с завершением должно быть все в порядке - поскольку никакие другие задачи не будут выполнены путем завершения - правильно ли я понял? @spicyramen
maffe
1

Смотрите следующие параметры для задач: time_limit , soft_time_limit (или вы можете установить его для воркеров ). Если вы хотите , чтобы контролировать не только время выполнения, то см истекает аргумент apply_async метода.

простолизз
источник