Тупик, когда одновременно запланировано много рабочих мест

17

Использование spark 2.4.4 в кластерном режиме YARN с планировщиком FIFO spark.

Я отправляю несколько операций с пламенем в dataframe (т.е. записываю данные в S3), используя исполнителя пула потоков с переменным числом потоков. Это работает нормально, если у меня ~ 10 потоков, но если я использую сотни потоков, то, по-видимому, возникает тупик, и в соответствии с пользовательским интерфейсом Spark не планируются никакие задания.

Какие факторы контролируют, сколько рабочих мест может быть запланировано одновременно? Ресурсы драйвера (например, память / ядра)? Некоторые другие настройки конфигурации свечи?

РЕДАКТИРОВАТЬ:

Вот краткий обзор моего кода

ExecutorService pool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(pool);

Dataset<Row> aHugeDf = spark.read.json(hundredsOfPaths);

List<Future<Void>> futures = listOfSeveralHundredThings
  .stream()
  .map(aThing -> ecs.submit(() -> {
    df
      .filter(col("some_column").equalTo(aThing))
      .write()
      .format("org.apache.hudi")
      .options(writeOptions)
      .save(outputPathFor(aThing));
    return null;
  }))
  .collect(Collectors.toList());

IntStream.range(0, futures.size()).forEach(i -> ecs.poll(30, TimeUnit.MINUTES));
exec.shutdownNow();

В какой-то момент, по мере nThreadsувеличения, похоже, что spark больше не планирует какие-либо задания, о чем свидетельствуют:

  • ecs.poll(...) тайм-аут в конце концов
  • На вкладке заданий пользовательского интерфейса Spark нет активных заданий
  • На вкладке Исполнители пользовательского интерфейса Spark отсутствуют активные задачи для какого-либо исполнителя.
  • Вкладка SQL Spark UI, показывающая nThreadsзапущенные запросы без идентификаторов запущенных заданий

Моя среда исполнения

  • AWS EMR 5.28.1
  • Spark 2.4.4
  • Главный узел = m5.4xlarge
  • Основные узлы = 3x rd5.24xlarge
  • spark.driver.cores=24
  • spark.driver.memory=32g
  • spark.executor.memory=21g
  • spark.scheduler.mode=FIFO
Скотт
источник
пожалуйста, проверьте spark.apache.org/docs/latest/job-scheduling.html
dassum
Есть ли конкретный раздел, который обсуждает это? За последние несколько дней я несколько раз перечитывал эти документы и не нашел ответа, который искал.
Скотт
2
Не могли бы вы показать код, который вы используете для отправки заданий Spark через исполнителя пула потоков? Кажется, что тупик происходит до того, как задание Spark отправлено.
Салим
1
Вы можете разместить свой код? Пожалуйста, предоставьте подробную информацию о вашем env: CPU, RAM; Кроме того, как вы создаете темы: одновременно или в небольших группах по 10 человек?
Сахид
Извините, как вы имеете в виду, что работа не запланирована? Они не отображаются в пользовательском интерфейсе Spark или отображаются в списке заданий, но задачи не выполняются? В любом случае, если вы подозреваете взаимоблокировку, запустите, jstack -lчтобы получить дамп потока с информацией о блокировке.
Даниэль Дарабос

Ответы:

0

Если возможно, запишите выходные данные заданий в AWS Elastic MapReduce hdfs (чтобы использовать почти мгновенные переименования и улучшенный файловый IO локальных hdfs) и добавьте шаг dstcp для перемещения файлов на S3, чтобы избавить себя от всех проблем обработки внутренности хранилища объектов, пытающихся быть файловой системой. Кроме того, запись в локальные hdf-файлы позволит вам разрешить спекуляции управлять сбегающими задачами, не попадая в тупиковые ловушки, связанные с DirectOutputCommiter.

Если вы должны использовать S3 в качестве выходного каталога, убедитесь, что установлены следующие конфигурации Spark

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.speculation false

Примечание. DirectParquetOutputCommitter удален из Spark 2.0 из-за вероятности потери данных. К сожалению, пока мы не улучшили согласованность с S3a, мы должны работать с обходными путями. Вещи улучшаются с Hadoop 2.8

Избегайте имен ключей в лексикографическом порядке. Можно использовать хеширование / случайные префиксы или обратную дату-время для обхода. Хитрость заключается в том, чтобы называть ваши ключи иерархически, располагая наиболее распространенные вещи, по которым вы фильтруете, слева от вашего ключа. И никогда не подчеркивания в именах сегментов из-за проблем с DNS.

fs.s3a.fast.upload uploadПараллельное включение частей одного файла в Amazon S3

Обратитесь к этим статьям для более подробной информации.

Настройка spark.speculation в Spark 2.1.0 при записи в s3

https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98

Девеш Мехта
источник
0

ИМО, вы, вероятно, подходите к этой проблеме неправильно. Если вы не можете гарантировать, что количество задач на задание будет очень низким, вы, скорее всего, не добьетесь значительного улучшения производительности, распараллеливая сотни задач одновременно. Ваш кластер может поддерживать только 300 задач одновременно, при условии, что вы используете параллелизм по умолчанию 200, что составляет всего 1,5 задания. Я бы посоветовал переписать ваш код, чтобы ограничить максимальное количество одновременных запросов на 10. Я очень подозреваю, что у вас есть 300 запросов, и только одна задача из нескольких сотен фактически выполняется. По этой причине большинство систем обработки данных OLTP намеренно имеют довольно низкий уровень одновременных запросов по сравнению с более традиционными системами RDS.

также

  1. Apache Hudi имеет параллелизм по умолчанию в несколько сотен FYI.
  2. Почему бы вам просто не разделить на основе вашего столбца фильтра?
Андрей Лонг
источник