Как настроить номер исполнителя искры, количество ядер и память исполнителя?

84

Где вы начинаете настраивать вышеупомянутые параметры. Начнем ли мы с памяти исполнителя и получим количество исполнителей, или мы начнем с ядер и получим номер исполнителя. Я перешел по ссылке . Однако получил представление на высоком уровне, но все еще не уверен, как и с чего начать и прийти к окончательному выводу.

Рамзи
источник

Ответы:

209

Следующий ответ охватывает 3 основных аспекта, упомянутых в заголовке: количество исполнителей, память исполнителя и количество ядер. Могут быть другие параметры, такие как память драйвера и другие, которые я не рассматривал в этом ответе, но хотел бы добавить в ближайшем будущем.

Вариант 1 Оборудование - 6 узлов, каждый узел 16 ядер, 64 ГБ ОЗУ

Каждый исполнитель - это экземпляр JVM. Таким образом, у нас может быть несколько исполнителей в одном узле

Сначала для ОС и демонов Hadoop требуется 1 ядро ​​и 1 ГБ, поэтому доступно 15 ядер, 63 ГБ ОЗУ для каждого узла.

Начнем с того, как выбрать количество ядер :

Number of cores = Concurrent tasks as executor can run 

So we might think, more concurrent tasks for each executor will give better performance. But research shows that
any application with more than 5 concurrent tasks, would lead to bad show. So stick this to 5.

This number came from the ability of executor and not from how many cores a system has. So the number 5 stays same
even if you have double(32) cores in the CPU.

Количество исполнителей:

Coming back to next step, with 5 as cores per executor, and 15 as total available cores in one Node(CPU) - we come to 
3 executors per node.

So with 6 nodes, and 3 executors per node - we get 18 executors. Out of 18 we need 1 executor (java process) for AM in YARN we get 17 executors

This 17 is the number we give to spark using --num-executors while running from spark-submit shell command

Память для каждого исполнителя:

From above step, we have 3 executors  per node. And available RAM is 63 GB

So memory for each executor is 63/3 = 21GB. 

However small overhead memory is also needed to determine the full memory request to YARN for each executor.
Formula for that over head is max(384, .07 * spark.executor.memory)

Calculating that overhead - .07 * 21 (Here 21 is calculated as above 63/3)
                            = 1.47

Since 1.47 GB > 384 MB, the over head is 1.47.
Take the above from each 21 above => 21 - 1.47 ~ 19 GB

So executor memory - 19 GB

Конечные цифры - Исполнители - 17, Ядра 5, Оперативная память исполнителя - 19 ГБ


Аппаратное обеспечение случая 2: те же 6 узлов, 32 ядра, 64 ГБ

5 то же самое для хорошего параллелизма

Количество исполнителей на каждый узел = 32/5 ~ 6

Таким образом, общее количество исполнителей = 6 * 6 узлов = 36. Тогда окончательное число 36 - 1 для AM = 35.

Память исполнителя составляет: 6 исполнителей на каждый узел. 63/6 ~ 10. Над головой 0,07 * 10 = 700 МБ. Таким образом, округляя до 1 ГБ, мы получаем 10-1 = 9 ГБ.

Итоговые цифры - Исполнители - 35, Количество ядер 5, Оперативная память исполнителя - 9 ГБ


Случай 3

Вышеупомянутые сценарии начинаются с принятия количества ядер как фиксированного и перехода к количеству исполнителей и памяти.

Теперь для первого случая, если мы думаем, что нам не нужно 19 ГБ, а достаточно всего 10 ГБ, то следующие числа:

ядер 5 # исполнителей для каждого узла = 3

На данном этапе это приведет к 21, а затем 19 согласно нашему первому расчету. Но поскольку мы думали, что 10 - это нормально (предполагая небольшие накладные расходы), то мы не можем переключить количество исполнителей на узел на 6 (например, 63/10). Если у нас есть 6 исполнителей на узел и 5 ядер, получается 30 ядер на узел, когда у нас всего 16 ядер. Поэтому нам также нужно изменить количество ядер для каждого исполнителя.

Итак, рассчитывая снова,

Магическое число 5 превращается в 3 (любое число, меньшее или равное 5). Итак, с 3 ядрами и 15 доступными ядрами мы получаем 5 исполнителей на узел. Итак (5 * 6-1) = 29 исполнителей

Таким образом, объем памяти составляет 63/5 ~ 12. Накладной объем составляет 12 * 0,07 = 0,84. Таким образом, память исполнителя составляет 12 - 1 ГБ = 11 ГБ.

Окончательные номера - 29 исполнителей, 3 ядра, память исполнителя - 11 ГБ.


Динамическое размещение:

Примечание. Верхняя граница количества исполнителей, если включено динамическое размещение. Это говорит о том, что приложение Spark при необходимости может съесть все ресурсы. Итак, в кластере, где у вас работают другие приложения, которым также нужны ядра для выполнения задач, убедитесь, что вы делаете это на уровне кластера. Я имею в виду, что вы можете выделить определенное количество ядер для YARN в зависимости от доступа пользователя. Таким образом, вы можете создать spark_user, а затем указать ядра (мин / макс) для этого пользователя. Эти ограничения предназначены для обмена между Spark и другими приложениями, работающими на YARN.

spark.dynamicAllocation.enabled - Если установлено значение true - нам не нужно упоминать исполнителей. Причина ниже:

Число статических параметров, которое мы указываем в spark-submit, рассчитано на всю продолжительность задания. Однако, если динамическое размещение входит в картину, будут разные этапы, такие как

С чего начать:

Начальное количество исполнителей ( spark.dynamicAllocation.initialExecutors ) для начала

Сколько :

Затем в зависимости от нагрузки (ожидающих задач), сколько запросить. В конечном итоге это будут числа, которые мы даем при отправке искры статическим способом. Итак, как только начальные номера исполнителей установлены, мы переходим к значениям min ( spark.dynamicAllocation.minExecutors ) и max ( spark.dynamicAllocation.maxExecutors ).

Когда спросить или дать:

Когда мы запрашиваем новых исполнителей ( spark.dynamicAllocation.schedulerBacklogTimeout ) - в течение такого длительного времени были отложенные задачи. так просьба. количество исполнителей, запрашиваемых в каждом раунде, увеличивается экспоненциально по сравнению с предыдущим раундом. Например, приложение добавит 1 исполнителя в первом раунде, а затем 2, 4, 8 и так далее исполнителей в последующих раундах. В определенный момент на картинке появляется указанный выше максимум.

когда мы отдаем исполнителя ( spark.dynamicAllocation.executorIdleTimeout ) -

Пожалуйста, поправьте меня, если я что-то пропустил. Вышесказанное является моим пониманием на основе блога, которым я поделился в вопросе, и некоторых онлайн-ресурсов. Спасибо.

Рекомендации:

Рамзи
источник
2
Я где-то читал, что есть только один исполнитель на узел в автономном режиме, есть идеи по этому поводу? Я не вижу этого в вашем ответе.
jangorecki
2
В автономном кластере по умолчанию у нас есть один исполнитель на каждого работника. Нам нужно поиграть с spark.executor.cores, и у рабочего достаточно ядер, чтобы получить более одного исполнителя.
Ramzy
Я обнаружил, что мой рабочий использует все 32 ядра без настройки spark.executor.cores, поэтому он может использовать все доступные по умолчанию.
jangorecki
Ага, по умолчанию для ядер как говорится бесконечно. Таким образом, Spark может использовать все доступные ядра, если вы не укажете.
Ramzy
2
@Ramzy Я думаю, следует отметить, что даже при динамическом распределении вы все равно должны указывать spark.executor.cores, чтобы определить размер каждого исполнителя, который Spark собирается выделить. В противном случае, когда Spark собирается выделить вашему приложению нового исполнителя, он будет выделять весь узел (если он доступен), даже если вам нужно всего лишь пять дополнительных ядер.
Дэн Мархасин
6

Кроме того, это зависит от вашего варианта использования, важным параметром конфигурации является:

spark.memory.fraction(Часть (пространство кучи - 300 МБ), используемая для выполнения и хранения) из http://spark.apache.org/docs/latest/configuration.html#memory-management .

Если вы не используете cache / persist, установите значение 0.1, чтобы у вас была вся память для вашей программы.

Если вы используете cache / persist, вы можете проверить занятую память:

sc.getExecutorMemoryStatus.map(a => (a._2._1 - a._2._2)/(1024.0*1024*1024)).sum

Вы читаете данные из HDFS или из HTTP?

Опять же, настройка зависит от вашего варианта использования.

Томас Деко
источник
Вы знаете, как будет выглядеть команда карты при использовании pyspark? Я использую sc._jsc.sc (). GetExecutorMemoryStatus (), чтобы получить статус исполнителя, но ничего не могу сделать с тем, что он возвращает ...
Томас
1
Извините, @Thomas Decaux , но вы имели в виду настройку spark.memory.storageFraction=0.1? Поскольку AFAIK spark.memory.fractionопределяет объем памяти, используемый как Sparkдля выполнения, так и для хранения (вы уже упоминали об этом), и только spark.memory.storageFractionпамять (которая доступна для хранения + выполнения) защищена от вытеснения кеша . Пожалуйста, посмотрите эту ссылку
y2k-shubham
@Thomas Если в моем приложении используется только постоянный (StorageLevel.DISK_ONLY), то этот параметр также применим, верно? Это влияет только на долю памяти, но не влияет на сброс диска?
jk1