Многопроцессорность Python: понимание логики `chunksize`

84

Какие факторы определяют оптимальный chunksizeаргумент в пользу таких методов multiprocessing.Pool.map()? .map()Похоже, что этот метод использует произвольную эвристику для размера фрагмента по умолчанию (поясняется ниже); что мотивирует этот выбор и существует ли более продуманный подход, основанный на какой-либо конкретной ситуации / настройке?

Пример - скажи, что я:

  • Пропускание iterableна .map()что ~ 15 миллионов элементов;
  • Работаем на машине с 24 ядрами и используем по умолчанию processes = os.cpu_count()внутри multiprocessing.Pool().

Я наивно считаю, что каждому из 24 рабочих нужно дать кусок равного размера, то есть 15_000_000 / 24625 000 человек. Большие блоки должны снизить текучесть / накладные расходы при полном использовании всех рабочих. Но похоже, что здесь отсутствуют некоторые потенциальные недостатки предоставления больших партий каждому рабочему. Это неполное изображение, и что мне не хватает?


Часть моего вопроса проистекает из логики по умолчанию для if chunksize=None: both .map()и .starmap()call .map_async(), которая выглядит следующим образом:

def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
               error_callback=None):
    # ... (materialize `iterable` to list if it's an iterator)
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)  # ????
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0

В чем логика divmod(len(iterable), len(self._pool) * 4)? Это означает, что размер фрагмента будет ближе к 15_000_000 / (24 * 4) == 156_250. Каково намерение умножения len(self._pool)на 4?

Это делает результирующий размер фрагмента в 4 раза меньше, чем моя "наивная логика" сверху, которая состоит из простого деления длины итерации на количество рабочих вpool._pool .

Наконец, есть еще этот фрагмент из документации Python, .imap()который еще больше подогревает мое любопытство:

chunksizeАргумент является таким же , как тот , используемым map() методом. Для очень длинных итераций использование большого значения для chunksizeможет значительно ускорить выполнение задания, чем использование значения по умолчанию 1.


Связанный ответ, который полезен, но слишком высокоуровневый: Многопроцессорность Python: почему большие куски медленнее? .

Брэд Соломон
источник
1
4Произвольно, и весь расчет chunksize является эвристическим. Важным фактором является то, насколько может варьироваться ваше фактическое время обработки. Чуть подробнее об этом здесь , пока я не было времени для ответа , если еще нужен тогда.
Darkonaut
Вы проверяли этот вопрос ?
Эндрю Нагиб
1
Спасибо @AndrewNaguib, я вообще-то не наткнулся на это как-то
Брэд Соломон
1
Просто чтобы вы знали: я не забыл этот вопрос. Фактически, я работаю над каноническим ответом библейского измерения (множество полезных фрагментов кода и причудливой графики) с того дня, как вы спросили. Награда по-прежнему пришла на 1-2 недели раньше, чтобы все было завершено, но я уверен, что смогу выпустить что-то достаточно близко до крайнего срока.
Darkonaut
@BradSolomon Добро пожаловать :). Это отвечает на ваш вопрос?
Эндрю Нагиб

Ответы:

194

Краткий ответ

Chunksize-алгоритм пула является эвристическим. Он обеспечивает простое решение для всех возможных сценариев проблем, которые вы пытаетесь внедрить в методы Pool. Как следствие, его нельзя оптимизировать для какого-либо конкретного сценария.

Алгоритм произвольно делит итерацию примерно на четыре части больше, чем при наивном подходе. Больше фрагментов означает больше накладных расходов, но увеличивает гибкость планирования. Как будет видно из этого ответа, в среднем это приводит к более высокому использованию рабочих, но без гарантии более короткого общего времени вычислений для каждого случая.

«Приятно знать, - можете подумать вы, - но как эта информация поможет мне в решении конкретных проблем, связанных с многопроцессорностью?» Что ж, это не так. Более честный короткий ответ: «короткого ответа нет», «многопроцессорность - это сложно» и «это зависит от обстоятельств». Наблюдаемый симптом может иметь разные корни даже при одинаковых сценариях.

Этот ответ пытается предоставить вам основные концепции, которые помогут вам получить более четкое представление о черном ящике планирования Pool. Он также пытается дать вам несколько основных инструментов для распознавания и предотвращения потенциальных обрывов, поскольку они связаны с размером фрагментов.


Содержание

Часть I

  1. Определения
  2. Цели распараллеливания
  3. Сценарии распараллеливания
  4. Риски Chunksize> 1
  5. Алгоритм размера фрагментов Пула
  6. Количественная оценка эффективности алгоритма

    6.1 Модели

    6.2 Параллельное расписание

    6.3 Эффективность

    6.3.1 Абсолютная эффективность распределения (ADE)

    6.3.2 Относительная эффективность распределения (RDE)

Часть II

  1. Наивный алгоритм против алгоритма пула
  2. Проверка в реальных условиях
  3. Заключение

Сначала необходимо уточнить некоторые важные термины.


1. Определения


Кусок

Здесь фрагмент - это доля iterableаргумента, указанного в вызове метода пула. Как рассчитывается размер фрагмента и какие эффекты это может иметь, является темой этого ответа.


Задача

Физическое представление задачи в рабочем процессе в терминах данных можно увидеть на рисунке ниже.

фигура0

На рисунке показан пример вызова pool.map(), отображаемого вдоль строки кода, взятого из multiprocessing.pool.workerфункции, где задача, считанная из функции, inqueueраспаковывается. workerявляется основной функцией MainThreadрабочего процесса пула. func-Argument указан в бассейне-методе будет только совпадать с func-переменными внутри worker-функцией для методов одного вызова , как apply_asyncи для imapс chunksize=1. Для остальных методов пула с chunksizeпараметром-функцией обработки funcбудет функция- преобразователь ( mapstarили starmapstar). Эта функция отображает указанный пользователем funcпараметр на каждый элемент переданного фрагмента итерируемого объекта (-> "map-tasks"). Время, которое на это требуется, определяет задачутакже как единица работы .


Таскель

Хотя использование слова «задача» для всей обработки одного фрагмента соответствует внутреннему коду multiprocessing.pool, нет никаких указаний на то, как должен выполняться один вызов указанного пользователем funcс одним элементом фрагмента в качестве аргумента (ов). упомянутый. Чтобы избежать путаницы, возникающей из-за конфликтов имен (подумайте о maxtasksperchildпараметре -параметр для метода пула __init__), в этом ответе отдельные единицы работы в задаче будут обозначаться как таскель .

Taskel (от задачи + эль ление) является наименьшей единицей работы в рамках задачи . Это однократное выполнение функции, указанной funcпараметром Pool-метода, вызываемое с аргументами, полученными из одного элемента переданного фрагмента . Задача состоит из chunksize taskels .


Накладные расходы на распараллеливание (PO)

PO состоит из внутренних служебных данных Python и служебных данных для межпроцессного взаимодействия (IPC). Накладные расходы на задачу в Python включают код, необходимый для упаковки и распаковки задач и их результатов. IPC-overhead включает необходимую синхронизацию потоков и копирование данных между разными адресными пространствами (требуется два шага копирования: родительский -> очередь -> дочерний). Объем накладных расходов IPC зависит от ОС, оборудования и размера данных, что затрудняет обобщение результатов.


2. Цели распараллеливания

При использовании многопроцессорной обработки наша общая цель (очевидно) - минимизировать общее время обработки всех задач. Для достижения этой общей цели наша техническая цель должна заключаться в оптимизации использования аппаратных ресурсов .

Некоторые важные подцели для достижения технической цели:

  • минимизировать накладные расходы на распараллеливание (наиболее известный, но не единственный: IPC )
  • высокая загрузка всех ядер процессора
  • ограничение использования памяти для предотвращения чрезмерного разбиения на страницы ( мусора ) ОС

Во - первых, задачи должны быть вычислительно тяжелый (интенсивный) достаточно, чтобы получить обратно РО мы должны платить за распараллеливания. Актуальность PO снижается с увеличением абсолютного времени вычислений на задачу. Или, говоря наоборот, чем больше абсолютное время вычислений на одну задачу для вашей проблемы, тем менее актуальной становится потребность в сокращении PO. Если ваши вычисления будут занимать часы на одну задачу, накладные расходы IPC будут незначительными по сравнению с этим. Основная задача здесь - предотвратить простаивание рабочих процессов после распределения всех задач. Держать все ядра загруженными означает, что мы максимально распараллеливаем.


3. Сценарии распараллеливания

Какие факторы определяют оптимальный аргумент размера фрагмента для таких методов, как multiprocessing.Pool.map ()

Основным фактором, о котором идет речь, является то, сколько времени вычислений может варьироваться в зависимости от наших отдельных задач. Чтобы назвать это, выбор оптимального размера блока определяется коэффициентом вариации ( CV ) времени вычислений на одну задачу.

Двумя крайними сценариями по шкале, вытекающими из степени этой вариации, являются:

  1. Для всех таскелей требуется одинаковое время вычисления.
  2. На выполнение таскела могут уйти секунды или дни.

Для лучшей запоминаемости я буду называть эти сценарии:

  1. Плотный сценарий
  2. Широкий сценарий


Плотный сценарий

В плотном сценарии было бы желательно распределить все таскелы сразу, чтобы свести к минимуму необходимые IPC и переключение контекста. Это означает, что мы хотим создать столько блоков, сколько есть рабочих процессов. Как уже было сказано выше, вес PO увеличивается с сокращением времени вычислений на задачу.

Для максимальной пропускной способности мы также хотим, чтобы все рабочие процессы были заняты до тех пор, пока все задачи не будут обработаны (без простаивающих рабочих). Для этой цели распределенные блоки должны быть одинакового размера или близки к нему.


Широкий сценарий

Ярким примером для широкого сценария может быть проблема оптимизации, когда результаты либо сходятся быстро, либо вычисления могут занимать часы, если не дни. Обычно невозможно предсказать, какую смесь «легких задач» и «тяжелых задач» будет содержать задача в таком случае, поэтому не рекомендуется распределять слишком много задач одновременно в пакете задач. Распределение меньшего количества задач одновременно, чем возможно, означает повышение гибкости планирования. Это необходимо здесь для достижения нашей промежуточной цели по высокому использованию всех ядер.

Если бы Poolметоды по умолчанию были полностью оптимизированы для плотного сценария, они все чаще создавали бы неоптимальные сроки для каждой проблемы, расположенной ближе к широкому сценарию.


4. Риски Chunksize> 1

Рассмотрим этот упрощенный пример псевдокода расширенного сценария , который мы хотим передать в метод пула:

good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 84600]

Вместо фактических значений мы делаем вид, что видим необходимое время вычислений в секундах, для простоты только 1 минуту или 1 день. Мы предполагаем, что пул имеет четыре рабочих процесса (на четырех ядрах) и chunksizeустановлен в 2. Поскольку порядок будет сохранен, куски, отправленные работникам, будут следующими:

[(60, 60), (86400, 60), (86400, 60), (60, 84600)]

Поскольку у нас достаточно рабочих, а время вычислений достаточно велико, мы можем сказать, что каждый рабочий процесс в первую очередь получит кусок для работы. (Это не обязательно для быстрого выполнения задач). Кроме того, мы можем сказать, что вся обработка займет около 86400 + 60 секунд, потому что это наибольшее общее время вычисления для фрагмента в этом искусственном сценарии, и мы распределяем фрагменты только один раз.

Теперь рассмотрим эту итерацию, в которой только один элемент меняет свою позицию по сравнению с предыдущей итерацией:

bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]

... и соответствующие чанки:

[(60, 60), (86400, 86400), (60, 60), (60, 84600)]

Просто не повезло с сортировкой итераций, которые почти удвоили (86400 + 86400) наше общее время обработки! Рабочий, получающий порочный (86400, 86400) -chunk, блокирует передачу второго тяжелого Taskel в своей задаче одному из простаивающих рабочих, уже завершивших свои (60, 60) -chunks. Очевидно, мы бы не рискнули таким неприятным исходом, если бы поставили chunksize=1.

Это риск больших кусков. Чем больше размер фрагментов, тем меньше накладных расходов мы жертвуем гибкостью планирования, а в случаях, подобных описанным выше, это плохая сделка.

Как мы увидим в главе 6. Количественная оценка эффективности алгоритма , большие куски также могут привести к неоптимальным результатам для плотных сценариев .


5. Алгоритм размера фрагментов Пула

Ниже вы найдете немного измененную версию алгоритма внутри исходного кода. Как видите, я отрезал нижнюю часть и превратил ее в функцию для chunksizeвнешнего вычисления аргумента. Я также заменил 4с factorпараметром и переданы на len()звонки.

# mp_utils.py

def calc_chunksize(n_workers, len_iterable, factor=4):
    """Calculate chunksize argument for Pool-methods.

    Resembles source-code within `multiprocessing.pool.Pool._map_async`.
    """
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return chunksize

Чтобы убедиться, что мы все на одной странице, вот что divmod:

divmod(x, y)это встроенная функция, которая возвращает (x//y, x%y). x // y- деление этажа, возвращающее округленное в меньшую сторону частное от x / y, а x % y- операция по модулю, возвращающая остаток от x / y. Следовательно, например, divmod(10, 3)возвращается (3, 1).

Теперь, когда вы посмотрите на chunksize, extra = divmod(len_iterable, n_workers * 4), вы заметите, что n_workersздесь делитель yна x / yи умножение на 4, без дальнейшей настройки if extra: chunksize +=1, приводит к начальному размеру фрагмента, по крайней мере, в четыре раза меньшему (для len_iterable >= n_workers * 4), чем это было бы в противном случае.

Для просмотра эффекта умножения 4на результат промежуточного размера фрагмента рассмотрите эту функцию:

def compare_chunksizes(len_iterable, n_workers=4):
    """Calculate naive chunksize, Pool's stage-1 chunksize and the chunksize
    for Pool's complete algorithm. Return chunksizes and the real factors by
    which naive chunksizes are bigger.
    """
    cs_naive = len_iterable // n_workers or 1  # naive approach
    cs_pool1 = len_iterable // (n_workers * 4) or 1  # incomplete pool algo.
    cs_pool2 = calc_chunksize(n_workers, len_iterable)

    real_factor_pool1 = cs_naive / cs_pool1
    real_factor_pool2 = cs_naive / cs_pool2

    return cs_naive, cs_pool1, cs_pool2, real_factor_pool1, real_factor_pool2

Вышеупомянутая функция вычисляет наивный размер chunksize ( cs_naive) и размер фрагмента первого шага алгоритма chunksize-алгоритма Pool ( cs_pool1), а также размер фрагмента для полного алгоритма Pool ( cs_pool2). Далее он вычисляет реальные коэффициенты rf_pool1 = cs_naive / cs_pool1 и rf_pool2 = cs_naive / cs_pool2, которые говорят нам, во сколько раз наивно рассчитанные размеры фрагментов больше, чем внутренняя версия (и) Pool.

Ниже вы видите две фигуры, созданные на основе результатов этой функции. На левом рисунке просто показаны размеры фрагментов n_workers=4вплоть до итеративной длины 500. На правом рисунке показаны значения для rf_pool1. Для итерируемой длины 16реальный коэффициент становится >=4(для len_iterable >= n_workers * 4), а его максимальное значение - 7для повторяющейся длины 28-31. Это огромное отклонение от исходного коэффициента 4, к которому алгоритм сходится для более длительных итераций. «Дольше» здесь относительно и зависит от количества указанных рабочих.

Рисунок 1

Помните chunksize cs_pool1все еще не хватает extra-adjustment с остатком от divmodсодержащихся в cs_pool2от полного алгоритма.

Алгоритм продолжается:

if extra:
    chunksize += 1

Теперь в случаях, когда есть остаток ( extraот операции divmod), увеличение размера фрагмента на 1, очевидно, не сработает для каждой задачи. В конце концов, если бы это было так, для начала не было бы остатка.

Как вы можете видеть на рисунке ниже, то « экстра-лечение » имеет эффект, что реальный фактор для rf_pool2ныне сходится в направлении 4от ниже 4 и отклонение несколько мягче. Стандартное отклонение для n_workers=4и len_iterable=500падает от 0.5233для rf_pool1до 0.4115для rf_pool2.

фигура 2

В конце концов, увеличение chunksizeна 1 приводит к тому, что последняя переданная задача имеет размер только len_iterable % chunksize or chunksize.

Тем не менее, более интересный и более значительный эффект дополнительной обработки мы увидим позже по количеству сгенерированных блоков ( n_chunks). Для достаточно длинных итераций завершенный алгоритм размера фрагментов Pool ( n_pool2на рисунке ниже) будет стабилизировать количество фрагментов в n_chunks == n_workers * 4. Напротив, наивный алгоритм (после начальной отрыжки) продолжает чередоваться между n_chunks == n_workersи по n_chunks == n_workers + 1мере увеличения длины итерации.

рисунок3

Ниже вы найдете две расширенные информационные функции для Pool и наивный алгоритм chunksize. Вывод этих функций понадобится в следующей главе.

# mp_utils.py

from collections import namedtuple


Chunkinfo = namedtuple(
    'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks',
                  'chunksize', 'last_chunk']
)

def calc_chunksize_info(n_workers, len_iterable, factor=4):
    """Calculate chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    # `+ (len_iterable % chunksize > 0)` exploits that `True == 1`
    n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
    # exploit `0 == False`
    last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )

Пусть вас не смущает, вероятно, неожиданный вид calc_naive_chunksize_info. extraОт divmodне используется для расчета chunksize.

def calc_naive_chunksize_info(n_workers, len_iterable):
    """Calculate naive chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers)
    if chunksize == 0:
        chunksize = 1
        n_chunks = extra
        last_chunk = chunksize
    else:
        n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
        last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )

6. Количественная оценка эффективности алгоритма.

Теперь, после того, как мы увидели, как результат Poolchunksize-алгоритма выглядит по-другому по сравнению с выводом наивного алгоритма ...

  • Как узнать, действительно ли подход Пула что- то улучшает ?
  • И что именно это могло что - то быть?

Как показано в предыдущей главе, для более длинных итераций (большего числа задач) алгоритм размера фрагментов Pool приблизительно разделяет итерацию на в четыре раза больше фрагментов, чем наивный метод. Меньшие фрагменты означают больше задач, а большее количество задач означает больше накладных расходов на распараллеливание (PO) , стоимость, которую необходимо сопоставить с преимуществом повышенной гибкости планирования (вспомните «Риски размера фрагмента> 1» ).

По довольно очевидным причинам базовый алгоритм размера фрагментов Pool не может сопоставить гибкость планирования с PO для нас. Накладные расходы IPC зависят от ОС, оборудования и размера данных. Алгоритм не может знать, на каком оборудовании мы запускаем наш код, и при этом он не знает, сколько времени потребуется для завершения таскела. Это эвристика, обеспечивающая базовую функциональность для всех возможных сценариев. Это означает, что его нельзя оптимизировать для какого-либо конкретного сценария. Как упоминалось ранее, PO также становится все менее проблемным с увеличением времени вычислений на задачу (отрицательная корреляция).

Когда вы вспоминаете цели распараллеливания из главы 2, вы видите один пункт:

  • высокая загрузка всех ядер процессора

Ранее упомянутое кое-что , что алгоритм размера фрагмента Pool может попытаться улучшить, - это минимизация простаивающих рабочих процессов , соответственно, использование ядер процессора .

Повторяющийся вопрос о SO multiprocessing.Poolзадают люди, интересующиеся неиспользуемыми ядрами / простаивающими рабочими процессами в ситуациях, когда вы ожидаете, что все рабочие процессы будут заняты. Хотя у этого может быть много причин, простаивающие рабочие процессы ближе к концу вычисления - это наблюдение, которое мы часто можем сделать, даже с плотными сценариями (равное время вычислений на таскел) в тех случаях, когда количество рабочих не является делителем числа. кусков ( n_chunks % n_workers > 0).

Теперь вопрос:

Как мы можем практически преобразовать наше понимание размеров фрагментов в нечто, что позволяет нам объяснить наблюдаемое использование рабочих или даже сравнить эффективность различных алгоритмов в этом отношении?


6.1 Модели

Для получения более глубокого понимания нам нужна форма абстракции параллельных вычислений, которая упрощает чрезмерно сложную реальность до управляемой степени сложности, сохраняя при этом значимость в определенных границах. Такая абстракция называется моделью . Реализация такой « модели распараллеливания» (PM) генерирует отображаемые работником метаданные (временные метки), как и при реальных вычислениях, если бы данные собирались. Сгенерированные моделью метаданные позволяют прогнозировать показатели параллельных вычислений при определенных ограничениях.

фигура 4

Одной из двух подмоделей в рамках определенного здесь PM является Модель распределения (DM) . DM объясняет , как атомные единицы работы (taskels) распределены по параллельным рабочим и времени , когда нет других факторов , кроме соответствующего chunksize-алгоритма, число рабочих, ввод-итерация (количество taskels) и их продолжительности вычислений не считаются . Это означает, что никакие накладные расходы не включены.

Для получения полной ПМ , то ДЙ расширяются с ВЛИ Model (OM) , представляющий различные формы Распараллеливания Накладного (PO) . Такая модель должна быть откалибрована для каждого узла индивидуально (зависимости от оборудования и ОС). Сколько форм накладных расходов представлено в OM , остается открытым, поэтому могут существовать несколько OM с разной степенью сложности. Какой уровень точности требуется внедренному OM, определяется общим весом PO для конкретных вычислений. Более короткие задачи приводят к большему весу PO , что, в свою очередь, требует более точного, OM.если бы мы пытались предсказать эффективность распараллеливания (PE) .


6.2 Параллельное расписание (PS)

Параллельный график представляет собой двумерное представление параллельных вычислений, где ось х представляет время и ось у представляет собой пул параллельных рабочих. Количество рабочих и общее время вычислений обозначают протяженность прямоугольника, в котором нарисованы меньшие прямоугольники. Эти меньшие прямоугольники представляют собой элементарные единицы работы (таскелы).

Ниже вы найдете визуализацию PS, нарисованную с использованием данных из алгоритма chunksize DM of Pool для плотного сценария .

цифра 5

  • Ось x разделена на равные единицы времени, где каждая единица обозначает время вычислений, необходимое таскелю.
  • Ось Y делится на количество рабочих процессов, используемых пулом.
  • Таскель здесь отображается в виде самого маленького прямоугольника голубого цвета, помещенного на временную шкалу (расписание) анонимизированного рабочего процесса.
  • Задача - это один или несколько задач на шкале рабочего времени, которые постоянно выделяются одним и тем же оттенком.
  • Единицы времени простоя представлены красными плитками.
  • Параллельное расписание разбито на разделы. Последняя часть - это хвостовая часть.

Названия составных частей можно увидеть на картинке ниже.

фигура6

В полном PM, включающем OM , доля холостого хода не ограничивается хвостовой частью, но также включает пространство между задачами и даже между элементами задач.


6.3 Эффективность

Представленные выше модели позволяют количественно оценить коэффициент использования рабочих. Мы можем выделить:

  • Эффективность распределения (DE) - рассчитывается с помощью DM (или упрощенного метода для плотного сценария ).
  • Эффективность распараллеливания (PE) - вычисляется с помощью откалиброванного PM (прогноз) или рассчитывается на основе метаданных реальных вычислений.

Важно отметить, что вычисленная эффективность не коррелирует автоматически с более быстрыми общими вычислениями для данной проблемы распараллеливания. Использование рабочих в этом контексте различает только работника, имеющего начатую, но еще не завершенную задачу, и рабочего, у которого нет такой «открытой» панели задач. Это означает, что возможен холостой ход течение периода времени таскела не регистрируется.

Все вышеупомянутые значения эффективности в основном получены путем вычисления частного деления доли занятости / параллельного расписания . Разница между DE и PE заключается в том, что доля занятости занимает меньшую часть общего параллельного расписания для PM с расширенными накладными расходами .

В этом ответе будет обсуждаться только простой метод вычисления DE. для плотного сценария. Этого достаточно для сравнения различных алгоритмов размера фрагментов, поскольку ...

  1. ... DM - это часть PM , которая изменяется в зависимости от используемых алгоритмов chunksize.
  2. ... Плотный сценарий с равными длительностями вычислений на каждую задачуел изображает "стабильное состояние", для которого эти временные интервалы выпадают из уравнения. Любой другой сценарий просто приведет к случайным результатам, поскольку порядок задач имеет значение.

6.3.1 Абсолютная эффективность распределения (ADE)

Эту базовую эффективность в целом можно рассчитать, разделив долю занятости на весь потенциал параллельного расписания :

Абсолютная эффективность распределения (ADE) = доля занятости / параллельное расписание

Для плотного сценария упрощенный расчетный код выглядит так:

# mp_utils.py

def calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Absolute Distribution Efficiency (ADE).

    `len_iterable` is not used, but contained to keep a consistent signature
    with `calc_rde`.
    """
    if n_workers == 1:
        return 1

    potential = (
        ((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize)
        + (n_chunks % n_workers == 1) * last_chunk
    ) * n_workers

    n_full_chunks = n_chunks - (chunksize > last_chunk)
    taskels_in_regular_chunks = n_full_chunks * chunksize
    real = taskels_in_regular_chunks + (chunksize > last_chunk) * last_chunk
    ade = real / potential

    return ade

Если нет Холостой Share , Busy доля будет равна к Parallel Schedule , следовательно , мы получаем ADE 100%. В нашей упрощенной модели это сценарий, при котором все доступные процессы будут заняты все время, необходимое для обработки всех задач. Другими словами, вся работа эффективно распараллеливается на 100 процентов.

Но почему я держу в виде PE в качестве абсолютного PE здесь?

Чтобы понять это, мы должны рассмотреть возможный случай для chunksize (cs), который обеспечивает максимальную гибкость планирования (также, количество горцев может быть. Совпадение?):

__________________________________ ~ ONE ~ __________________________________

Если у нас, например, четыре рабочих процесса и 37 задач, будут простаивающие рабочие даже с chunksize=1, просто потому, чтоn_workers=4 это не делитель 37. Остаток от деления 37/4 равен 1. Этот единственный оставшийся таскел должен быть обрабатывается единственным работником, а остальные три простаивают.

Точно так же останется один неработающий рабочий с 39 таскелями, как вы можете видеть на картинке ниже.

рисунок 7

Когда вы сравните верхнее параллельное расписание для chunksize=1с версией ниже для chunksize=3, вы заметите, что верхнее параллельное расписание меньше, а временная шкала на оси x короче. Теперь должно стать очевидным, насколько неожиданно большие куски могут привести к увеличению общего времени вычислений, даже для плотных сценариев .

Но почему бы просто не использовать длину оси x для расчета эффективности?

Потому что накладные расходы в этой модели не содержатся. Он будет отличаться для обоих размеров фрагментов, поэтому ось x на самом деле напрямую не сопоставима. Накладные расходы могут привести к увеличению общего времени вычислений, как показано в случае 2 на рисунке ниже.

цифра 8


6.3.2 Относительная эффективность распределения (RDE)

Значение ADE не содержит информации, если возможно лучшее распределение таскелей с chunksize, установленным на 1. Лучше здесь все равно будет меньше долю холостого хода .

Чтобы получить значение DE, скорректированное с учетом максимально возможного DE , мы должны разделить рассматриваемое ADE на ADE, которое мы получаем chunksize=1.

Относительная эффективность распределения (RDE) = ADE_cs_x / ADE_cs_1

Вот как это выглядит в коде:

# mp_utils.py

def calc_rde(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Relative Distribution Efficiency (RDE)."""
    ade_cs1 = calc_ade(
        n_workers, len_iterable, n_chunks=len_iterable,
        chunksize=1, last_chunk=1
    )
    ade = calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk)
    rde = ade / ade_cs1

    return rde

RDE , как здесь определено, по сути, является рассказом о хвосте параллельного расписания . RDE зависит от максимального эффективного размера фрагмента, содержащегося в хвосте. (Этот хвост может иметь длину по оси X chunksizeили last_chunk.) Следствием этого является то, что RDE естественным образом сходится к 100% (даже) для всех видов «хвостовых образов», как показано на рисунке ниже.

рисунок9

Низкий RDE ...

  • это сильный намек на потенциал оптимизации.
  • естественно, становится менее вероятным для более длинных итераций, потому что относительная хвостовая часть общего параллельного расписания сокращается.

Вы можете найти часть II этого ответа здесь .

Darkonaut
источник
51
Один из самых эпических ответов, которые я видел на SO.
Christian Long
4
О, это был ваш короткий ответ: P
d_kennetz
1
Но на самом деле .. это отличный ответ. Я пометил вопрос звездочкой для будущих случаев, когда я хочу лучше понять это. Беглый просмотр уже многому меня научил! Спасибо
d_kennetz 05
2
@ L.Iridium Добро пожаловать! Я использовал matplotlib, где это было возможно, иначе ... LibreOffice calc + Pinta (базовое редактирование изображений). Да, я знаю ... но это как-то работает. ;)
Darkonaut 02
2
Первый ответ с таблицей содержания, увиденной на SO.
tly_alex
51

Об этом ответе

Этот ответ является частью II принятого выше ответа .


7. Наивный алгоритм против алгоритма пула.

Прежде чем вдаваться в подробности, рассмотрим две гифки ниже. Для диапазона разной iterableдлины они показывают, как два сравниваемых алгоритма разбивают переданный фрагмент iterable(к тому времени это будет последовательность) и как могут быть распределены результирующие задачи. Порядок рабочих является случайным, и количество распределенных задач на одного работника в действительности может отличаться от этого изображения для легких задач и / или задач в широком сценарии. Как упоминалось ранее, сюда не включаются накладные расходы. Однако для достаточно тяжелых задач в плотном сценарии с незначительными размерами передаваемых данных реальные вычисления рисуют очень похожую картину.

cs_4_50

cs_200_250

Как показано в главе « 5. Алгоритм размера фрагментов пула », с алгоритмом размера фрагментов пула количество фрагментов будет стабилизироваться на уровне n_chunks == n_workers * 4достаточно больших итераций, в то время как он продолжает переключаться между n_chunks == n_workersи n_chunks == n_workers + 1с наивным подходом. Применяется наивный алгоритм: поскольку n_chunks % n_workers == 1is Truefor n_chunks == n_workers + 1, будет создан новый раздел, в котором будет задействован только один рабочий.

Наивный алгоритм Chunksize:

Вы можете подумать, что создали задачи для того же количества рабочих, но это будет верно только для случаев, когда нет остатка для len_iterable / n_workers. Если есть остаток, будет новый раздел только одна задача для одного работника. В этот момент ваши вычисления больше не будут параллельными.

Ниже вы видите рисунок, аналогичный показанному в главе 5, но на котором отображается количество разделов, а не количество фрагментов. Для полного chunksize-алгоритма Pool ( n_pool2) n_sectionsбудет стабилизироваться печально известный жестко запрограммированный коэффициент 4. Для наивного алгоритма n_sectionsбудет чередоваться от одного до двух.

рисунок10

Для алгоритма размера фрагмента Pool стабилизация с n_chunks = n_workers * 4помощью вышеупомянутой дополнительной обработки предотвращает создание здесь нового раздела и сохраняет долю холостого хода ограниченным одним рабочим для достаточно долгих итераций. Более того, алгоритм будет продолжать сокращать относительный размер доли холостого хода , что приводит к приближению значения RDE к 100%.

«Достаточно долго» для n_workers=4это len_iterable=210, например. Для итераций, равных или превышающих это значение, доля холостого хода будет ограничена одним 4воркером , черта, изначально утраченная из-за умножения в алгоритме размера фрагмента в первую очередь.

рисунок 11

Наивный алгоритм chunksize также сходится к 100%, но делает это медленнее. Эффект конвергенции зависит исключительно от того факта, что относительная часть хвоста сжимается в случаях, когда будет две секции. Этот хвост только с одним нанятым работником ограничен длиной по оси x n_workers - 1, максимально возможным остатком для len_iterable / n_workers.

Чем отличаются фактические значения RDE для наивного алгоритма размера блока и алгоритма Pool?

Ниже вы найдете две тепловые карты, на которых показаны значения RDE для всех повторяемых длин до 5000, для всех чисел рабочих от 2 до 100. Цветовая шкала изменяется от 0,5 до 1 (50% -100%). Вы заметите гораздо больше темных областей (более низкие значения RDE) для наивного алгоритма на левой тепловой карте. Напротив, алгоритм размера фрагментов Пула справа рисует гораздо более солнечную картину.

рисунок12

Диагональный градиент темных углов в нижнем левом углу по сравнению с яркими углами в правом верхнем углу снова показывает зависимость от количества рабочих для того, что называется «длинной итерацией».

Насколько плохо это может быть с каждым алгоритмом?

С алгоритмом размера фрагмента Pool значение RDE 81,25% является наименьшим значением для диапазона рабочих и повторяемых длин, указанных выше:

рисунок13

С наивным алгоритмом chunksize дела могут пойти намного хуже. Самый низкий расчетный RDE здесь составляет 50,72%. В этом случае почти половину времени вычислений работает только один рабочий! Итак, берегитесь, счастливые обладатели Knights Landing . ;)

рисунок14


8. Проверка реальности

В предыдущих главах мы рассматривали упрощенную модель чисто математической задачи распределения, лишенную мельчайших деталей, которые в первую очередь делают многопроцессорную обработку такой сложной темой. Чтобы лучше понять , как далеко распространения модели (DM) в одиночку может способствовать объяснению наблюдаемого использования рабочих в реальности, теперь мы рассмотрим параллельные расписания, составленные с помощью реальных вычислений.

Настроить

Все следующие графики связаны с параллельным выполнением простой фиктивной функции с привязкой к процессору, которая вызывается с различными аргументами, поэтому мы можем наблюдать, как нарисованное параллельное расписание изменяется в зависимости от входных значений. «Работа» в этой функции состоит только из итерации по объекту диапазона. Этого уже достаточно для того, чтобы ядро ​​было занято, поскольку мы передаем в него огромные числа. По желанию функция принимает некоторую уникальную для таскеля добавку, dataкоторая просто возвращается без изменений. Поскольку каждый таскел включает в себя один и тот же объем работы, мы по-прежнему имеем дело с плотным сценарием.

Функция украшена оберткой, принимающей временные метки с разрешением ns (Python 3.7+). Отметки времени используются для расчета временного интервала таскела и, следовательно, позволяют рисовать эмпирическое параллельное расписание.

@stamp_taskel
def busy_foo(i, it, data=None):
    """Dummy function for CPU-bound work."""
    for _ in range(int(it)):
        pass
    return i, data


def stamp_taskel(func):
    """Decorator for taking timestamps on start and end of decorated
    function execution.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time_ns()
        result = func(*args, **kwargs)
        end_time = time_ns()
        return (current_process().name, (start_time, end_time)), result
    return wrapper

Метод карты звезды Pool также оформлен таким образом, что рассчитывается только сам вызов карты звезды. «Начало» и «конец» этого вызова определяют минимум и максимум по оси x созданного параллельного расписания.

Мы собираемся наблюдать вычисление 40 таскелей на четырех рабочих процессах на машине со следующими характеристиками: Python 3.7.1, Ubuntu 18.04.2, Intel® Core ™ i7-2600K CPU @ 3.40GHz × 8

Входные значения, которые будут варьироваться, - это количество итераций в цикле for (30k, 30M, 600M) и размер дополнительно отправляемых данных (для каждого таскела, numpy-ndarray: 0 MiB, 50 MiB).

...
N_WORKERS = 4
LEN_ITERABLE = 40
ITERATIONS = 30e3  # 30e6, 600e6
DATA_MiB = 0  # 50

iterable = [
    # extra created data per taskel
    (i, ITERATIONS, np.arange(int(DATA_MiB * 2**20 / 8)))  # taskel args
    for i in range(LEN_ITERABLE)
]


with Pool(N_WORKERS) as pool:
    results = pool.starmap(busy_foo, iterable)

Показанные ниже прогоны были отобраны для того, чтобы иметь одинаковый порядок чанков, чтобы вы могли лучше заметить различия по сравнению с параллельным расписанием из модели распределения, но не забывайте, что порядок, в котором рабочие получают свою задачу, недетерминирован.

Прогнозирование DM

Повторюсь, модель распределения «предсказывает» параллельное расписание, как мы уже видели это ранее в главе 6.2:

рисунок15

1-й ЗАПУСК: 30 тыс. Итераций и 0 МБ данных на каждую задачу

рисунок16

Наш первый запуск здесь очень короткий, таскели очень «легкие». Весь pool.starmap()вызов занял всего 14,5 мс. Вы заметите, что в отличие от DM , холостой ход не ограничивается хвостовой частью, но также имеет место между задачами и даже между задачами. Это потому, что наш реальный график, естественно, включает в себя всевозможные накладные расходы. Холостой ход здесь означает все, что не входит в таскель. Возможный реальный холостой ход во время таскела не фиксируется, как уже упоминалось ранее.

Кроме того, вы можете видеть, что не все работники выполняют свои задачи одновременно. Это связано с тем, что все воркеры питаются через общий ресурс, inqueueи только один воркер может читать с него одновременно. То же самое касаетсяoutqueue . Это может вызвать большие неприятности, если вы передаете нематериальные объемы данных, как мы увидим позже.

Более того, вы можете видеть, что, несмотря на то, что каждый таскел включает в себя одинаковый объем работы, фактический измеренный временной интервал для таскела сильно различается. Задачи, распределенные для worker-3 и worker-4, требуют больше времени, чем те, которые обрабатываются первыми двумя рабочими. Для этого прогона я подозреваю, что это связано с тем, что в тот момент турбо-ускорение на ядрах для worker-3/4 больше не было, поэтому они обрабатывали свои задачи с более низкой тактовой частотой.

Все вычисления настолько легкие, что факторы хаоса, вносимые оборудованием или ОС, могут сильно исказить PS . Вычисления - это «лист на ветру», и предсказание DM не имеет большого значения даже для теоретически подходящего сценария.

2-й ЗАПУСК: 30 млн итераций и 0 МБ данных на каждую задачу

рисунок 17

Увеличение количества итераций в цикле for с 30 000 до 30 миллионов приводит к реальному параллельному расписанию, которое близко к идеальному совпадению с предсказанным данными, предоставленными DM , ура! Вычисления для каждого таскела теперь достаточно тяжелы, чтобы маргинализировать холостые части в начале и между ними, позволяя видеть только большую долю холостого хода, которую предсказал DM .

3-й ЗАПУСК: 30 млн итераций и 50 МБ данных на каждую задачу

рисунок 18

Сохранение 30 миллионов итераций, но дополнительная отправка 50 МБ на таскель вперед и назад снова искажает картину. Здесь хорошо виден эффект очереди. Рабочий-4 должен ждать своей второй задачи дольше, чем Рабочий-1. А теперь представьте этот график с 70 рабочими!

В случае, если таскели очень легкие с вычислительной точки зрения, но предоставляют заметный объем данных в качестве полезной нагрузки, узкое место одной общей очереди может помешать каким-либо дополнительным преимуществам добавления большего количества рабочих в пул, даже если они поддерживаются физическими ядрами. В таком случае Worker-1 может выполнить свою первую задачу и ожидать новую даже до того, как Worker-40 получит свою первую задачу.

Теперь должно стать очевидным, почему время вычислений Poolне всегда уменьшается линейно с количеством рабочих. Отправка относительно больших объемов данных может привести к сценариям, в которых большую часть времени тратится на ожидание копирования данных в адресное пространство рабочего, и одновременно может быть загружен только один рабочий.

4-й ЗАПУСК: 600 млн итераций и 50 МБ данных на каждую задачу

рисунок19

Здесь мы снова отправляем 50 МБ, но увеличиваем количество итераций с 30 до 600 МБ, что увеличивает общее время вычислений с 10 до 152 с. Нарисованное параллельное расписание снова близко к идеальному совпадению с прогнозируемым, накладные расходы на копирование данных минимизированы.


9. Заключение

Обсуждаемое умножение на 4увеличивает гибкость планирования, но также усиливает неравномерность распределений таскелей. Без этого умножения доля холостого хода была бы ограничена одним рабочим даже для коротких итераций (для DM с плотным сценарием). Алгоритм размера фрагментов Pool требует, чтобы итерации ввода были определенного размера, чтобы восстановить эту черту.

Как мы надеемся, этот ответ показал, что алгоритм размера фрагмента Pool приводит к лучшему использованию ядра в среднем по сравнению с наивным подходом, по крайней мере, для среднего случая и без учета длительных накладных расходов. Наивный алгоритм здесь может иметь эффективность распределения (DE) всего ~ 51%, в то время как алгоритм размера фрагмента Pool имеет низкий уровень ~ 81%. Однако DE не содержит накладных расходов на распараллеливание (PO), как IPC. Глава 8 показала, что DE все еще может иметь большую предсказательную силу для плотного сценария с минимальными накладными расходами.

Несмотря на то, что алгоритм размера фрагментов Пула обеспечивает более высокое значение DE по сравнению с наивным подходом, он не обеспечивает оптимального распределения таскелей для каждого входного созвездия. Хотя простой статический алгоритм фрагментации не может оптимизировать (включая накладные расходы) эффективность распараллеливания (PE), нет никакой внутренней причины, по которой он не всегда мог бы обеспечить относительную эффективность распределения (RDE) 100%, то есть такую ​​же DE, что и с chunksize=1. Простой алгоритм chunksize состоит только из базовой математики и может «разрезать торт» любым способом.

В отличие от реализации в Pool алгоритма «порции равного размера», алгоритм «порции равного размера» обеспечит RDE 100% для каждой комбинации len_iterable/ n_workers. Алгоритм разделения на части равного размера было бы немного сложнее реализовать в исходном коде Pool, но его можно модулировать поверх существующего алгоритма, просто упаковывая задачи извне (я буду ссылаться здесь на случай, если я брошу вопрос / ответ на как это сделать).

Darkonaut
источник
6

Я думаю, что отчасти вам не хватает того, что ваша наивная оценка предполагает, что каждая единица работы занимает одинаковое количество времени, и в этом случае ваша стратегия будет наилучшей. Но если некоторые задания завершаются раньше, чем другие, то некоторые ядра могут бездействовать, ожидая завершения медленных заданий.

Таким образом, если разбить фрагменты на в 4 раза больше частей, то, если один фрагмент завершился раньше, это ядро ​​может начать следующий фрагмент (в то время как другие ядра продолжают работать над своим более медленным фрагментом).

Я не знаю, почему они выбрали коэффициент 4 точно, но это будет компромисс между минимизацией накладных расходов кода карты (который требует максимально возможных кусков) и балансировкой кусков, занимающих разное количество раз (что требует наименьшего возможного фрагмента ).

Роб
источник