Как этапы делятся на задачи в Spark?

149

Предположим, что в каждый момент времени выполняется только одно задание Spark.

Что я получил до сих пор

Вот что я понимаю в Spark:

  1. Когда SparkContextсоздается, каждый рабочий узел запускает исполнителя. Исполнители - это отдельные процессы (JVM), которые подключаются к программе драйвера. У каждого исполнителя есть баночка с программой драйвера. Выходя из драйвера, выключает исполнители. Каждый исполнитель может содержать несколько разделов.
  2. Когда задание выполняется, создается план выполнения в соответствии с графом происхождения.
  3. Задание выполнения разделено на этапы, где этапы содержат столько же соседних (в линейном графе) преобразований и действий, но не перемешиваются. Таким образом этапы разделяются перетасовкой.

изображение 1

Я это понимаю

  • Задача - это команда, отправляемая драйвером исполнителю путем сериализации объекта Function.
  • Исполнитель десериализует (с помощью jar-файла драйвера) команду (задачу) и выполняет ее на разделе.

но

Вопросы)

Как разделить этап на эти задачи?

В частности:

  1. Определяются ли задачи преобразованиями и действиями, или в задаче может быть несколько преобразований / действий?
  2. Определяются ли задачи разделом (например, одна задача на этап на раздел).
  3. Определяются ли задачи узлами (например, одна задача на этап на узел)?

Что я думаю (только частичный ответ, даже если прав)

В https://0x0fff.com/spark-architecture-shuffle перемешивание объясняется изображением

введите описание изображения здесь

и у меня сложилось впечатление, что это правило

каждый этап разбит на # задач по количеству разделов, без учета количества узлов

Для моего первого изображения я бы сказал, что у меня есть 3 задачи карты и 3 задачи сокращения.

Для изображения из 0x0fff я бы сказал, что есть 8 задач карты и 3 задачи сокращения (при условии, что есть только три оранжевых и три темно-зеленых файла).

Открытые вопросы в любом случае

Это верно? Но даже если это правильно, ответы на мои вопросы выше не получены, потому что он все еще открыт, независимо от того, выполняются ли несколько операций (например, несколько карт) в рамках одной задачи или разделены на одну задачу для каждой операции.

Что говорят другие

Что такое задача в Spark? Как рабочий Spark выполняет файл jar? и как планировщик Apache Spark разбивает файлы на задачи? похожи, но я не чувствовал, что там был дан четкий ответ на мой вопрос.

Марка 42
источник
Был бы признателен, если бы вы могли добавить больше идей, у меня были похожие вопросы.
Наг
@Nag: Мой вопрос также был связан с поиском дополнительной информации, поэтому я спросил :-). Обеспечили ли ответы то, что вы искали? Какие идеи вы просите?
Make42
ах, понял. Я подумал, так как этот вопрос был опубликован немного старовато, и, возможно, вы получили бы некоторое представление о заданных вами вопросах. думал уточнить у вас :-)
Наг
@Nag: Ну, прошло пару лет с тех пор, как я в последний раз работал со Spark, так что а) мне пришлось бы снова прочитать Spark, если бы я хотел знать, как это работает (я забыл большую часть деталей) и б) то, что я написал, может быть устаревшим, особенно потому, что мой пост в основном относился к Spark 1.x, а в Spark 2.x было много изменений, черт возьми. Но, возможно, изменения не касались внутренней архитектуры - это тоже может быть правдой.
Make42
большой. Благодарность !!
Наг,

Ответы:

53

У вас здесь довольно красивый план. Чтобы ответить на ваши вопросы

  • Отдельной task же нужно быть запущен для каждого раздела данных для каждого stage. Учтите, что каждый раздел, скорее всего, будет находиться в разных физических местоположениях - например, в блоках HDFS или в каталогах / томах локальной файловой системы.

Обратите внимание, что отправка Stages обусловлена DAG Scheduler. Это означает, что этапы, которые не являются взаимозависимыми, могут быть отправлены в кластер для параллельного выполнения: это максимизирует возможность распараллеливания в кластере. Поэтому, если операции в нашем потоке данных могут происходить одновременно, мы ожидаем увидеть запуск нескольких этапов.

Мы можем увидеть это в действии в следующем игрушечном примере, в котором мы выполняем следующие типы операций:

  • загрузить два источника данных
  • выполнить некоторую операцию карты для обоих источников данных отдельно
  • Присоединяйся к ним
  • выполнить некоторые операции сопоставления и фильтрации результата
  • сохранить результат

Так сколько же стадий мы получим в итоге?

  • По 1 этапу для параллельной загрузки двух источников данных = 2 этапа
  • Третий этап, представляющий то, joinчто зависит от двух других этапов
  • Примечание: все последующие операции, работающие с объединенными данными, могут выполняться на одном этапе, поскольку они должны происходить последовательно. Нет смысла запускать дополнительные этапы, потому что они не могут начать работу, пока не будет завершена предыдущая операция.

Вот эта игрушечная программа

val sfi  = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")

И вот DAG результата

введите описание изображения здесь

Теперь: сколько задач ? Количество задач должно быть равно

Сумма ( Stage* #Partitions in the stage)

Джавадба
источник
2
Благодарность! Пожалуйста, дайте более подробный ответ относительно моего текста: 1) Не является ли мое определение стадий исчерпывающим? Похоже, я пропустил требование, что этап не может содержать операции, которые могут быть параллельны. Или мое описание уже подразумевает это? 2) Количество задач, которые должны быть выполнены для задания, определяется количеством разделов, но не количеством процессоров или узлов, в то время как количество задач, которые могут выполняться одновременно, зависит от количества процессоры, да? 3) Задача может содержать несколько операций?
Make42
1
4) Что вы имели в виду в своем последнем предложении? Ведь количество перегородок может варьироваться от этапа к этапу. Вы имели в виду, что именно так вы настроили свою работу на всех этапах?
Make42
@ Make42 Конечно, количество разделов может меняться от этапа к этапу - вы правы. Я хотел сказать, sum(..)чтобы учесть это изменение.
javadba
вау, ваш ответ был полностью нормальным, но, к сожалению, последнее предложение определенно неверно. Это не означает, что количество разделов на этапе равно количеству процессоров, однако вы можете установить количество разделов для RDD в соответствии с количеством ядер, представленных на вашем компьютере.
epcpu
@epcpu Это был особый случай, но я согласен, что это вводит в заблуждение, поэтому я удаляю его.
javadba
27

Это может помочь вам лучше понять разные части:

  • Этап: это набор задач. Один и тот же процесс работает с разными подмножествами данных (разделами).
  • Задача: представляет собой единицу работы над разделом распределенного набора данных. Итак, на каждом этапе количество задач = количество разделов, или, как вы сказали, «одна задача на этап на раздел».
  • Каждый исполнитель работает на одном контейнере пряжи, и каждый контейнер находится на одном узле.
  • На каждом этапе используется несколько исполнителей, каждому исполнителю выделяется несколько виртуальных ядер.
  • Каждый vcore может выполнять ровно одну задачу за раз
  • Таким образом, на любом этапе несколько задач могут выполняться параллельно. количество запущенных задач = количество используемых виртуальных ядер.
Педрам Башири
источник
2
Это действительно полезное чтение по искровой архитектуре: 0x0fff.com/spark-architecture
Pedram Bashiri
Я не понял вашего пункта номер 3. Насколько я знаю, каждый узел может иметь несколько исполнителей, поэтому согласно пункту 3: на каждом узле должен быть только один исполнитель. Вы можете прояснить этот момент?
Ритупарно Бехера,
@RituparnoBehera каждый узел может иметь несколько контейнеров и, следовательно, несколько исполнителей Spark. Ознакомьтесь с этой ссылкой. docs.cloudera.com/runtime/7.0.2/running-spark-applications/…
педрам башири
15

Если я правильно понимаю, есть 2 (связанных) вещи, которые вас смущают:

1) От чего зависит содержание задачи?

2) От чего зависит количество выполняемых задач?

Движок Spark «склеивает» простые операции на последовательных рядах, например:

rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count

поэтому, когда rdd3 вычисляется (лениво), Spark будет генерировать задачу для каждого раздела rdd1, и каждая задача будет выполнять как фильтр, так и карту для каждой строки, что приведет к rdd3.

Количество задач определяется количеством разделов. Каждый RDD имеет определенное количество разделов. Для исходного RDD, который читается из HDFS (например, с использованием sc.textFile (...)), количество разделов - это количество разделений, сгенерированных входным форматом. Некоторые операции с RDD могут привести к тому, что RDD будет иметь другое количество разделов:

rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).

Другой пример - соединения:

rdd3 = rdd1.join( rdd2  , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).

(Большинство) операций, которые изменяют количество разделов, включают перемешивание, например:

rdd2 = rdd1.repartition( 1000 ) 

на самом деле происходит то, что задача на каждом разделе rdd1 должна выдать конечный результат, который может быть прочитан на следующем этапе, чтобы rdd2 имел ровно 1000 разделов (как они это делают? Хеширование или сортировка ). Задачи на этой стороне иногда называют «задачами карты (побочными)». Задача, которая позже будет запущена на rdd2, будет действовать на одном разделе (из rdd2!) И должна будет выяснить, как читать / объединять выходные данные стороны карты, относящиеся к этому разделу. Задачи на этой стороне иногда называют «Уменьшить (побочные) задачи».

Эти 2 вопроса связаны между собой: количество задач в стадии - это количество разделов (общее для последовательных РСД, «склеенных» вместе), и количество разделов РСД может меняться между стадиями (путем указания количества разделов для некоторых например, перемешивание, вызывающее операцию).

После начала выполнения этапа его задачи могут занимать слоты задач. Количество параллельных слотов задач - numExecutors * ExecutorCores. Как правило, они могут быть заняты задачами с разных, независимых этапов.

Харел Гликсман
источник