Как перезаписать выходной каталог в Spark

108

У меня есть приложение для потоковой передачи искр, которое создает набор данных каждую минуту. Мне нужно сохранить / перезаписать результаты обработанных данных.

Когда я пытался перезаписать набор данных, org.apache.hadoop.mapred.FileAlreadyExistsException останавливает выполнение.

Я установил свойство Spark set("spark.files.overwrite","true"), но безуспешно.

Как перезаписать или удалить файлы из искры?

Виджай Иннамури
источник
1
Да, отстой, не так ли, я считаю, что это регресс к 0.9.0. Пожалуйста, примите мой ответ :)
samthebest
set("spark.files.overwrite","true")работает только для файлов, добавленных черезspark.addFile()
aiman

Ответы:

107

ОБНОВЛЕНИЕ: Предложите использовать Dataframes, плюс что-то вроде ... .write.mode(SaveMode.Overwrite) ....

Удобный сутенер:

implicit class PimpedStringRDD(rdd: RDD[String]) {
    def write(p: String)(implicit ss: SparkSession): Unit = {
      import ss.implicits._
      rdd.toDF().as[String].write.mode(SaveMode.Overwrite).text(p)
    }
  }

Для более старых версий попробуйте

yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = SparkContext(yourSparkConf)

В версии 1.1.0 вы можете установить параметры конфигурации с помощью сценария spark-submit с флагом --conf.

ПРЕДУПРЕЖДЕНИЕ (более старые версии): согласно @piggybox, в Spark есть ошибка, при которой он будет перезаписывать только файлы, необходимые для записи part-файлов, любые другие файлы будут оставлены неизменными.

Самтебест
источник
30
Для Spark 1.4:df.write.mode(SaveMode.Overwrite).parquet(path)
Ha Pham
Для Spark SQL у вас есть возможность определить SaveMode для Core Spark, у вас нет ничего подобного. Мне бы очень хотелось иметь такую ​​функцию для saveAsTextFile и других преобразований
Муртаза Канчвала
3
Скрытая проблема: по сравнению с решением @pzecevic по уничтожению всей папки через HDFS, в этом подходе Spark будет перезаписывать только файлы частей с тем же именем в выходной папке. Это работает в большинстве случаев, но если в папке есть что-то еще, например, дополнительные файлы деталей из другого задания Spark / Hadoop, эти файлы не будут перезаписаны.
piggybox
6
Вы также можете использовать df.write.mode(mode: String).parquet(path)режим Where: String может быть: «перезаписать», «добавить», «игнорировать», «ошибка».
рожь
1
@avocado Ага, думаю, API Spark становятся все хуже и хуже с каждым выпуском: P
samthebest
27

В документации к параметру spark.files.overwriteсказано: «Следует ли перезаписывать файлы, добавленные, SparkContext.addFile()когда целевой файл существует и его содержимое не совпадает с содержимым источника». Таким образом, это не влияет на метод saveAsTextFiles.

Вы можете сделать это перед сохранением файла:

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }

Здесь объясняется Aas: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696. html

Пжечевич
источник
29
а как насчет pyspark?
javadba
Следующий ответ на использование 'write.mode (SaveMode.Overwrite)' - лучший способ
YaOg
hdfs может удалять новые файлы по мере их поступления, так как все еще удаляет старые.
Jake
25

Из документации pyspark.sql.DataFrame.save (в настоящее время 1.3.1) вы можете указать mode='overwrite'при сохранении DataFrame:

myDataFrame.save(path='myPath', source='parquet', mode='overwrite')

Я проверил, что это удалит даже оставшиеся файлы разделов. Итак, если вы изначально сказали 10 разделов / файлов, но затем перезаписали папку с помощью DataFrame, в котором было только 6 разделов, полученная папка будет иметь 6 разделов / файлов.

Дополнительные сведения о параметрах режима см. В документации Spark SQL .

dnlbrky
источник
2
Правильно и полезно, спасибо, но решение для DataFrame spark.hadoop.validateOutputSpecsбудет работать со всеми API Spark.
samthebest
У spark.hadoop.validateOutputSpecsменя почему- то не работало на 1.3, а вот это работает.
Эрик Уокер
1
@samthebest С помощью save(... , mode=маршрута вы можете перезаписать один набор файлов, добавить другой и т. д. в том же контексте Spark. Не spark.hadoop.validateOutputSpecsограничили бы вас только одним режимом для каждого контекста?
dnlbrky 03
1
@dnlbrky OP не просил добавить. Как я уже сказал, правда, полезно, но ненужно. Если OP спросил: «Как мне добавить?», Можно было бы дать целый ряд ответов. Но не будем вдаваться в подробности. Также я советую вам рассмотреть возможность использования версии DataFrames для Scala, поскольку в ней есть безопасность типов и дополнительная проверка - например, если у вас есть опечатка в «перезаписи», вы не узнаете об этом до тех пор, пока этот DAG не будет оценен, - что в работе с большими данными может будет через 2 часа !! Если вы используете версию Scala, компилятор все проверит заранее! Довольно круто и очень важно для больших данных.
samthebest
15

df.write.mode('overwrite').parquet("/output/folder/path")работает, если вы хотите перезаписать паркетный файл с помощью python. Это в искре 1.6.2. API может быть другим в более поздних версиях

акн
источник
Да, это отлично подходит для моих требований (Databricks)
Nick.McDermaid
4
  val jobName = "WordCount";
  //overwrite the output directory in spark  set("spark.hadoop.validateOutputSpecs", "false")
  val conf = new 
  SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false");
  val sc = new SparkContext(conf)
Вакуар Хан
источник
Только для Spark 1, в последней версии используйтеdf.write.mode(SaveMode.Overwrite)
ChikuMiku
3

Эта перегруженная версия сохранения меня работает функции :

yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf ("Перезаписать"))

В приведенном выше примере будет перезаписана существующая папка. Режим сохранения также может принимать эти параметры ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):

Добавление : режим добавления означает, что при сохранении DataFrame в источник данных, если данные / таблица уже существуют, ожидается, что содержимое DataFrame будет добавлено к существующим данным.

ErrorIfExists : режим ErrorIfExists означает, что при сохранении DataFrame в источник данных, если данные уже существуют, ожидается выброс исключения.

Игнорировать : режим игнорирования означает, что при сохранении DataFrame в источник данных, если данные уже существуют, ожидается, что операция сохранения не сохранит содержимое DataFrame и не изменит существующие данные.

Шай
источник
1

Если вы хотите использовать свой собственный формат вывода, вы также сможете добиться желаемого поведения с помощью RDD.

Взгляните на следующие классы: FileOutputFormat , FileOutputCommitter

В формате вывода файла у вас есть метод с именем checkOutputSpecs, который проверяет, существует ли выходной каталог. В FileOutputCommitter у вас есть commitJob, который обычно передает данные из временного каталога в его окончательное место.

Я еще не смог это проверить (сделал бы это, как только у меня будет несколько свободных минут), но теоретически: если я расширю FileOutputFormat и переопределю checkOutputSpecs на метод, который не генерирует исключение в каталоге, который уже существует, и отрегулирую commitJob моего настраиваемого коммиттера вывода для выполнения любой логики, которую я хочу (например, переопределить некоторые файлы, добавить другие), чем я смогу достичь желаемого поведения с помощью RDD.

Формат вывода передается в: saveAsNewAPIHadoopFile (который также вызывается методом saveAsTextFile для фактического сохранения файлов). Коммиттер вывода настраивается на уровне приложения.

Михаил Копаньев
источник
Я бы не стал прибегать к подклассу FileOutputCommitter, если вы можете ему помочь: это страшный фрагмент кода. Hadoop 3.0 добавляет точку подключаемого модуля, в которой FileOutputFormat может принимать различные реализации реорганизованного суперкласса (PathOutputCommitter). S3 от Netflix будет записывать на месте в секционированное дерево, выполняя разрешение конфликтов (сбой, удаление, добавление) только при фиксации задания и только в обновленных разделах
стивел