Использование spark 2.4.4 в кластерном режиме YARN с планировщиком FIFO spark.
Я отправляю несколько операций с пламенем в dataframe (т.е. записываю данные в S3), используя исполнителя пула потоков с переменным числом потоков. Это работает нормально, если у меня ~ 10 потоков, но если я использую сотни потоков, то, по-видимому, возникает тупик, и в соответствии с пользовательским интерфейсом Spark не планируются никакие задания.
Какие факторы контролируют, сколько рабочих мест может быть запланировано одновременно? Ресурсы драйвера (например, память / ядра)? Некоторые другие настройки конфигурации свечи?
РЕДАКТИРОВАТЬ:
Вот краткий обзор моего кода
ExecutorService pool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(pool);
Dataset<Row> aHugeDf = spark.read.json(hundredsOfPaths);
List<Future<Void>> futures = listOfSeveralHundredThings
.stream()
.map(aThing -> ecs.submit(() -> {
df
.filter(col("some_column").equalTo(aThing))
.write()
.format("org.apache.hudi")
.options(writeOptions)
.save(outputPathFor(aThing));
return null;
}))
.collect(Collectors.toList());
IntStream.range(0, futures.size()).forEach(i -> ecs.poll(30, TimeUnit.MINUTES));
exec.shutdownNow();
В какой-то момент, по мере nThreads
увеличения, похоже, что spark больше не планирует какие-либо задания, о чем свидетельствуют:
ecs.poll(...)
тайм-аут в конце концов- На вкладке заданий пользовательского интерфейса Spark нет активных заданий
- На вкладке Исполнители пользовательского интерфейса Spark отсутствуют активные задачи для какого-либо исполнителя.
- Вкладка SQL Spark UI, показывающая
nThreads
запущенные запросы без идентификаторов запущенных заданий
Моя среда исполнения
- AWS EMR 5.28.1
- Spark 2.4.4
- Главный узел =
m5.4xlarge
- Основные узлы = 3x
rd5.24xlarge
spark.driver.cores=24
spark.driver.memory=32g
spark.executor.memory=21g
spark.scheduler.mode=FIFO
источник
jstack -l
чтобы получить дамп потока с информацией о блокировке.Ответы:
Если возможно, запишите выходные данные заданий в AWS Elastic MapReduce hdfs (чтобы использовать почти мгновенные переименования и улучшенный файловый IO локальных hdfs) и добавьте шаг dstcp для перемещения файлов на S3, чтобы избавить себя от всех проблем обработки внутренности хранилища объектов, пытающихся быть файловой системой. Кроме того, запись в локальные hdf-файлы позволит вам разрешить спекуляции управлять сбегающими задачами, не попадая в тупиковые ловушки, связанные с DirectOutputCommiter.
Если вы должны использовать S3 в качестве выходного каталога, убедитесь, что установлены следующие конфигурации Spark
Примечание. DirectParquetOutputCommitter удален из Spark 2.0 из-за вероятности потери данных. К сожалению, пока мы не улучшили согласованность с S3a, мы должны работать с обходными путями. Вещи улучшаются с Hadoop 2.8
Избегайте имен ключей в лексикографическом порядке. Можно использовать хеширование / случайные префиксы или обратную дату-время для обхода. Хитрость заключается в том, чтобы называть ваши ключи иерархически, располагая наиболее распространенные вещи, по которым вы фильтруете, слева от вашего ключа. И никогда не подчеркивания в именах сегментов из-за проблем с DNS.
fs.s3a.fast.upload upload
Параллельное включение частей одного файла в Amazon S3Обратитесь к этим статьям для более подробной информации.
Настройка spark.speculation в Spark 2.1.0 при записи в s3
https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98
источник
ИМО, вы, вероятно, подходите к этой проблеме неправильно. Если вы не можете гарантировать, что количество задач на задание будет очень низким, вы, скорее всего, не добьетесь значительного улучшения производительности, распараллеливая сотни задач одновременно. Ваш кластер может поддерживать только 300 задач одновременно, при условии, что вы используете параллелизм по умолчанию 200, что составляет всего 1,5 задания. Я бы посоветовал переписать ваш код, чтобы ограничить максимальное количество одновременных запросов на 10. Я очень подозреваю, что у вас есть 300 запросов, и только одна задача из нескольких сотен фактически выполняется. По этой причине большинство систем обработки данных OLTP намеренно имеют довольно низкий уровень одновременных запросов по сравнению с более традиционными системами RDS.
также
источник