Почему задания Spark завершаются сбоем из-за org.apache.spark.shuffle.MetadataFetchFailedException: отсутствует выходное расположение для перемешивания 0 в режиме предположений?

88

Я выполняю задание Spark в режиме предположений. У меня около 500 задач и около 500 сжатых файлов размером 1 ГБ gz. Я продолжаю выполнять каждую работу, для 1-2 задач, прикрепленную ошибку, где она повторяется впоследствии десятки раз (препятствуя завершению работы).

org.apache.spark.shuffle.MetadataFetchFailedException: отсутствует выходное расположение для перемешивания 0

Есть идеи, в чем смысл проблемы и как ее преодолеть?

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
    at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)
дотан
источник
1
Вы видели какие-нибудь информационные LostExecutorсообщения? Можете ли вы проверить страницу исполнителей веб-интерфейса и посмотреть, как ведут себя исполнители, особенно? GC-мудрый?
Яцек Ласковски

Ответы:

52

Это случилось со мной, когда я выделил рабочему узлу больше памяти, чем у него есть. Поскольку у него не было подкачки, Spark разбился при попытке сохранить объекты для перетасовки, не оставив больше памяти.

Решение заключалось в том, чтобы либо добавить подкачку, либо настроить рабочий / исполнитель на использование меньшего объема памяти в дополнение с использованием уровня хранения MEMORY_AND_DISK для нескольких сохранений.

Йорен Ван Северен
источник
3
Если у вас есть ресурс на узле (памяти), вы можете попробовать увеличить память искрового исполнителя. Я попробую это в первую очередь, если вас тоже беспокоит производительность.
nir
15
Привет, @Joren, это не соревнование. Проблема с OP заключается в том, что у исполнителя недостаточно памяти для хранения случайного вывода. Что сработало для вас, так это не уменьшение памяти исполнителя, а использование уровня хранения MEMORY_AND_DISK, который устраняет ограничение памяти исполнителя. Также ОП не говорит о том, сколько у него ресурсов для исполнителя.
nir
У меня та же проблема, и я пробовал такие методы, как увеличение памяти исполнителя, увеличение количества повторных разделов, освобождение большего количества физической памяти. Иногда это срабатывало, а иногда - нет. Я обнаружил, что это происходит только на этапе произвольного чтения, и я хотел бы спросить, где я могу установить StorageLevel?
Lhfcws
Я оптимизировал структуру данных и исправил ее. Я просто изменил HashMap на byte [], который был сериализован protostuff
Lhfcws
1
Попробуйте изменить spark.driver.overhead.memory и spark.executor.overhead.memory на значение больше 384 (по умолчанию), и все должно работать. Вы можете использовать 1024 МБ или 2048 МБ.
rahul gulati 06
15

У нас была аналогичная ошибка со Spark, но я не уверен, что она связана с вашей проблемой.

Мы использовали JavaPairRDD.repartitionAndSortWithinPartitions100 ГБ данных, и оно продолжало давать сбои, как и ваше приложение. Затем мы просмотрели журналы Yarn на конкретных узлах и обнаружили, что у нас какая-то проблема нехватки памяти, поэтому Yarn прервал выполнение. Наше решение было изменить / добавить spark.shuffle.memoryFraction 0в .../spark/conf/spark-defaults.conf. Это позволило нам таким образом обрабатывать гораздо больший (но, к сожалению, не бесконечный) объем данных.

Notinlist
источник
Это действительно "0" или это опечатка? Какова логика этого, чтобы заставить его навсегда перетечь на диск?
Вирджил
@Virgil Да. Мы сделали несколько тестов. Чем ближе мы были к нулю, тем больше становилась обрабатываемая сумма. Цена была 20% времени.
Notinlist
Интересно, что я также уменьшил spark.shuffle.memoryFraction до нуля, но получил больше ошибок подряд. (А именно: MetadataFetchFailedException и FetchFailedException с перерывами) Это должно стать ошибкой / проблемой, если «все разлив» имеет меньше ошибок, чем «частичное разливание».
tribbloid
11

У меня такая же проблема на моем кластере YARN из 3 машин. Я продолжал менять ОЗУ, но проблема не исчезла. Наконец я увидел в журналах следующие сообщения:

17/02/20 13:11:02 WARN spark.HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 1006275 ms exceeds timeout 1000000 ms
17/02/20 13:11:02 ERROR cluster.YarnScheduler: Lost executor 2 on 1worker.com: Executor heartbeat timed out after 1006275 ms

и после этого было такое сообщение:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 67

Я изменил свойства в spark-defaults.conf следующим образом:

spark.yarn.scheduler.heartbeat.interval-ms 7200000
spark.executor.heartbeatInterval 7200000
spark.network.timeout 7200000

Это оно! После этого моя работа успешно завершилась.

xplorerdev
источник
1
В искровых документы, он сказал: spark.executor.heartbeatInterval should be significantly less than spark.network.timeout. Таким образом, установка одного и того же значения для них может быть не лучшей идеей.
Bitswazsky
2

Я решил эту ошибку, увеличив выделенную память в ExecterMemory и driverMemory. Вы можете сделать это в HUE, выбрав программу Spark, которая вызывает проблему, а в свойствах -> Список параметров вы можете добавить что-то вроде этого:

--driver-memory 10G --executor-memory 10G --num-executors 50 --executor-cores 2

Конечно, значения параметров будут варьироваться в зависимости от размера кластера и ваших потребностей.

Игнасио Алорре
источник
2

Что касается меня, я работал с окнами для больших данных (около 50 млрд строк) и получал нагрузку на лодку

ExternalAppendOnlyUnsafeRowArray:54 - Достигнут порог разлива в 4096 рядов, переключился на org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

В моих журналах. Очевидно, что 4096 может быть маленьким при таком размере данных ... это привело меня к следующей JIRA:

https://issues.apache.org/jira/browse/SPARK-21595

И, наконец, к следующим двум параметрам конфигурации:

  • spark.sql.windowExec.buffer.spill.threshold
  • spark.sql.windowExec.buffer.in.memory.threshold

Оба по умолчанию - 4096; Я поднял их намного выше (2097152), и сейчас дела идут хорошо. Я не уверен на 100%, что это та же проблема, что и здесь, но попробовать это другое дело.

Майкл Кирико
источник
1

в веб-интерфейсе Spark, если есть информация, например Executors lost , вам нужно проверить журнал пряжи, убедиться, что ваш контейнер был убит.

Если контейнер был убит, вероятно, это связано с нехваткой памяти.

Как найти ключевую информацию в журналах пряжи? Например, могут быть такие предупреждения:

Container killed by YARN for exceeding memory limits. 2.5 GB of 2.5 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead.

В этом случае предлагается увеличить spark.yarn.executor.memoryOverhead.

DennisLi
источник
0

В моем случае (автономный кластер) возникло исключение, потому что файловая система некоторых ведомых устройств Spark была заполнена на 100%. Удаление всего в spark/workпапках рабов решило проблему.

i000174
источник
0

У меня та же проблема, но я искал много ответов, которые не могут решить мою проблему. в конце концов я отлаживаю свой код шаг за шагом. Я считаю, что проблема, вызванная размером данных, не сбалансирована для каждого раздела, что привело к тому, MetadataFetchFailedExceptionчто на mapэтапе, а не на reduceэтапе. просто сделай df_rdd.repartition(nums)раньшеreduceByKey()

Собака
источник