Как проверить, выполняется ли задача в сельдерее (в частности, я использую celery-django)?
Я прочитал документацию и погуглил, но не вижу вызова вроде:
my_example_task.state() == RUNNING
Мой вариант использования заключается в том, что у меня есть внешняя (java) служба для перекодирования. Когда я отправляю документ для перекодировки, я хочу проверить, запущена ли задача, которая запускает эту службу, и, если нет, запустить ее (повторно).
Я использую текущие стабильные версии - кажется, 2.4.
x
?async_result
. В вашем случае использования у вас уже есть экземпляр, все готово. Но что произойдет, если у вас есть только идентификатор задачи и вам нужно создатьasync_result
экземпляр, чтобы иметь возможность вызыватьasync_result.get()
? Это экземплярAsyncResult
класса, но вы не можете использовать необработанный классcelery.result.AsyncResult
, вам нужно получить класс из функции, обернутойapp.task()
. В вашем случае вы бы сделалиasync_result = run_instance.AsyncResult('task-id')
but you cannot use the raw class celery.result.AsyncResult, you need to get the class from the function wrapped by app.task().
- Думаю, так оно и должно было быть на самом деле. Прочтите код: github.com/celery/celery/blob/…Создание
AsyncResult
объекта из идентификатора задачи - это способ, рекомендованный в FAQ для получения статуса задачи, когда единственное, что у вас есть, - это идентификатор задачи.Однако, начиная с Celery 3.x, есть серьезные предостережения, которые могут укусить людей, если они не обратят на них внимание. Это действительно зависит от конкретного сценария использования.
По умолчанию Celery не записывает «запущенное» состояние.
Чтобы Celery записал, что задача выполняется, необходимо установить
task_track_started
значениеTrue
. Вот простая задача, которая проверяет это:@app.task(bind=True) def test(self): print self.AsyncResult(self.request.id).state
Когда
task_track_started
естьFalse
(значение по умолчанию), состояние отображается,PENDING
даже если задача запущена. Если вы установитеtask_track_started
наTrue
, то состояние будетSTARTED
.Состояние
PENDING
означает «я не знаю».С
AsyncResult
состояниемPENDING
не означает ничего, кроме того, что Celery не знает статус задачи. Это могло быть по любому количеству причин.Во-первых,
AsyncResult
могут быть созданы с недопустимыми идентификаторами задач. Такие «задачи» будут считаться отложенными для Celery:>>> task.AsyncResult("invalid").status 'PENDING'
Итак, никто не будет скармливать явно недействительные идентификаторы
AsyncResult
. Достаточно справедливо, но он также имеет эффект,AsyncResult
который также учитывает задачу, которая успешно выполнена, но о которой Celery забылPENDING
. Опять же, в некоторых сценариях использования это может быть проблемой. Частично проблема зависит от того, как Celery настроен для хранения результатов задач, потому что это зависит от доступности «надгробий» в серверной части результатов. («Надгробия» - это термин, который используется в документации по сельдерею для фрагментов данных, которые фиксируют, как закончилась задача.) ИспользованиеAsyncResult
вообще не будет работать, если оноtask_ignore_result
естьTrue
. Более неприятная проблема заключается в том, что Celery по умолчанию истекает срок действия надгробий. Вresult_expires
настройка по умолчанию установлена на 24 часа. Итак, если вы запускаете задачу и записываете идентификатор в долгосрочное хранилище, и более 24 часов спустя вы создаетеAsyncResult
с ним задачу, статус будетPENDING
.Все «настоящие задачи» запускаются в
PENDING
штате. Таким образом,PENDING
выполнение задачи может означать, что задача была запрошена, но никогда не продвинулась дальше (по какой-либо причине). Или это может означать, что задача выполнена, но Celery забыл о своем состоянии.Ой!
AsyncResult
у меня не сработает. Что еще я могу сделать?Я предпочитаю отслеживать цели, а не сами задачи . Я храню некоторую информацию о задачах, но это второстепенно по отношению к отслеживанию целей. Цели хранятся в хранилище независимо от сельдерея. Когда запрос должен выполнить вычисление, зависит от достигнутой цели, он проверяет, была ли цель уже достигнута, если да, то он использует эту кешированную цель, в противном случае запускает задачу, которая повлияет на цель, и отправляет ее в клиент, сделавший HTTP-запрос, является ответом, который указывает, что он должен дождаться результата.
Имена переменных и гиперссылки выше предназначены для Celery 4.x. В 3.x соответствующие переменные и гиперссылки:
CELERY_TRACK_STARTED
,CELERY_IGNORE_RESULT
,CELERY_TASK_RESULT_EXPIRES
.источник
Каждый
Task
объект имеет.request
свойство, которое содержит егоAsyncRequest
объект. Соответственно, следующая строка показывает состояние Задачиtask
:источник
state
свойства, она все равно вернуласьPENDING
. Это не кажется надежным способом отслеживать состояние задач сельдерея с терминала. Кроме того, у меня работает Celery Flower (инструмент мониторинга сельдерея), по какой-то причине он не отображал задачи, которые я искал, в списке задач, которые он выполнил. Возможно, мне придется заглянуть в настройки цветов, чтобы увидеть, есть ли что-нибудь, что говорит показывать только до определенных часов в прошлом.Вы также можете создавать собственные состояния и обновлять выполнение задач, связанных с выполнением задач. Этот пример взят из документации:
@app.task(bind=True) def upload_files(self, filenames): for i, file in enumerate(filenames): if not self.request.called_directly: self.update_state(state='PROGRESS', meta={'current': i, 'total': len(filenames)})
http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states
источник
Старый вопрос, но недавно я столкнулся с этой проблемой.
Если вы пытаетесь получить task_id, вы можете сделать это так:
import celery from celery_app import add from celery import uuid task_id = uuid() result = add.apply_async((2, 2), task_id=task_id)
Теперь вы точно знаете, что такое task_id, и теперь можете использовать его для получения AsyncResult:
# grab the AsyncResult result = celery.result.AsyncResult(task_id) # print the task id print result.task_id 09dad9cf-c9fa-4aee-933f-ff54dae39bdf # print the AsyncResult's status print result.status SUCCESS # print the result returned print result.result 4
источник
apply_async
. Возвращаемый объектapply_async
- этоAsyncResult
объект, у которого есть идентификатор задачи, созданной Celery.task_id
требования, чтобы вы сами сгенерировали идентификатор задачи. В своем комментарии вы представили причину, которая выходит за рамки «как мне проверить статус задачи» и «Если вы пытаетесь получить task_id ...` Отлично, если у вас есть такая потребность, но это не так здесь. (Кроме того, использованиеuuid()
для генерации идентификатора задачи не делает абсолютно ничего, кроме того, что делаетПросто используйте этот API из FAQ по сельдерею
Это прекрасно работает.
источник
Ответ 2020 года:
#### tasks.py @celery.task() def mytask(arg1): print(arg1) #### blueprint.py @bp.route("/args/arg1=<arg1>") def sleeper(arg1): process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1) state = process.state return f"Thanks for your patience, your job {process.task_id} \ is being processed. Status {state}"
источник
Пытаться:
task.AsyncResult(task.request.id).state
это предоставит статус задачи Celery. Если задача Celery уже находится в состоянии FAILURE, она выдаст исключение:
raised unexpected: KeyError('exc_type',)
источник
для простых задач мы можем использовать http://flower.readthedocs.io/en/latest/screenshots.html и http://policystat.github.io/jobtastic/ для мониторинга.
а для сложных задач, скажем, задача, связанная с множеством других модулей. Мы рекомендуем вручную записывать ход выполнения и сообщения по конкретной задаче.
источник
Я нашел полезную информацию в
Руководство для работников проекта сельдерея, инспектирующие рабочих
В моем случае я проверяю, работает ли Celery.
inspect_workers = task.app.control.inspect() if inspect_workers.registered() is None: state = 'FAILURE' else: state = str(task.state)
Вы можете поиграть с inspect, чтобы понять свои потребности.
источник
vi my_celery_apps / app1.py
vi задачи / task1.py
from my_celery_apps.app1 import app app.AsyncResult(taskid) try: if task.state.lower() != "success": return except: """ do something """
источник
Помимо вышеперечисленного Программный подход Использование цветочного статуса задачи можно легко увидеть.
Мониторинг в реальном времени с использованием Celery Events. Flower - это веб-инструмент для мониторинга и управления кластерами сельдерея.
Официальный документ: Цветок - инструмент для мониторинга сельдерея
Установка:
Применение:
http://localhost:5555
источник
res = method.delay() print(f"id={res.id}, state={res.state}, status={res.status} ") print(res.get())
источник