У меня есть задача Celery с интенсивным использованием ЦП. Я хотел бы использовать всю вычислительную мощность (ядра) во множестве экземпляров EC2, чтобы выполнить эту работу быстрее ( я думаю, параллельная распределенная задача сельдерея с многопроцессорностью ) .
Я пытаюсь лучше понять термины, потоки , многопроцессорность , распределенные вычисления , распределенная параллельная обработка .
Пример задачи:
@app.task
for item in list_of_millions_of_ids:
id = item # do some long complicated equation here very CPU heavy!!!!!!!
database.objects(newid=id).save()
Используя приведенный выше код (с примером, если возможно), как бы можно было распределить эту задачу с помощью Celery, разрешив разделить эту задачу с использованием всей вычислительной мощности ЦП на всех доступных машинах в облаке?
python
django
multithreading
multiprocessing
celery
Прометей
источник
источник
Ответы:
Ваши цели:
Сельдерей может довольно легко сделать и то, и другое. Первое, что нужно понять, это то, что каждый работник сельдерея по умолчанию настроен на выполнение столько задач, сколько ядер ЦП доступно в системе:
Это означает, что каждой отдельной задаче не нужно беспокоиться об использовании многопроцессорности / потоковой передачи для использования нескольких процессоров / ядер. Вместо этого сельдерей будет выполнять достаточно задач одновременно, чтобы использовать каждый доступный процессор.
После этого следующим шагом будет создание задачи, которая обрабатывает некоторую часть вашего
list_of_millions_of_ids
. Здесь у вас есть несколько вариантов - один - чтобы каждая задача обрабатывала один идентификатор, поэтому вы запускаете N задач, гдеN == len(list_of_millions_of_ids)
. Это гарантирует, что работа будет равномерно распределена между всеми вашими задачами, поскольку никогда не будет случая, когда один работник закончит раньше и просто будет ждать; если ему нужна работа, он может вытащить идентификатор из очереди. Вы можете сделать это (как упоминал Джон Доу) с помощью сельдереяgroup
.tasks.py:
@app.task def process_id(item): id = item #long complicated equation here database.objects(newid=id).save()
И для выполнения поставленных задач:
from celery import group from tasks import process_id jobs = group(process_id.s(item) for item in list_of_millions_of_ids) result = jobs.apply_async()
Другой вариант - разбить список на более мелкие части и раздать их своим работникам. Такой подход сопряжен с риском потери некоторых циклов, потому что в итоге некоторые работники могут ждать, пока другие все еще работают. Однако в документации на сельдерей отмечается, что это опасение часто необоснованно:
Таким образом, вы можете обнаружить, что разбиение списка на части и их распределение по каждой задаче работает лучше из-за уменьшения накладных расходов на обмен сообщениями. Вероятно, вы также можете немного облегчить нагрузку на базу данных, вычислив каждый идентификатор, сохранив его в списке, а затем добавив весь список в БД, как только вы закончите, вместо того, чтобы делать это по одному идентификатору за раз . Подход к фрагментам будет выглядеть примерно так
tasks.py:
@app.task def process_ids(items): for item in items: id = item #long complicated equation here database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.
И для запуска задач:
from tasks import process_ids jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here. jobs.apply_async()
Вы можете немного поэкспериментировать с тем, какой размер фрагментов дает вам лучший результат. Вы хотите найти золотую середину, в которой вы сокращаете накладные расходы на обмен сообщениями, но при этом сохраняете достаточно маленький размер, чтобы работники не заканчивали свой кусок намного быстрее, чем другой работник, а затем просто ждал, ничего не делая.
источник
threading
илиmultiprocessing
. Вместо этого мы заставляем каждого работника сельдерея порождать столько задач, сколько ядер доступно на машине (это происходит по умолчанию в сельдерее). Это означает, что во всем кластере каждое ядро может использоваться для обработки вашихlist_of_million_ids
, поскольку каждая задача использует одно ядро. Таким образом, вместо одной задачи, использующей много ядер, у нас есть много задач, каждая из которых использует одно ядро. Имеет ли это смысл?threading
илиmultiprocessing
». Предполагая, что мы не можем разделить эту тяжелую задачу на несколько, как бы вы использовали потоки или многопроцессорность, чтобы сельдерей разделил задачу между несколькими экземплярами? спасибоmultiprocessing
разделить работу внутри самой задачи, поскольку оба подхода в конечном итоге требуют выполнения То же самое: разделение задачи на более мелкие задачи, которые можно запускать параллельно. На самом деле вы меняете только точку, в которой вы делаете расщепление.multiprocessing
внутри задачи Celery. Сам сельдерей используетbilliard
(multiprocessing
вилку) для запуска ваших задач в отдельных процессах. Вам просто не разрешено затем использовать ихmultiprocessing
внутри.В мире дистрибуции нужно помнить прежде всего только об одном:
Я знаю, это кажется очевидным, но перед тем, как распространять двойную проверку, вы используете лучший алгоритм (если он существует ...). При этом оптимизация распределения - это баланс между тремя вещами:
Компьютеры сделаны так, что чем ближе вы подходите к своему процессору (3), тем быстрее и эффективнее (1) и (2) будут. Порядок в классическом кластере будет следующим: сетевой жесткий диск, локальный жесткий диск, ОЗУ, внутренняя территория процессорного блока ... В настоящее время процессоры становятся достаточно сложными, чтобы их можно было рассматривать как совокупность независимых аппаратных процессоров, обычно называемых ядрами, эти ядра обрабатывают данные (3) через потоки (2). Представьте, что ваше ядро настолько быстрое, что при отправке данных одним потоком вы используете 50% мощности компьютера, а если ядро имеет 2 потока, вы будете использовать 100%. Два потока на ядро называются гиперпоточностью, и ваша ОС будет видеть 2 процессора на каждое гиперпоточное ядро.
Управление потоками в процессоре обычно называется многопоточностью. Управление процессорами из ОС обычно называется многопроцессорной обработкой. Управление параллельными задачами в кластере обычно называется параллельным программированием. Управление зависимыми задачами в кластере обычно называется распределенным программированием.
Так где же ваше узкое место?
А что насчет сельдерея?
Celery - это платформа обмена сообщениями для распределенного программирования, которая будет использовать модуль брокера для связи (2) и модуль backend для сохранения (1), это означает, что вы сможете, изменив конфигурацию, избежать большинства узких мест (если возможно) на ваша сеть и только в вашей сети. Сначала профилируйте свой код, чтобы добиться максимальной производительности на одном компьютере. Затем используйте сельдерей в своем кластере с конфигурацией по умолчанию и установите
CELERY_RESULT_PERSISTENT=True
:from celery import Celery app = Celery('tasks', broker='amqp://guest@localhost//', backend='redis://localhost') @app.task def process_id(all_the_data_parameters_needed_to_process_in_this_computer): #code that does stuff return result
Во время выполнения откройте ваши любимые инструменты мониторинга, я использую значения по умолчанию для rabbitMQ и flower для сельдерея и top для процессора, ваши результаты будут сохранены в вашем бэкэнде. Примером узкого места в сети является рост очереди задач настолько, что они задерживают выполнение, вы можете перейти к изменению модулей или конфигурации сельдерея, если узкое место не находится где-то еще.
источник
Почему бы не использовать
group
для этого сельдерей?http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups
По сути, вы должны разделить
ids
на части (или диапазоны) и передать их кучке задачgroup
.Для чего-то более сложного, например, для объединения результатов определенных задач с сельдереем, я успешно использовал
chord
задачу для аналогичной цели:http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords
Увеличьте
settings.CELERYD_CONCURRENCY
до разумного числа, которое вы можете себе позволить, тогда эти работники сельдерея будут продолжать выполнять ваши задачи в группе или аккорде, пока не будут выполнены.Примечание: из-за ошибки в
kombu
прошлом были проблемы с повторным использованием воркеров для большого количества задач, я не знаю, исправлено ли это сейчас. Может быть, но если нет, уменьшите CELERYD_MAX_TASKS_PER_CHILD.Пример, основанный на упрощенном и модифицированном коде, который я запускаю:
@app.task def do_matches(): match_data = ... result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s())
summarize
получает результаты всехsingle_batch_processor
задач. Каждая задача выполняется на любом работнике Celery, этоkombu
координирует.Теперь я понимаю:
single_batch_processor
иsummarize
ТАКЖЕ должны быть задачи с сельдереем, а не обычные функции - иначе, конечно, они не будут распараллелены (я даже не уверен, что конструктор аккордов примет это, если это не задача сельдерея).источник
chord
(с CELERYD_CONCURRENCY, установленным на десятки рабочих == логические потоки процессора / оборудования) - это то, как я обрабатываю большое количество пакетов файлов журнала параллельно на нескольких ядрах.do_matches
будет заблокировано ожиданием аккорда. Это может привести к частичному или полному тупику, поскольку многие / все рабочие могут ждать подзадач, ни одна из которых не будет выполнена (поскольку рабочие ждут подзадач вместо того, чтобы усердно работать).Добавление большего количества рабочих сельдерея, безусловно, ускорит выполнение задачи. Однако у вас может быть еще одно узкое место: база данных. Убедитесь, что он может обрабатывать одновременные вставки / обновления.
Что касается вашего вопроса: вы добавляете работников сельдерея, назначая другому процессу в своих экземплярах EC2 как
celeryd
. В зависимости от того, сколько воркеров вам нужно, вы можете добавить еще больше экземпляров.источник