Я пытаюсь понять взаимосвязь количества ядер и количества исполнителей при запуске задания Spark на YARN.
Тестовая среда выглядит следующим образом:
- Количество узлов данных: 3
- Спецификация машины узла данных:
- Процессор: Core i7-4790 (количество ядер: 4, количество потоков: 8)
- Оперативная память: 32 ГБ (8 ГБ х 4)
- HDD: 8 ТБ (2 ТБ х 4)
Сеть: 1 Гб
Версия Spark: 1.0.0
Версия Hadoop: 2.4.0 (Hortonworks HDP 2.1)
Spark поток работ: sc.textFile -> фильтр -> карта -> фильтр -> mapToPair -> ReduceByKey -> карта -> saveAsTextFile
Входные данные
- Тип: один текстовый файл
- Размер: 165 ГБ
- Количество строк: 454 568 833
Вывод
- Количество строк после второго фильтра: 310 640 717
- Количество строк в файле результатов: 99,848,268
- Размер файла результата: 41 ГБ
Задание было запущено со следующими конфигурациями:
--master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3
(исполнители на узел данных, используйте столько же, сколько ядер)--master yarn-client --executor-memory 19G --executor-cores 4 --num-executors 3
(количество ядер уменьшено)--master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12
(меньше ядра, больше исполнителя)
Истекшие времена:
50 мин 15 сек
55 мин 48 сек
31 мин 23 сек
К моему удивлению, (3) было намного быстрее.
Я думал, что (1) будет быстрее, так как при перетасовке будет меньше общения между исполнителями.
Хотя количество ядер в (1) меньше, чем (3), количество ядер не является ключевым фактором, так как 2) работает хорошо.
(Подписки добавлены после ответа pwilmot.)
Для информации, снимок экрана монитора производительности выглядит следующим образом:
- Сводка узла данных Ganglia для (1) - задание началось в 04:37.
- Сводка узла данных Ganglia для (3) - задание началось в 19:47. Пожалуйста, игнорируйте график до этого времени.
График примерно делится на 2 раздела:
- Во-первых: от начала до уменьшенияByKey: загрузка процессора, нет активности сети
- Второе: после lowerByKey: ЦП понижается, сетевой ввод / вывод завершен.
Как видно из графика, (1) может использовать столько мощности процессора, сколько было указано. Таким образом, это может быть не проблема количества потоков.
Как объяснить этот результат?
источник
Ответы:
Объяснение было дано в статье в блоге Cloudera, How-to: Tune Your Apache Spark Jobs (Часть 2) .
источник
yarn.scheduler.capacity.resource-calculator
отключенным, что по умолчанию. Это связано с тем, что по умолчанию он распределяется по памяти, а не по процессору.По словам Сэнди Риза, когда вы запускаете свое искровое приложение поверх HDFS
Поэтому я считаю, что ваша первая конфигурация медленнее третьей из-за плохой пропускной способности ввода-вывода HDFS
источник
Я сам не играл с этими настройками, так что это всего лишь предположение, но если мы будем рассматривать эту проблему как обычные ядра и потоки в распределенной системе, то в вашем кластере вы можете использовать до 12 ядер (4 * 3 компьютера) и 24 потока (8 * 3 машины). В первых двух примерах вы предоставляете своей работе достаточное количество ядер (потенциальное вычислительное пространство), но количество потоков (заданий), выполняемых на этих ядрах, настолько ограничено, что вы не можете использовать большую часть выделенной вычислительной мощности. и, следовательно, работа медленнее, даже если выделено больше вычислительных ресурсов.
Вы упоминаете, что ваша задача была в шаге случайного перемешивания - хотя приятно ограничить накладные расходы в шаге случайного воспроизведения, как правило, гораздо важнее использовать распараллеливание кластера. Подумайте о крайнем случае - однопоточная программа с нулевым перемешиванием.
источник
Я думаю, что ответ здесь может быть немного проще, чем некоторые из рекомендаций здесь.
Ключ для меня в графе кластерной сети. Для прогона 1 загрузка устойчива на уровне ~ 50 М байт / с. Для прогона 3 устойчивое использование удваивается, около 100 Мбайт / с.
От поста Cloudera блога разделяемой DzOrd , вы можете увидеть эту важную цитату:
Итак, давайте сделаем несколько расчетов, чтобы увидеть, какую производительность мы ожидаем, если это правда.
Запуск 1: 19 ГБ, 7 ядер, 3 исполнителя
Запуск 3: 4 ГБ, 2 ядра, 12 исполнителей
Если задание на 100% ограничено параллелизмом (количеством потоков). Мы ожидаем, что время выполнения будет полностью обратно коррелировано с количеством потоков.
Итак
ratio_num_threads ~= inv_ratio_runtime
, похоже, что мы ограничены в сети.Этот же эффект объясняет разницу между прогоном 1 и прогоном 2.
Запуск 2: 19 ГБ, 4 ядра, 3 исполнителя
Сравнение количества эффективных потоков и времени выполнения:
Он не так совершенен, как в предыдущем сравнении, но мы все еще видим аналогичное падение производительности, когда теряем потоки.
Теперь напоследок: почему мы получаем лучшую производительность с большим количеством потоков, особенно? больше потоков, чем количество процессоров?
Хорошее объяснение разницы между параллелизмом (что мы получаем путем разделения данных на несколько процессоров) и параллелизмом (что мы получаем, когда мы используем несколько потоков для работы на одном процессоре), представлено в этом замечательном посте Роба Пайка: « Параллельность» это не параллелизм .
Краткое объяснение состоит в том, что если задание Spark взаимодействует с файловой системой или сетью, ЦП тратит много времени на ожидание связи с этими интерфейсами и не тратит много времени на «выполнение работы». Предоставляя этим процессорам более одной задачи одновременно, они тратят меньше времени на ожидание и больше времени на работу, и вы видите лучшую производительность.
источник
Из превосходных ресурсов, доступных на странице пакета Sparklyr RStudio :
источник
Динамическое распределение Spark обеспечивает гибкость и динамическое распределение ресурсов. В этом количестве может быть указано минимальное и максимальное количество исполнителей. Также может быть указано количество исполнителей, которое должно быть запущено при запуске приложения.
Читайте ниже о том же:
источник
Я думаю, что в первых двух конфигурациях есть небольшая проблема. Понятия потоков и сердечников, как следует. Концепция потоков - если ядра идеальны, используйте это ядро для обработки данных. Таким образом, память используется не полностью в первых двух случаях. Если вы хотите провести тестирование в этом примере, выберите машины, которые имеют более 10 ядер на каждой машине. Затем сделайте отметку.
Но не отдавайте более 5 ядер на каждого исполнителя, это приведет к снижению производительности ввода-вывода.
Таким образом, лучшими машинами для этого тестирования могут быть узлы данных, имеющие 10 ядер.
Спецификация узла данных: ЦП: Core i7-4790 (количество ядер: 10, количество потоков: 20) ОЗУ: 32 ГБ (8 ГБ х 4), жесткий диск: 8 ТБ (2 ТБ х 4)
источник
Я думаю, что одной из основных причин является местность. Размер вашего входного файла составляет 165 ГБ, связанные с файлом блоки, безусловно, распределены по нескольким узлам данных, и больше исполнителей могут избежать копирования по сети.
Попробуйте установить количество исполнителей равным числом блоков, я думаю, может быть быстрее.
источник