Какие факторы определяют оптимальный chunksize
аргумент в пользу таких методов multiprocessing.Pool.map()
? .map()
Похоже, что этот метод использует произвольную эвристику для размера фрагмента по умолчанию (поясняется ниже); что мотивирует этот выбор и существует ли более продуманный подход, основанный на какой-либо конкретной ситуации / настройке?
Пример - скажи, что я:
- Пропускание
iterable
на.map()
что ~ 15 миллионов элементов; - Работаем на машине с 24 ядрами и используем по умолчанию
processes = os.cpu_count()
внутриmultiprocessing.Pool()
.
Я наивно считаю, что каждому из 24 рабочих нужно дать кусок равного размера, то есть 15_000_000 / 24
625 000 человек. Большие блоки должны снизить текучесть / накладные расходы при полном использовании всех рабочих. Но похоже, что здесь отсутствуют некоторые потенциальные недостатки предоставления больших партий каждому рабочему. Это неполное изображение, и что мне не хватает?
Часть моего вопроса проистекает из логики по умолчанию для if chunksize=None
: both .map()
и .starmap()
call .map_async()
, которая выглядит следующим образом:
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
error_callback=None):
# ... (materialize `iterable` to list if it's an iterator)
if chunksize is None:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4) # ????
if extra:
chunksize += 1
if len(iterable) == 0:
chunksize = 0
В чем логика divmod(len(iterable), len(self._pool) * 4)
? Это означает, что размер фрагмента будет ближе к 15_000_000 / (24 * 4) == 156_250
. Каково намерение умножения len(self._pool)
на 4?
Это делает результирующий размер фрагмента в 4 раза меньше, чем моя "наивная логика" сверху, которая состоит из простого деления длины итерации на количество рабочих вpool._pool
.
Наконец, есть еще этот фрагмент из документации Python, .imap()
который еще больше подогревает мое любопытство:
chunksize
Аргумент является таким же , как тот , используемымmap()
методом. Для очень длинных итераций использование большого значения дляchunksize
может значительно ускорить выполнение задания, чем использование значения по умолчанию 1.
Связанный ответ, который полезен, но слишком высокоуровневый: Многопроцессорность Python: почему большие куски медленнее? .
4
Произвольно, и весь расчет chunksize является эвристическим. Важным фактором является то, насколько может варьироваться ваше фактическое время обработки. Чуть подробнее об этом здесь , пока я не было времени для ответа , если еще нужен тогда.Ответы:
Краткий ответ
Chunksize-алгоритм пула является эвристическим. Он обеспечивает простое решение для всех возможных сценариев проблем, которые вы пытаетесь внедрить в методы Pool. Как следствие, его нельзя оптимизировать для какого-либо конкретного сценария.
Алгоритм произвольно делит итерацию примерно на четыре части больше, чем при наивном подходе. Больше фрагментов означает больше накладных расходов, но увеличивает гибкость планирования. Как будет видно из этого ответа, в среднем это приводит к более высокому использованию рабочих, но без гарантии более короткого общего времени вычислений для каждого случая.
«Приятно знать, - можете подумать вы, - но как эта информация поможет мне в решении конкретных проблем, связанных с многопроцессорностью?» Что ж, это не так. Более честный короткий ответ: «короткого ответа нет», «многопроцессорность - это сложно» и «это зависит от обстоятельств». Наблюдаемый симптом может иметь разные корни даже при одинаковых сценариях.
Этот ответ пытается предоставить вам основные концепции, которые помогут вам получить более четкое представление о черном ящике планирования Pool. Он также пытается дать вам несколько основных инструментов для распознавания и предотвращения потенциальных обрывов, поскольку они связаны с размером фрагментов.
Содержание
Часть I
Количественная оценка эффективности алгоритма
6.1 Модели
6.2 Параллельное расписание
6.3 Эффективность
6.3.1 Абсолютная эффективность распределения (ADE)
6.3.2 Относительная эффективность распределения (RDE)
Часть II
Сначала необходимо уточнить некоторые важные термины.
1. Определения
Кусок
Здесь фрагмент - это доля
iterable
аргумента, указанного в вызове метода пула. Как рассчитывается размер фрагмента и какие эффекты это может иметь, является темой этого ответа.Задача
Физическое представление задачи в рабочем процессе в терминах данных можно увидеть на рисунке ниже.
На рисунке показан пример вызова
pool.map()
, отображаемого вдоль строки кода, взятого изmultiprocessing.pool.worker
функции, где задача, считанная из функции,inqueue
распаковывается.worker
является основной функциейMainThread
рабочего процесса пула.func
-Argument указан в бассейне-методе будет только совпадать сfunc
-переменными внутриworker
-функцией для методов одного вызова , какapply_async
и дляimap
сchunksize=1
. Для остальных методов пула сchunksize
параметром-функцией обработкиfunc
будет функция- преобразователь (mapstar
илиstarmapstar
). Эта функция отображает указанный пользователемfunc
параметр на каждый элемент переданного фрагмента итерируемого объекта (-> "map-tasks"). Время, которое на это требуется, определяет задачутакже как единица работы .Таскель
Хотя использование слова «задача» для всей обработки одного фрагмента соответствует внутреннему коду
multiprocessing.pool
, нет никаких указаний на то, как должен выполняться один вызов указанного пользователемfunc
с одним элементом фрагмента в качестве аргумента (ов). упомянутый. Чтобы избежать путаницы, возникающей из-за конфликтов имен (подумайте оmaxtasksperchild
параметре -параметр для метода пула__init__
), в этом ответе отдельные единицы работы в задаче будут обозначаться как таскель .Накладные расходы на распараллеливание (PO)
PO состоит из внутренних служебных данных Python и служебных данных для межпроцессного взаимодействия (IPC). Накладные расходы на задачу в Python включают код, необходимый для упаковки и распаковки задач и их результатов. IPC-overhead включает необходимую синхронизацию потоков и копирование данных между разными адресными пространствами (требуется два шага копирования: родительский -> очередь -> дочерний). Объем накладных расходов IPC зависит от ОС, оборудования и размера данных, что затрудняет обобщение результатов.
2. Цели распараллеливания
При использовании многопроцессорной обработки наша общая цель (очевидно) - минимизировать общее время обработки всех задач. Для достижения этой общей цели наша техническая цель должна заключаться в оптимизации использования аппаратных ресурсов .
Некоторые важные подцели для достижения технической цели:
Во - первых, задачи должны быть вычислительно тяжелый (интенсивный) достаточно, чтобы получить обратно РО мы должны платить за распараллеливания. Актуальность PO снижается с увеличением абсолютного времени вычислений на задачу. Или, говоря наоборот, чем больше абсолютное время вычислений на одну задачу для вашей проблемы, тем менее актуальной становится потребность в сокращении PO. Если ваши вычисления будут занимать часы на одну задачу, накладные расходы IPC будут незначительными по сравнению с этим. Основная задача здесь - предотвратить простаивание рабочих процессов после распределения всех задач. Держать все ядра загруженными означает, что мы максимально распараллеливаем.
3. Сценарии распараллеливания
Основным фактором, о котором идет речь, является то, сколько времени вычислений может варьироваться в зависимости от наших отдельных задач. Чтобы назвать это, выбор оптимального размера блока определяется коэффициентом вариации ( CV ) времени вычислений на одну задачу.
Двумя крайними сценариями по шкале, вытекающими из степени этой вариации, являются:
Для лучшей запоминаемости я буду называть эти сценарии:
Плотный сценарий
В плотном сценарии было бы желательно распределить все таскелы сразу, чтобы свести к минимуму необходимые IPC и переключение контекста. Это означает, что мы хотим создать столько блоков, сколько есть рабочих процессов. Как уже было сказано выше, вес PO увеличивается с сокращением времени вычислений на задачу.
Для максимальной пропускной способности мы также хотим, чтобы все рабочие процессы были заняты до тех пор, пока все задачи не будут обработаны (без простаивающих рабочих). Для этой цели распределенные блоки должны быть одинакового размера или близки к нему.
Широкий сценарий
Ярким примером для широкого сценария может быть проблема оптимизации, когда результаты либо сходятся быстро, либо вычисления могут занимать часы, если не дни. Обычно невозможно предсказать, какую смесь «легких задач» и «тяжелых задач» будет содержать задача в таком случае, поэтому не рекомендуется распределять слишком много задач одновременно в пакете задач. Распределение меньшего количества задач одновременно, чем возможно, означает повышение гибкости планирования. Это необходимо здесь для достижения нашей промежуточной цели по высокому использованию всех ядер.
Если бы
Pool
методы по умолчанию были полностью оптимизированы для плотного сценария, они все чаще создавали бы неоптимальные сроки для каждой проблемы, расположенной ближе к широкому сценарию.4. Риски Chunksize> 1
Рассмотрим этот упрощенный пример псевдокода расширенного сценария , который мы хотим передать в метод пула:
good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 84600]
Вместо фактических значений мы делаем вид, что видим необходимое время вычислений в секундах, для простоты только 1 минуту или 1 день. Мы предполагаем, что пул имеет четыре рабочих процесса (на четырех ядрах) и
chunksize
установлен в2
. Поскольку порядок будет сохранен, куски, отправленные работникам, будут следующими:[(60, 60), (86400, 60), (86400, 60), (60, 84600)]
Поскольку у нас достаточно рабочих, а время вычислений достаточно велико, мы можем сказать, что каждый рабочий процесс в первую очередь получит кусок для работы. (Это не обязательно для быстрого выполнения задач). Кроме того, мы можем сказать, что вся обработка займет около 86400 + 60 секунд, потому что это наибольшее общее время вычисления для фрагмента в этом искусственном сценарии, и мы распределяем фрагменты только один раз.
Теперь рассмотрим эту итерацию, в которой только один элемент меняет свою позицию по сравнению с предыдущей итерацией:
bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]
... и соответствующие чанки:
[(60, 60), (86400, 86400), (60, 60), (60, 84600)]
Просто не повезло с сортировкой итераций, которые почти удвоили (86400 + 86400) наше общее время обработки! Рабочий, получающий порочный (86400, 86400) -chunk, блокирует передачу второго тяжелого Taskel в своей задаче одному из простаивающих рабочих, уже завершивших свои (60, 60) -chunks. Очевидно, мы бы не рискнули таким неприятным исходом, если бы поставили
chunksize=1
.Это риск больших кусков. Чем больше размер фрагментов, тем меньше накладных расходов мы жертвуем гибкостью планирования, а в случаях, подобных описанным выше, это плохая сделка.
Как мы увидим в главе 6. Количественная оценка эффективности алгоритма , большие куски также могут привести к неоптимальным результатам для плотных сценариев .
5. Алгоритм размера фрагментов Пула
Ниже вы найдете немного измененную версию алгоритма внутри исходного кода. Как видите, я отрезал нижнюю часть и превратил ее в функцию для
chunksize
внешнего вычисления аргумента. Я также заменил4
сfactor
параметром и переданы наlen()
звонки.# mp_utils.py def calc_chunksize(n_workers, len_iterable, factor=4): """Calculate chunksize argument for Pool-methods. Resembles source-code within `multiprocessing.pool.Pool._map_async`. """ chunksize, extra = divmod(len_iterable, n_workers * factor) if extra: chunksize += 1 return chunksize
Чтобы убедиться, что мы все на одной странице, вот что
divmod
:divmod(x, y)
это встроенная функция, которая возвращает(x//y, x%y)
.x // y
- деление этажа, возвращающее округленное в меньшую сторону частное отx / y
, аx % y
- операция по модулю, возвращающая остаток отx / y
. Следовательно, например,divmod(10, 3)
возвращается(3, 1)
.Теперь, когда вы посмотрите на
chunksize, extra = divmod(len_iterable, n_workers * 4)
, вы заметите, чтоn_workers
здесь делительy
наx / y
и умножение на4
, без дальнейшей настройкиif extra: chunksize +=1
, приводит к начальному размеру фрагмента, по крайней мере, в четыре раза меньшему (дляlen_iterable >= n_workers * 4
), чем это было бы в противном случае.Для просмотра эффекта умножения
4
на результат промежуточного размера фрагмента рассмотрите эту функцию:def compare_chunksizes(len_iterable, n_workers=4): """Calculate naive chunksize, Pool's stage-1 chunksize and the chunksize for Pool's complete algorithm. Return chunksizes and the real factors by which naive chunksizes are bigger. """ cs_naive = len_iterable // n_workers or 1 # naive approach cs_pool1 = len_iterable // (n_workers * 4) or 1 # incomplete pool algo. cs_pool2 = calc_chunksize(n_workers, len_iterable) real_factor_pool1 = cs_naive / cs_pool1 real_factor_pool2 = cs_naive / cs_pool2 return cs_naive, cs_pool1, cs_pool2, real_factor_pool1, real_factor_pool2
Вышеупомянутая функция вычисляет наивный размер chunksize (
cs_naive
) и размер фрагмента первого шага алгоритма chunksize-алгоритма Pool (cs_pool1
), а также размер фрагмента для полного алгоритма Pool (cs_pool2
). Далее он вычисляет реальные коэффициентыrf_pool1 = cs_naive / cs_pool1
иrf_pool2 = cs_naive / cs_pool2
, которые говорят нам, во сколько раз наивно рассчитанные размеры фрагментов больше, чем внутренняя версия (и) Pool.Ниже вы видите две фигуры, созданные на основе результатов этой функции. На левом рисунке просто показаны размеры фрагментов
n_workers=4
вплоть до итеративной длины500
. На правом рисунке показаны значения дляrf_pool1
. Для итерируемой длины16
реальный коэффициент становится>=4
(дляlen_iterable >= n_workers * 4
), а его максимальное значение -7
для повторяющейся длины28-31
. Это огромное отклонение от исходного коэффициента4
, к которому алгоритм сходится для более длительных итераций. «Дольше» здесь относительно и зависит от количества указанных рабочих.Помните chunksize
cs_pool1
все еще не хватаетextra
-adjustment с остатком отdivmod
содержащихся вcs_pool2
от полного алгоритма.Алгоритм продолжается:
if extra: chunksize += 1
Теперь в случаях, когда есть остаток (
extra
от операции divmod), увеличение размера фрагмента на 1, очевидно, не сработает для каждой задачи. В конце концов, если бы это было так, для начала не было бы остатка.Как вы можете видеть на рисунке ниже, то « экстра-лечение » имеет эффект, что реальный фактор для
rf_pool2
ныне сходится в направлении4
от ниже4
и отклонение несколько мягче. Стандартное отклонение дляn_workers=4
иlen_iterable=500
падает от0.5233
дляrf_pool1
до0.4115
дляrf_pool2
.В конце концов, увеличение
chunksize
на 1 приводит к тому, что последняя переданная задача имеет размер толькоlen_iterable % chunksize or chunksize
.Тем не менее, более интересный и более значительный эффект дополнительной обработки мы увидим позже по количеству сгенерированных блоков (
n_chunks
). Для достаточно длинных итераций завершенный алгоритм размера фрагментов Pool (n_pool2
на рисунке ниже) будет стабилизировать количество фрагментов вn_chunks == n_workers * 4
. Напротив, наивный алгоритм (после начальной отрыжки) продолжает чередоваться междуn_chunks == n_workers
и поn_chunks == n_workers + 1
мере увеличения длины итерации.Ниже вы найдете две расширенные информационные функции для Pool и наивный алгоритм chunksize. Вывод этих функций понадобится в следующей главе.
# mp_utils.py from collections import namedtuple Chunkinfo = namedtuple( 'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks', 'chunksize', 'last_chunk'] ) def calc_chunksize_info(n_workers, len_iterable, factor=4): """Calculate chunksize numbers.""" chunksize, extra = divmod(len_iterable, n_workers * factor) if extra: chunksize += 1 # `+ (len_iterable % chunksize > 0)` exploits that `True == 1` n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0) # exploit `0 == False` last_chunk = len_iterable % chunksize or chunksize return Chunkinfo( n_workers, len_iterable, n_chunks, chunksize, last_chunk )
Пусть вас не смущает, вероятно, неожиданный вид
calc_naive_chunksize_info
.extra
Отdivmod
не используется для расчета chunksize.def calc_naive_chunksize_info(n_workers, len_iterable): """Calculate naive chunksize numbers.""" chunksize, extra = divmod(len_iterable, n_workers) if chunksize == 0: chunksize = 1 n_chunks = extra last_chunk = chunksize else: n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0) last_chunk = len_iterable % chunksize or chunksize return Chunkinfo( n_workers, len_iterable, n_chunks, chunksize, last_chunk )
6. Количественная оценка эффективности алгоритма.
Теперь, после того, как мы увидели, как результат
Pool
chunksize-алгоритма выглядит по-другому по сравнению с выводом наивного алгоритма ...Как показано в предыдущей главе, для более длинных итераций (большего числа задач) алгоритм размера фрагментов Pool приблизительно разделяет итерацию на в четыре раза больше фрагментов, чем наивный метод. Меньшие фрагменты означают больше задач, а большее количество задач означает больше накладных расходов на распараллеливание (PO) , стоимость, которую необходимо сопоставить с преимуществом повышенной гибкости планирования (вспомните «Риски размера фрагмента> 1» ).
По довольно очевидным причинам базовый алгоритм размера фрагментов Pool не может сопоставить гибкость планирования с PO для нас. Накладные расходы IPC зависят от ОС, оборудования и размера данных. Алгоритм не может знать, на каком оборудовании мы запускаем наш код, и при этом он не знает, сколько времени потребуется для завершения таскела. Это эвристика, обеспечивающая базовую функциональность для всех возможных сценариев. Это означает, что его нельзя оптимизировать для какого-либо конкретного сценария. Как упоминалось ранее, PO также становится все менее проблемным с увеличением времени вычислений на задачу (отрицательная корреляция).
Когда вы вспоминаете цели распараллеливания из главы 2, вы видите один пункт:
Ранее упомянутое кое-что , что алгоритм размера фрагмента Pool может попытаться улучшить, - это минимизация простаивающих рабочих процессов , соответственно, использование ядер процессора .
Повторяющийся вопрос о SO
multiprocessing.Pool
задают люди, интересующиеся неиспользуемыми ядрами / простаивающими рабочими процессами в ситуациях, когда вы ожидаете, что все рабочие процессы будут заняты. Хотя у этого может быть много причин, простаивающие рабочие процессы ближе к концу вычисления - это наблюдение, которое мы часто можем сделать, даже с плотными сценариями (равное время вычислений на таскел) в тех случаях, когда количество рабочих не является делителем числа. кусков (n_chunks % n_workers > 0
).Теперь вопрос:
6.1 Модели
Для получения более глубокого понимания нам нужна форма абстракции параллельных вычислений, которая упрощает чрезмерно сложную реальность до управляемой степени сложности, сохраняя при этом значимость в определенных границах. Такая абстракция называется моделью . Реализация такой « модели распараллеливания» (PM) генерирует отображаемые работником метаданные (временные метки), как и при реальных вычислениях, если бы данные собирались. Сгенерированные моделью метаданные позволяют прогнозировать показатели параллельных вычислений при определенных ограничениях.
Одной из двух подмоделей в рамках определенного здесь PM является Модель распределения (DM) . DM объясняет , как атомные единицы работы (taskels) распределены по параллельным рабочим и времени , когда нет других факторов , кроме соответствующего chunksize-алгоритма, число рабочих, ввод-итерация (количество taskels) и их продолжительности вычислений не считаются . Это означает, что никакие накладные расходы не включены.
Для получения полной ПМ , то ДЙ расширяются с ВЛИ Model (OM) , представляющий различные формы Распараллеливания Накладного (PO) . Такая модель должна быть откалибрована для каждого узла индивидуально (зависимости от оборудования и ОС). Сколько форм накладных расходов представлено в OM , остается открытым, поэтому могут существовать несколько OM с разной степенью сложности. Какой уровень точности требуется внедренному OM, определяется общим весом PO для конкретных вычислений. Более короткие задачи приводят к большему весу PO , что, в свою очередь, требует более точного, OM.если бы мы пытались предсказать эффективность распараллеливания (PE) .
6.2 Параллельное расписание (PS)
Параллельный график представляет собой двумерное представление параллельных вычислений, где ось х представляет время и ось у представляет собой пул параллельных рабочих. Количество рабочих и общее время вычислений обозначают протяженность прямоугольника, в котором нарисованы меньшие прямоугольники. Эти меньшие прямоугольники представляют собой элементарные единицы работы (таскелы).
Ниже вы найдете визуализацию PS, нарисованную с использованием данных из алгоритма chunksize DM of Pool для плотного сценария .
Названия составных частей можно увидеть на картинке ниже.
В полном PM, включающем OM , доля холостого хода не ограничивается хвостовой частью, но также включает пространство между задачами и даже между элементами задач.
6.3 Эффективность
Представленные выше модели позволяют количественно оценить коэффициент использования рабочих. Мы можем выделить:
Важно отметить, что вычисленная эффективность не коррелирует автоматически с более быстрыми общими вычислениями для данной проблемы распараллеливания. Использование рабочих в этом контексте различает только работника, имеющего начатую, но еще не завершенную задачу, и рабочего, у которого нет такой «открытой» панели задач. Это означает, что возможен холостой ход течение периода времени таскела не регистрируется.
Все вышеупомянутые значения эффективности в основном получены путем вычисления частного деления доли занятости / параллельного расписания . Разница между DE и PE заключается в том, что доля занятости занимает меньшую часть общего параллельного расписания для PM с расширенными накладными расходами .
В этом ответе будет обсуждаться только простой метод вычисления DE. для плотного сценария. Этого достаточно для сравнения различных алгоритмов размера фрагментов, поскольку ...
6.3.1 Абсолютная эффективность распределения (ADE)
Эту базовую эффективность в целом можно рассчитать, разделив долю занятости на весь потенциал параллельного расписания :
Для плотного сценария упрощенный расчетный код выглядит так:
# mp_utils.py def calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk): """Calculate Absolute Distribution Efficiency (ADE). `len_iterable` is not used, but contained to keep a consistent signature with `calc_rde`. """ if n_workers == 1: return 1 potential = ( ((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize) + (n_chunks % n_workers == 1) * last_chunk ) * n_workers n_full_chunks = n_chunks - (chunksize > last_chunk) taskels_in_regular_chunks = n_full_chunks * chunksize real = taskels_in_regular_chunks + (chunksize > last_chunk) * last_chunk ade = real / potential return ade
Если нет Холостой Share , Busy доля будет равна к Parallel Schedule , следовательно , мы получаем ADE 100%. В нашей упрощенной модели это сценарий, при котором все доступные процессы будут заняты все время, необходимое для обработки всех задач. Другими словами, вся работа эффективно распараллеливается на 100 процентов.
Но почему я держу в виде PE в качестве абсолютного PE здесь?
Чтобы понять это, мы должны рассмотреть возможный случай для chunksize (cs), который обеспечивает максимальную гибкость планирования (также, количество горцев может быть. Совпадение?):
Если у нас, например, четыре рабочих процесса и 37 задач, будут простаивающие рабочие даже с
chunksize=1
, просто потому, чтоn_workers=4
это не делитель 37. Остаток от деления 37/4 равен 1. Этот единственный оставшийся таскел должен быть обрабатывается единственным работником, а остальные три простаивают.Точно так же останется один неработающий рабочий с 39 таскелями, как вы можете видеть на картинке ниже.
Когда вы сравните верхнее параллельное расписание для
chunksize=1
с версией ниже дляchunksize=3
, вы заметите, что верхнее параллельное расписание меньше, а временная шкала на оси x короче. Теперь должно стать очевидным, насколько неожиданно большие куски могут привести к увеличению общего времени вычислений, даже для плотных сценариев .Потому что накладные расходы в этой модели не содержатся. Он будет отличаться для обоих размеров фрагментов, поэтому ось x на самом деле напрямую не сопоставима. Накладные расходы могут привести к увеличению общего времени вычислений, как показано в случае 2 на рисунке ниже.
6.3.2 Относительная эффективность распределения (RDE)
Значение ADE не содержит информации, если возможно лучшее распределение таскелей с chunksize, установленным на 1. Лучше здесь все равно будет меньше долю холостого хода .
Чтобы получить значение DE, скорректированное с учетом максимально возможного DE , мы должны разделить рассматриваемое ADE на ADE, которое мы получаем
chunksize=1
.Вот как это выглядит в коде:
# mp_utils.py def calc_rde(n_workers, len_iterable, n_chunks, chunksize, last_chunk): """Calculate Relative Distribution Efficiency (RDE).""" ade_cs1 = calc_ade( n_workers, len_iterable, n_chunks=len_iterable, chunksize=1, last_chunk=1 ) ade = calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk) rde = ade / ade_cs1 return rde
RDE , как здесь определено, по сути, является рассказом о хвосте параллельного расписания . RDE зависит от максимального эффективного размера фрагмента, содержащегося в хвосте. (Этот хвост может иметь длину по оси X
chunksize
илиlast_chunk
.) Следствием этого является то, что RDE естественным образом сходится к 100% (даже) для всех видов «хвостовых образов», как показано на рисунке ниже.Низкий RDE ...
Вы можете найти часть II этого ответа здесь .
источник
7. Наивный алгоритм против алгоритма пула.
Прежде чем вдаваться в подробности, рассмотрим две гифки ниже. Для диапазона разной
iterable
длины они показывают, как два сравниваемых алгоритма разбивают переданный фрагментiterable
(к тому времени это будет последовательность) и как могут быть распределены результирующие задачи. Порядок рабочих является случайным, и количество распределенных задач на одного работника в действительности может отличаться от этого изображения для легких задач и / или задач в широком сценарии. Как упоминалось ранее, сюда не включаются накладные расходы. Однако для достаточно тяжелых задач в плотном сценарии с незначительными размерами передаваемых данных реальные вычисления рисуют очень похожую картину.Как показано в главе « 5. Алгоритм размера фрагментов пула », с алгоритмом размера фрагментов пула количество фрагментов будет стабилизироваться на уровне
n_chunks == n_workers * 4
достаточно больших итераций, в то время как он продолжает переключаться междуn_chunks == n_workers
иn_chunks == n_workers + 1
с наивным подходом. Применяется наивный алгоритм: посколькуn_chunks % n_workers == 1
isTrue
forn_chunks == n_workers + 1
, будет создан новый раздел, в котором будет задействован только один рабочий.Ниже вы видите рисунок, аналогичный показанному в главе 5, но на котором отображается количество разделов, а не количество фрагментов. Для полного chunksize-алгоритма Pool (
n_pool2
)n_sections
будет стабилизироваться печально известный жестко запрограммированный коэффициент4
. Для наивного алгоритмаn_sections
будет чередоваться от одного до двух.Для алгоритма размера фрагмента Pool стабилизация с
n_chunks = n_workers * 4
помощью вышеупомянутой дополнительной обработки предотвращает создание здесь нового раздела и сохраняет долю холостого хода ограниченным одним рабочим для достаточно долгих итераций. Более того, алгоритм будет продолжать сокращать относительный размер доли холостого хода , что приводит к приближению значения RDE к 100%.«Достаточно долго» для
n_workers=4
этоlen_iterable=210
, например. Для итераций, равных или превышающих это значение, доля холостого хода будет ограничена одним4
воркером , черта, изначально утраченная из-за умножения в алгоритме размера фрагмента в первую очередь.Наивный алгоритм chunksize также сходится к 100%, но делает это медленнее. Эффект конвергенции зависит исключительно от того факта, что относительная часть хвоста сжимается в случаях, когда будет две секции. Этот хвост только с одним нанятым работником ограничен длиной по оси x
n_workers - 1
, максимально возможным остатком дляlen_iterable / n_workers
.Ниже вы найдете две тепловые карты, на которых показаны значения RDE для всех повторяемых длин до 5000, для всех чисел рабочих от 2 до 100. Цветовая шкала изменяется от 0,5 до 1 (50% -100%). Вы заметите гораздо больше темных областей (более низкие значения RDE) для наивного алгоритма на левой тепловой карте. Напротив, алгоритм размера фрагментов Пула справа рисует гораздо более солнечную картину.
Диагональный градиент темных углов в нижнем левом углу по сравнению с яркими углами в правом верхнем углу снова показывает зависимость от количества рабочих для того, что называется «длинной итерацией».
С алгоритмом размера фрагмента Pool значение RDE 81,25% является наименьшим значением для диапазона рабочих и повторяемых длин, указанных выше:
С наивным алгоритмом chunksize дела могут пойти намного хуже. Самый низкий расчетный RDE здесь составляет 50,72%. В этом случае почти половину времени вычислений работает только один рабочий! Итак, берегитесь, счастливые обладатели Knights Landing . ;)
8. Проверка реальности
В предыдущих главах мы рассматривали упрощенную модель чисто математической задачи распределения, лишенную мельчайших деталей, которые в первую очередь делают многопроцессорную обработку такой сложной темой. Чтобы лучше понять , как далеко распространения модели (DM) в одиночку может способствовать объяснению наблюдаемого использования рабочих в реальности, теперь мы рассмотрим параллельные расписания, составленные с помощью реальных вычислений.
Настроить
Все следующие графики связаны с параллельным выполнением простой фиктивной функции с привязкой к процессору, которая вызывается с различными аргументами, поэтому мы можем наблюдать, как нарисованное параллельное расписание изменяется в зависимости от входных значений. «Работа» в этой функции состоит только из итерации по объекту диапазона. Этого уже достаточно для того, чтобы ядро было занято, поскольку мы передаем в него огромные числа. По желанию функция принимает некоторую уникальную для таскеля добавку,
data
которая просто возвращается без изменений. Поскольку каждый таскел включает в себя один и тот же объем работы, мы по-прежнему имеем дело с плотным сценарием.Функция украшена оберткой, принимающей временные метки с разрешением ns (Python 3.7+). Отметки времени используются для расчета временного интервала таскела и, следовательно, позволяют рисовать эмпирическое параллельное расписание.
@stamp_taskel def busy_foo(i, it, data=None): """Dummy function for CPU-bound work.""" for _ in range(int(it)): pass return i, data def stamp_taskel(func): """Decorator for taking timestamps on start and end of decorated function execution. """ @wraps(func) def wrapper(*args, **kwargs): start_time = time_ns() result = func(*args, **kwargs) end_time = time_ns() return (current_process().name, (start_time, end_time)), result return wrapper
Метод карты звезды Pool также оформлен таким образом, что рассчитывается только сам вызов карты звезды. «Начало» и «конец» этого вызова определяют минимум и максимум по оси x созданного параллельного расписания.
Мы собираемся наблюдать вычисление 40 таскелей на четырех рабочих процессах на машине со следующими характеристиками: Python 3.7.1, Ubuntu 18.04.2, Intel® Core ™ i7-2600K CPU @ 3.40GHz × 8
Входные значения, которые будут варьироваться, - это количество итераций в цикле for (30k, 30M, 600M) и размер дополнительно отправляемых данных (для каждого таскела, numpy-ndarray: 0 MiB, 50 MiB).
... N_WORKERS = 4 LEN_ITERABLE = 40 ITERATIONS = 30e3 # 30e6, 600e6 DATA_MiB = 0 # 50 iterable = [ # extra created data per taskel (i, ITERATIONS, np.arange(int(DATA_MiB * 2**20 / 8))) # taskel args for i in range(LEN_ITERABLE) ] with Pool(N_WORKERS) as pool: results = pool.starmap(busy_foo, iterable)
Показанные ниже прогоны были отобраны для того, чтобы иметь одинаковый порядок чанков, чтобы вы могли лучше заметить различия по сравнению с параллельным расписанием из модели распределения, но не забывайте, что порядок, в котором рабочие получают свою задачу, недетерминирован.
Прогнозирование DM
Повторюсь, модель распределения «предсказывает» параллельное расписание, как мы уже видели это ранее в главе 6.2:
1-й ЗАПУСК: 30 тыс. Итераций и 0 МБ данных на каждую задачу
Наш первый запуск здесь очень короткий, таскели очень «легкие». Весь
pool.starmap()
вызов занял всего 14,5 мс. Вы заметите, что в отличие от DM , холостой ход не ограничивается хвостовой частью, но также имеет место между задачами и даже между задачами. Это потому, что наш реальный график, естественно, включает в себя всевозможные накладные расходы. Холостой ход здесь означает все, что не входит в таскель. Возможный реальный холостой ход во время таскела не фиксируется, как уже упоминалось ранее.Кроме того, вы можете видеть, что не все работники выполняют свои задачи одновременно. Это связано с тем, что все воркеры питаются через общий ресурс,
inqueue
и только один воркер может читать с него одновременно. То же самое касаетсяoutqueue
. Это может вызвать большие неприятности, если вы передаете нематериальные объемы данных, как мы увидим позже.Более того, вы можете видеть, что, несмотря на то, что каждый таскел включает в себя одинаковый объем работы, фактический измеренный временной интервал для таскела сильно различается. Задачи, распределенные для worker-3 и worker-4, требуют больше времени, чем те, которые обрабатываются первыми двумя рабочими. Для этого прогона я подозреваю, что это связано с тем, что в тот момент турбо-ускорение на ядрах для worker-3/4 больше не было, поэтому они обрабатывали свои задачи с более низкой тактовой частотой.
Все вычисления настолько легкие, что факторы хаоса, вносимые оборудованием или ОС, могут сильно исказить PS . Вычисления - это «лист на ветру», и предсказание DM не имеет большого значения даже для теоретически подходящего сценария.
2-й ЗАПУСК: 30 млн итераций и 0 МБ данных на каждую задачу
Увеличение количества итераций в цикле for с 30 000 до 30 миллионов приводит к реальному параллельному расписанию, которое близко к идеальному совпадению с предсказанным данными, предоставленными DM , ура! Вычисления для каждого таскела теперь достаточно тяжелы, чтобы маргинализировать холостые части в начале и между ними, позволяя видеть только большую долю холостого хода, которую предсказал DM .
3-й ЗАПУСК: 30 млн итераций и 50 МБ данных на каждую задачу
Сохранение 30 миллионов итераций, но дополнительная отправка 50 МБ на таскель вперед и назад снова искажает картину. Здесь хорошо виден эффект очереди. Рабочий-4 должен ждать своей второй задачи дольше, чем Рабочий-1. А теперь представьте этот график с 70 рабочими!
В случае, если таскели очень легкие с вычислительной точки зрения, но предоставляют заметный объем данных в качестве полезной нагрузки, узкое место одной общей очереди может помешать каким-либо дополнительным преимуществам добавления большего количества рабочих в пул, даже если они поддерживаются физическими ядрами. В таком случае Worker-1 может выполнить свою первую задачу и ожидать новую даже до того, как Worker-40 получит свою первую задачу.
Теперь должно стать очевидным, почему время вычислений
Pool
не всегда уменьшается линейно с количеством рабочих. Отправка относительно больших объемов данных может привести к сценариям, в которых большую часть времени тратится на ожидание копирования данных в адресное пространство рабочего, и одновременно может быть загружен только один рабочий.4-й ЗАПУСК: 600 млн итераций и 50 МБ данных на каждую задачу
Здесь мы снова отправляем 50 МБ, но увеличиваем количество итераций с 30 до 600 МБ, что увеличивает общее время вычислений с 10 до 152 с. Нарисованное параллельное расписание снова близко к идеальному совпадению с прогнозируемым, накладные расходы на копирование данных минимизированы.
9. Заключение
Обсуждаемое умножение на
4
увеличивает гибкость планирования, но также усиливает неравномерность распределений таскелей. Без этого умножения доля холостого хода была бы ограничена одним рабочим даже для коротких итераций (для DM с плотным сценарием). Алгоритм размера фрагментов Pool требует, чтобы итерации ввода были определенного размера, чтобы восстановить эту черту.Как мы надеемся, этот ответ показал, что алгоритм размера фрагмента Pool приводит к лучшему использованию ядра в среднем по сравнению с наивным подходом, по крайней мере, для среднего случая и без учета длительных накладных расходов. Наивный алгоритм здесь может иметь эффективность распределения (DE) всего ~ 51%, в то время как алгоритм размера фрагмента Pool имеет низкий уровень ~ 81%. Однако DE не содержит накладных расходов на распараллеливание (PO), как IPC. Глава 8 показала, что DE все еще может иметь большую предсказательную силу для плотного сценария с минимальными накладными расходами.
Несмотря на то, что алгоритм размера фрагментов Пула обеспечивает более высокое значение DE по сравнению с наивным подходом, он не обеспечивает оптимального распределения таскелей для каждого входного созвездия. Хотя простой статический алгоритм фрагментации не может оптимизировать (включая накладные расходы) эффективность распараллеливания (PE), нет никакой внутренней причины, по которой он не всегда мог бы обеспечить относительную эффективность распределения (RDE) 100%, то есть такую же DE, что и с
chunksize=1
. Простой алгоритм chunksize состоит только из базовой математики и может «разрезать торт» любым способом.В отличие от реализации в Pool алгоритма «порции равного размера», алгоритм «порции равного размера» обеспечит RDE 100% для каждой комбинации
len_iterable
/n_workers
. Алгоритм разделения на части равного размера было бы немного сложнее реализовать в исходном коде Pool, но его можно модулировать поверх существующего алгоритма, просто упаковывая задачи извне (я буду ссылаться здесь на случай, если я брошу вопрос / ответ на как это сделать).источник
Я думаю, что отчасти вам не хватает того, что ваша наивная оценка предполагает, что каждая единица работы занимает одинаковое количество времени, и в этом случае ваша стратегия будет наилучшей. Но если некоторые задания завершаются раньше, чем другие, то некоторые ядра могут бездействовать, ожидая завершения медленных заданий.
Таким образом, если разбить фрагменты на в 4 раза больше частей, то, если один фрагмент завершился раньше, это ядро может начать следующий фрагмент (в то время как другие ядра продолжают работать над своим более медленным фрагментом).
Я не знаю, почему они выбрали коэффициент 4 точно, но это будет компромисс между минимизацией накладных расходов кода карты (который требует максимально возможных кусков) и балансировкой кусков, занимающих разное количество раз (что требует наименьшего возможного фрагмента ).
источник