from celery.app.control importInspect# Inspect all nodes.
i =Inspect()# Show the items that have an ETA or are scheduled for later processing
i.scheduled()# Show tasks that are currently active.
i.active()# Show tasks that have been claimed by workers
i.reserved()
Я пробовал это, но это очень медленно (например, 1 сек). Я использую его синхронно в приложении торнадо для мониторинга прогресса, поэтому он должен быть быстрым.
JulienFr
41
Это не вернет список задач в очереди, которые еще должны быть обработаны.
Эд Джей
9
Используйте i.reserved()для получения списка задач в очереди.
Банан
4
Кто-нибудь испытывал, что у i.reserved () не будет точного списка активных задач? У меня есть задачи, которые не отображаются в списке. Я на django-сельдерее == 3.1.10
Сеперман
6
При указании работника я должен был использовать список в качестве аргумента: inspect(['celery@Flatty']). Огромное улучшение скорости закончилось inspect().
Adversus
42
если вы используете rabbitMQ, используйте это в терминале:
sudo rabbitmqctl list_queues
он напечатает список очередей с количеством ожидающих задач. например:
Я знаком с этим, когда у меня есть привилегии sudo, но я хочу, чтобы непривилегированный системный пользователь мог проверить - какие-либо предложения?
мудрец
Кроме того, вы можете передать это, grep -e "^celery\s" | cut -f2чтобы извлечь это, 166если вы хотите обработать это число позже, скажем, для статистики.
Jamesc
22
Если вы не используете приоритетные задачи, это на самом деле довольно просто, если вы используете Redis. Чтобы получить значение задачи:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
Но приоритетные задачи используют другой ключ в redis , поэтому полная картина немного сложнее. Полная картина такова, что вам нужно запросить redis для каждого приоритета задачи. В python (и из проекта Flower) это выглядит так:
PRIORITY_SEP ='\x06\x16'
DEFAULT_PRIORITY_STEPS =[0,3,6,9]def make_queue_name_for_pri(queue, pri):"""Make a queue name for redis
Celery uses PRIORITY_SEP to separate different priorities of tasks into
different queues in Redis. Each queue-priority combination becomes a key in
redis with names like:
- batch1\x06\x163 <-- P3 queue named batch1
There's more information about this in Github, but it doesn't look like it
will change any time soon:
- https://github.com/celery/kombu/issues/422
In that ticket the code below, from the Flower project, is referenced:
- https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135
:param queue: The name of the queue to make a name for.
:param pri: The priority to make a name with.
:return: A name for the queue-priority pair.
"""if pri notin DEFAULT_PRIORITY_STEPS:raiseValueError('Priority not in priority steps')return'{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri)if pri else(queue,'','')))def get_queue_length(queue_name='celery'):"""Get the number of tasks in a celery queue.
:param queue_name: The name of the queue you want to inspect.
:return: the number of items in the queue.
"""
priority_names =[make_queue_name_for_pri(queue_name, pri)for pri in
DEFAULT_PRIORITY_STEPS]
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],)return sum([r.llen(x)for x in priority_names])
Если вы хотите получить реальную задачу, вы можете использовать что-то вроде:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0-1
Оттуда вам придется десериализовать возвращенный список. В моем случае я смог сделать это с помощью чего-то вроде:
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],)
l = r.lrange('celery',0,-1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))
Просто имейте в виду, что десериализация может занять некоторое время, и вам нужно настроить приведенные выше команды для работы с различными приоритетами.
Я обновил выше для обработки приоритетных задач. Прогресс!
mlissner
1
Просто, чтобы разобраться, DATABASE_NUMBERпо умолчанию используется is 0и QUEUE_NAMEis celery, поэтому redis-cli -n 0 llen celeryбудет возвращаться количество сообщений в очереди.
Vineet Bansal
Для моего сельдерея название очереди '{{{0}}}{1}{2}'вместо '{0}{1}{2}'. Кроме этого, это работает отлично!
Если вы используете Celery + Django, самый простой способ проверять задачи, используя команды непосредственно из вашего терминала в вашей виртуальной среде или используя полный путь к сельдерею:
Если у вас есть определенный проект, вы можете использоватьcelery -A my_proj inspect reserved
sashaboulouds
6
Решение для копирования и вставки Redis с сериализацией json:
def get_celery_queue_items(queue_name):import base64
import json
# Get a configured instance of a celery app:from yourproject.celery import app as celery_app
with celery_app.pool.acquire(block=True)as conn:
tasks = conn.default_channel.client.lrange(queue_name,0,-1)
decoded_tasks =[]for task in tasks:
j = json.loads(task)
body = json.loads(base64.b64decode(j['body']))
decoded_tasks.append(body)return decoded_tasks
Это работает с Джанго. Просто не забудьте поменять yourproject.celery.
Если вы используете сериализатор рассола, вы можете изменить body =строку на body = pickle.loads(base64.b64decode(j['body'])).
Джим Хунзикер
4
Модуль проверки сельдерея, кажется, знает только о задачах с точки зрения рабочих. Если вы хотите просмотреть сообщения, находящиеся в очереди (которые еще не были извлечены рабочими), я предлагаю использовать пирабит , который может взаимодействовать с rabbitmq http api для получения всех видов информации из очереди.
Я думаю, что единственный способ получить задачи, которые ждут, - это сохранить список задач, которые вы запустили, и позволить этой задаче удалить себя из списка при запуске.
Если то, что вы хотите, включает в себя задачу, которая обрабатывается, но еще не завершена, вы можете сохранить список ваших задач и проверить их состояния:
from tasks import add
result = add.delay(4,4)
result.ready()# True if finished
Или вы позволяете Celery сохранять результаты с помощью CELERY_RESULT_BACKEND и проверять, каких ваших задач там нет.
@ daveoncode Не думаю, что для меня достаточно информации, чтобы помочь. Вы можете открыть свой вопрос. Я не думаю, что это будет дубликат этого, если вы укажете, что хотите получить информацию в python. Я бы вернулся к stackoverflow.com/a/19465670/9843399 , на котором я основал свой ответ, и убедился, что он работает первым.
Калеб Сиринг
@CalebSyring Это первый подход, который действительно показывает мне поставленные в очередь задачи. Очень хорошо. Единственная проблема для меня заключается в том, что добавление в список не работает. Любые идеи, как я могу сделать функцию обратного вызова записать в список?
Варлор
@ Извините, кто-то неправильно отредактировал мой ответ. Вы можете посмотреть в истории изменений оригинальный ответ, который, скорее всего, подойдет вам. Я работаю над тем, чтобы исправить это. (РЕДАКТИРОВАТЬ: Я просто вошел и отклонил редактирование, в котором была очевидная ошибка Python. Дайте мне знать, если это решило вашу проблему или нет.)
Калеб Syring
@CalebSyring Я теперь использовал ваш код в классе, имея список в качестве атрибута класса!
Варлор
2
Насколько я знаю, Celery не предоставляет API для проверки задач, ожидающих в очереди. Это зависит от брокера. Если вы используете Redis в качестве посредника для примера, то изучение задач, ожидающих в celeryочереди (по умолчанию), так же просто, как:
подключиться к базе данных брокера
элементы списка в celeryсписке (например, команда LRANGE)
Имейте в виду, что это задачи, ожидающие выбора доступных рабочих. В вашем кластере могут быть запущены некоторые задачи - их не будет в этом списке, поскольку они уже выбраны.
Я пришел к выводу, что лучший способ получить количество заданий в очереди - использовать, rabbitmqctlкак было предложено здесь несколько раз. Чтобы разрешить любому выбранному пользователю запускать команду, sudoя следовал приведенным здесь инструкциям (я пропустил редактирование части профиля, так как не против набрать sudo перед командой).
Я также взял jamesc's grepи cutсниппет и завернул их в вызовы подпроцесса.
from subprocess importPopen, PIPE
p1 =Popen(["sudo","rabbitmqctl","list_queues","-p","[name of your virtula host"], stdout=PIPE)
p2 =Popen(["grep","-e","^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 =Popen(["cut","-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()print("number of jobs on queue: %i"% int(p3.communicate()[0]))
from celery.task.control import inspect
def key_in_list(k, l):return bool([Truefor i in l if k in i.values()])def check_task(task_id):
task_value_dict = inspect().active().values()for task_list in task_value_dict:if self.key_in_list(task_id, task_list):returnTruereturnFalse
Если вы управляете кодом задач, вы можете обойти проблему, позволяя задаче выполнить тривиальную повторную попытку при первом запуске, а затем выполнить проверку inspect().reserved(). Повторная попытка регистрирует задачу с помощью обработчика результатов, и сельдерей может это увидеть. Задача должна принять selfили в contextкачестве первого параметра, чтобы мы могли получить доступ к количеству повторов.
Это решение не зависит от брокера, т.е. Вам не нужно беспокоиться о том, используете ли вы RabbitMQ или Redis для хранения задач.
РЕДАКТИРОВАТЬ: после тестирования я обнаружил, что это только частичное решение. Размер зарезервированного ограничен настройкой предварительной выборки для работника.
Ответы:
РЕДАКТИРОВАТЬ: см. Другие ответы для получения списка задач в очереди.
Вы должны посмотреть здесь: Сельдерей Руководство - осмотр рабочих
В основном это:
В зависимости от того, что вы хотите
источник
i.reserved()
для получения списка задач в очереди.inspect(['celery@Flatty'])
. Огромное улучшение скорости закончилосьinspect()
.если вы используете rabbitMQ, используйте это в терминале:
он напечатает список очередей с количеством ожидающих задач. например:
число в правом столбце - это количество задач в очереди. выше, у очереди сельдерея есть 166 отложенных задач.
источник
grep -e "^celery\s" | cut -f2
чтобы извлечь это,166
если вы хотите обработать это число позже, скажем, для статистики.Если вы не используете приоритетные задачи, это на самом деле довольно просто, если вы используете Redis. Чтобы получить значение задачи:
Но приоритетные задачи используют другой ключ в redis , поэтому полная картина немного сложнее. Полная картина такова, что вам нужно запросить redis для каждого приоритета задачи. В python (и из проекта Flower) это выглядит так:
Если вы хотите получить реальную задачу, вы можете использовать что-то вроде:
Оттуда вам придется десериализовать возвращенный список. В моем случае я смог сделать это с помощью чего-то вроде:
Просто имейте в виду, что десериализация может занять некоторое время, и вам нужно настроить приведенные выше команды для работы с различными приоритетами.
источник
DATABASE_NUMBER
по умолчанию используется is0
иQUEUE_NAME
iscelery
, поэтомуredis-cli -n 0 llen celery
будет возвращаться количество сообщений в очереди.'{{{0}}}{1}{2}'
вместо'{0}{1}{2}'
. Кроме этого, это работает отлично!Чтобы получить задачи из бэкэнда, используйте эту
источник
Если вы используете Celery + Django, самый простой способ проверять задачи, используя команды непосредственно из вашего терминала в вашей виртуальной среде или используя полный путь к сельдерею:
Документ : http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers
Также, если вы используете Celery + RabbitMQ, вы можете проверить список очередей, используя следующую команду:
Больше информации : https://linux.die.net/man/1/rabbitmqctl
источник
celery -A my_proj inspect reserved
Решение для копирования и вставки Redis с сериализацией json:
Это работает с Джанго. Просто не забудьте поменять
yourproject.celery
.источник
body =
строку наbody = pickle.loads(base64.b64decode(j['body']))
.Модуль проверки сельдерея, кажется, знает только о задачах с точки зрения рабочих. Если вы хотите просмотреть сообщения, находящиеся в очереди (которые еще не были извлечены рабочими), я предлагаю использовать пирабит , который может взаимодействовать с rabbitmq http api для получения всех видов информации из очереди.
Пример можно найти здесь: Получить длину очереди с помощью сельдерея (RabbitMQ, Django)
источник
Я думаю, что единственный способ получить задачи, которые ждут, - это сохранить список задач, которые вы запустили, и позволить этой задаче удалить себя из списка при запуске.
С rabbitmqctl и list_queues вы можете получить общее представление о количестве ожидающих задач, но не самих задач: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
Если то, что вы хотите, включает в себя задачу, которая обрабатывается, но еще не завершена, вы можете сохранить список ваших задач и проверить их состояния:
Или вы позволяете Celery сохранять результаты с помощью CELERY_RESULT_BACKEND и проверять, каких ваших задач там нет.
источник
Это работает для меня в моем приложении:
active_jobs
будет список строк, которые соответствуют задачам в очереди.Не забудьте поменять CELERY_APP_INSTANCE со своим собственным.
Спасибо @ashish за то, что он указал мне правильное направление со своим ответом здесь: https://stackoverflow.com/a/19465670/9843399
источник
jobs
всегда ноль ... есть идеи?Насколько я знаю, Celery не предоставляет API для проверки задач, ожидающих в очереди. Это зависит от брокера. Если вы используете Redis в качестве посредника для примера, то изучение задач, ожидающих в
celery
очереди (по умолчанию), так же просто, как:celery
списке (например, команда LRANGE)Имейте в виду, что это задачи, ожидающие выбора доступных рабочих. В вашем кластере могут быть запущены некоторые задачи - их не будет в этом списке, поскольку они уже выбраны.
источник
Я пришел к выводу, что лучший способ получить количество заданий в очереди - использовать,
rabbitmqctl
как было предложено здесь несколько раз. Чтобы разрешить любому выбранному пользователю запускать команду,sudo
я следовал приведенным здесь инструкциям (я пропустил редактирование части профиля, так как не против набрать sudo перед командой).Я также взял jamesc's
grep
иcut
сниппет и завернул их в вызовы подпроцесса.источник
источник
Если вы управляете кодом задач, вы можете обойти проблему, позволяя задаче выполнить тривиальную повторную попытку при первом запуске, а затем выполнить проверку
inspect().reserved()
. Повторная попытка регистрирует задачу с помощью обработчика результатов, и сельдерей может это увидеть. Задача должна принятьself
или вcontext
качестве первого параметра, чтобы мы могли получить доступ к количеству повторов.Это решение не зависит от брокера, т.е. Вам не нужно беспокоиться о том, используете ли вы RabbitMQ или Redis для хранения задач.
РЕДАКТИРОВАТЬ: после тестирования я обнаружил, что это только частичное решение. Размер зарезервированного ограничен настройкой предварительной выборки для работника.
источник
С
subprocess.run
:Будьте осторожны, чтобы измениться
my_proj
сyour_proj
источник