Параллельно распределенная задача сельдерея с многопроцессорностью

80

У меня есть задача Celery с интенсивным использованием ЦП. Я хотел бы использовать всю вычислительную мощность (ядра) во множестве экземпляров EC2, чтобы выполнить эту работу быстрее ( я думаю, параллельная распределенная задача сельдерея с многопроцессорностью ) .

Я пытаюсь лучше понять термины, потоки , многопроцессорность , распределенные вычисления , распределенная параллельная обработка .

Пример задачи:

  @app.task
  for item in list_of_millions_of_ids:
      id = item # do some long complicated equation here very CPU heavy!!!!!!! 
      database.objects(newid=id).save()

Используя приведенный выше код (с примером, если возможно), как бы можно было распределить эту задачу с помощью Celery, разрешив разделить эту задачу с использованием всей вычислительной мощности ЦП на всех доступных машинах в облаке?

Прометей
источник
Я думал, что MapReduce был разработан для вашего типа приложений: console.aws.amazon.com/elasticmapreduce/vnext/… :
AStopher

Ответы:

120

Ваши цели:

  1. Распределите свою работу на многих машинах (распределенные вычисления / распределенная параллельная обработка)
  2. Распределить работу на данной машине по всем ЦП (многопроцессорность / многопоточность)

Сельдерей может довольно легко сделать и то, и другое. Первое, что нужно понять, это то, что каждый работник сельдерея по умолчанию настроен на выполнение столько задач, сколько ядер ЦП доступно в системе:

Параллелизм - это количество рабочих процессов prefork, используемых для одновременной обработки ваших задач, когда все они заняты выполнением работы, новые задачи должны будут дождаться завершения одной из задач, прежде чем она сможет быть обработана.

Номер параллелизма по умолчанию - это количество процессоров на этом компьютере (включая ядра) , вы можете указать собственный номер, используя параметр -c. Рекомендуемого значения нет, так как оптимальное количество зависит от ряда факторов, но если ваши задачи в основном связаны с вводом-выводом, вы можете попытаться увеличить его, эксперименты показали, что добавление более чем в два раза количества ЦП редко бывает эффективно и, скорее всего, снизит производительность.

Это означает, что каждой отдельной задаче не нужно беспокоиться об использовании многопроцессорности / потоковой передачи для использования нескольких процессоров / ядер. Вместо этого сельдерей будет выполнять достаточно задач одновременно, чтобы использовать каждый доступный процессор.

После этого следующим шагом будет создание задачи, которая обрабатывает некоторую часть вашего list_of_millions_of_ids. Здесь у вас есть несколько вариантов - один - чтобы каждая задача обрабатывала один идентификатор, поэтому вы запускаете N задач, где N == len(list_of_millions_of_ids). Это гарантирует, что работа будет равномерно распределена между всеми вашими задачами, поскольку никогда не будет случая, когда один работник закончит раньше и просто будет ждать; если ему нужна работа, он может вытащить идентификатор из очереди. Вы можете сделать это (как упоминал Джон Доу) с помощью сельдерея group.

tasks.py:

@app.task
def process_id(item):
    id = item #long complicated equation here
    database.objects(newid=id).save()

И для выполнения поставленных задач:

from celery import group
from tasks import process_id

jobs = group(process_id.s(item) for item in list_of_millions_of_ids)
result = jobs.apply_async()

Другой вариант - разбить список на более мелкие части и раздать их своим работникам. Такой подход сопряжен с риском потери некоторых циклов, потому что в итоге некоторые работники могут ждать, пока другие все еще работают. Однако в документации на сельдерей отмечается, что это опасение часто необоснованно:

Некоторые могут беспокоиться, что разбиение ваших задач на части приведет к ухудшению параллелизма, но это редко бывает верно для загруженного кластера и на практике, поскольку вы избегаете накладных расходов на обмен сообщениями, это может значительно повысить производительность.

Таким образом, вы можете обнаружить, что разбиение списка на части и их распределение по каждой задаче работает лучше из-за уменьшения накладных расходов на обмен сообщениями. Вероятно, вы также можете немного облегчить нагрузку на базу данных, вычислив каждый идентификатор, сохранив его в списке, а затем добавив весь список в БД, как только вы закончите, вместо того, чтобы делать это по одному идентификатору за раз . Подход к фрагментам будет выглядеть примерно так

tasks.py:

@app.task
def process_ids(items):
    for item in items:
        id = item #long complicated equation here
        database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.

И для запуска задач:

from tasks import process_ids

jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here.
jobs.apply_async()

Вы можете немного поэкспериментировать с тем, какой размер фрагментов дает вам лучший результат. Вы хотите найти золотую середину, в которой вы сокращаете накладные расходы на обмен сообщениями, но при этом сохраняете достаточно маленький размер, чтобы работники не заканчивали свой кусок намного быстрее, чем другой работник, а затем просто ждал, ничего не делая.

дано
источник
Таким образом, часть, в которой я выполняю «сложную задачу с тяжелым процессором (возможно, 3D-рендеринг)», будет автоматически распределяться с параллельной обработкой, т.е. одна задача будет использовать столько вычислительной мощности, сколько доступно для всех экземпляров --- и все это вне -коробка? действительно? Вау. PS хороший ответ, спасибо, что объяснили мне это лучше.
Прометей
3
@Spike Не совсем так. Задачи, написанные в настоящее время, могут использовать только одно ядро. Чтобы отдельная задача использовала более одного ядра, мы должны ввести threadingили multiprocessing. Вместо этого мы заставляем каждого работника сельдерея порождать столько задач, сколько ядер доступно на машине (это происходит по умолчанию в сельдерее). Это означает, что во всем кластере каждое ядро ​​может использоваться для обработки ваших list_of_million_ids, поскольку каждая задача использует одно ядро. Таким образом, вместо одной задачи, использующей много ядер, у нас есть много задач, каждая из которых использует одно ядро. Имеет ли это смысл?
dano 03
1
«Чтобы отдельная задача использовала более одного ядра, мы должны ввести threadingили multiprocessing». Предполагая, что мы не можем разделить эту тяжелую задачу на несколько, как бы вы использовали потоки или многопроцессорность, чтобы сельдерей разделил задачу между несколькими экземплярами? спасибо
Тристан
@Tristan Это зависит от того, что на самом деле делает задача. Однако в большинстве случаев я бы сказал, что если вы не можете разделить саму задачу на подзадачи, вам, вероятно, будет сложно multiprocessingразделить работу внутри самой задачи, поскольку оба подхода в конечном итоге требуют выполнения То же самое: разделение задачи на более мелкие задачи, которые можно запускать параллельно. На самом деле вы меняете только точку, в которой вы делаете расщепление.
dano 03
1
@PirateApp Эта проблема говорит о том, что вы не можете использовать multiprocessing внутри задачи Celery. Сам сельдерей использует billiard( multiprocessingвилку) для запуска ваших задач в отдельных процессах. Вам просто не разрешено затем использовать их multiprocessingвнутри.
dano 01
12

В мире дистрибуции нужно помнить прежде всего только об одном:

Преждевременная оптимизация - это корень всех зол. Автор: Д. Кнут

Я знаю, это кажется очевидным, но перед тем, как распространять двойную проверку, вы используете лучший алгоритм (если он существует ...). При этом оптимизация распределения - это баланс между тремя вещами:

  1. Запись / чтение данных с постоянного носителя,
  2. Перенос данных с носителя A на носитель B,
  3. Обработка данных,

Компьютеры сделаны так, что чем ближе вы подходите к своему процессору (3), тем быстрее и эффективнее (1) и (2) будут. Порядок в классическом кластере будет следующим: сетевой жесткий диск, локальный жесткий диск, ОЗУ, внутренняя территория процессорного блока ... В настоящее время процессоры становятся достаточно сложными, чтобы их можно было рассматривать как совокупность независимых аппаратных процессоров, обычно называемых ядрами, эти ядра обрабатывают данные (3) через потоки (2). Представьте, что ваше ядро ​​настолько быстрое, что при отправке данных одним потоком вы используете 50% мощности компьютера, а если ядро ​​имеет 2 потока, вы будете использовать 100%. Два потока на ядро ​​называются гиперпоточностью, и ваша ОС будет видеть 2 процессора на каждое гиперпоточное ядро.

Управление потоками в процессоре обычно называется многопоточностью. Управление процессорами из ОС обычно называется многопроцессорной обработкой. Управление параллельными задачами в кластере обычно называется параллельным программированием. Управление зависимыми задачами в кластере обычно называется распределенным программированием.

Так где же ваше узкое место?

  • В (1): попробуйте продолжить и передавать поток с верхнего уровня (тот, который ближе к вашему процессору, например, если сетевой жесткий диск медленный, сначала сохраните на локальном жестком диске)
  • В (2): это наиболее распространенный вариант, старайтесь избегать пакетов связи, которые не нужны для распространения, или сжимайте пакеты «на лету» (например, если HD медленный, сохраните только сообщение «пакетно вычисленное» и сохраните промежуточные результаты в ОЗУ).
  • В (3): Готово! Вы используете всю имеющуюся в вашем распоряжении вычислительную мощность.

А что насчет сельдерея?

Celery - это платформа обмена сообщениями для распределенного программирования, которая будет использовать модуль брокера для связи (2) и модуль backend для сохранения (1), это означает, что вы сможете, изменив конфигурацию, избежать большинства узких мест (если возможно) на ваша сеть и только в вашей сети. Сначала профилируйте свой код, чтобы добиться максимальной производительности на одном компьютере. Затем используйте сельдерей в своем кластере с конфигурацией по умолчанию и установите CELERY_RESULT_PERSISTENT=True:

from celery import Celery

app = Celery('tasks', 
             broker='amqp://guest@localhost//',
             backend='redis://localhost')

@app.task
def process_id(all_the_data_parameters_needed_to_process_in_this_computer):
    #code that does stuff
    return result

Во время выполнения откройте ваши любимые инструменты мониторинга, я использую значения по умолчанию для rabbitMQ и flower для сельдерея и top для процессора, ваши результаты будут сохранены в вашем бэкэнде. Примером узкого места в сети является рост очереди задач настолько, что они задерживают выполнение, вы можете перейти к изменению модулей или конфигурации сельдерея, если узкое место не находится где-то еще.

тк.
источник
9

Почему бы не использовать groupдля этого сельдерей?

http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups

По сути, вы должны разделить idsна части (или диапазоны) и передать их кучке задач group.

Для чего-то более сложного, например, для объединения результатов определенных задач с сельдереем, я успешно использовал chordзадачу для аналогичной цели:

http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords

Увеличьте settings.CELERYD_CONCURRENCYдо разумного числа, которое вы можете себе позволить, тогда эти работники сельдерея будут продолжать выполнять ваши задачи в группе или аккорде, пока не будут выполнены.

Примечание: из-за ошибки в kombuпрошлом были проблемы с повторным использованием воркеров для большого количества задач, я не знаю, исправлено ли это сейчас. Может быть, но если нет, уменьшите CELERYD_MAX_TASKS_PER_CHILD.

Пример, основанный на упрощенном и модифицированном коде, который я запускаю:

@app.task
def do_matches():
    match_data = ...
    result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s())

summarizeполучает результаты всех single_batch_processorзадач. Каждая задача выполняется на любом работнике Celery, это kombuкоординирует.

Теперь я понимаю: single_batch_processorи summarizeТАКЖЕ должны быть задачи с сельдереем, а не обычные функции - иначе, конечно, они не будут распараллелены (я даже не уверен, что конструктор аккордов примет это, если это не задача сельдерея).

LetMeSOThat4U
источник
Насколько я понимаю, это разделит задачу, но не использует параллельную распределенную задачу сельдерея с многопроцессорной обработкой. т.е. просто использовать всю свободную мощность ЦП на всех облачных машинах.
Прометей
Я не уверен, почему это могло произойти - Celery работает так, как будто у вас есть группа рабочих, независимо от того, где они расположены, они могут даже быть расположены на другой машине. Конечно, вам нужно иметь более одного рабочего. chord(с CELERYD_CONCURRENCY, установленным на десятки рабочих == логические потоки процессора / оборудования) - это то, как я обрабатываю большое количество пакетов файлов журнала параллельно на нескольких ядрах.
LetMeSOThat4U
Это ДЕЙСТВИТЕЛЬНО ПЛОХОЙ пример кода. Задание do_matchesбудет заблокировано ожиданием аккорда. Это может привести к частичному или полному тупику, поскольку многие / все рабочие могут ждать подзадач, ни одна из которых не будет выполнена (поскольку рабочие ждут подзадач вместо того, чтобы усердно работать).
Prisacari Dmitrii
@PrisacariDmitrii Так какое же тогда решение было бы правильным?
LetMeSOThat4U
4

Добавление большего количества рабочих сельдерея, безусловно, ускорит выполнение задачи. Однако у вас может быть еще одно узкое место: база данных. Убедитесь, что он может обрабатывать одновременные вставки / обновления.

Что касается вашего вопроса: вы добавляете работников сельдерея, назначая другому процессу в своих экземплярах EC2 как celeryd. В зависимости от того, сколько воркеров вам нужно, вы можете добавить еще больше экземпляров.

Торстен Энгельбрехт
источник
> Добавление большего количества рабочих сельдерея, безусловно, ускорит выполнение задачи. --- Имеет ли это? То есть вы говорите, что сельдерей распределит эту задачу между всеми моими экземплярами, и мне не придется ее разрезать?
Прометей
Подожди секунду. Я просто прочитал ваш код еще раз, и поскольку это всего одна задача, это не поможет. Вы можете запускать одну задачу для каждого идентификатора (или фрагментов идентификаторов). Или вы следуете совету Джона Доу в другом ответе. Тогда вы сможете получить прибыль от количества рабочих сельдерея. И да, в этом случае много не нужно делать. Просто убедитесь, что рабочие используют одни и те же очереди.
Торстен Энгельбрехт