У меня есть приложение для потоковой передачи искр, которое создает набор данных каждую минуту. Мне нужно сохранить / перезаписать результаты обработанных данных.
Когда я пытался перезаписать набор данных, org.apache.hadoop.mapred.FileAlreadyExistsException останавливает выполнение.
Я установил свойство Spark set("spark.files.overwrite","true")
, но безуспешно.
Как перезаписать или удалить файлы из искры?
apache-spark
Виджай Иннамури
источник
источник
set("spark.files.overwrite","true")
работает только для файлов, добавленных черезspark.addFile()
Ответы:
ОБНОВЛЕНИЕ: Предложите использовать
Dataframes
, плюс что-то вроде... .write.mode(SaveMode.Overwrite) ...
.Удобный сутенер:
Для более старых версий попробуйте
В версии 1.1.0 вы можете установить параметры конфигурации с помощью сценария spark-submit с флагом --conf.
ПРЕДУПРЕЖДЕНИЕ (более старые версии): согласно @piggybox, в Spark есть ошибка, при которой он будет перезаписывать только файлы, необходимые для записи
part-
файлов, любые другие файлы будут оставлены неизменными.источник
Spark 1.4
:df.write.mode(SaveMode.Overwrite).parquet(path)
df.write.mode(mode: String).parquet(path)
режим Where: String может быть: «перезаписать», «добавить», «игнорировать», «ошибка».поскольку
df.save(path, source, mode)
устарел ( http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame )используйте,
df.write.format(source).mode("overwrite").save(path)
где df.write - DataFrameWriter
'источник' может быть ("com.databricks.spark.avro" | "паркет" | "json")
источник
source
также может бытьcsv
В документации к параметру
spark.files.overwrite
сказано: «Следует ли перезаписывать файлы, добавленные,SparkContext.addFile()
когда целевой файл существует и его содержимое не совпадает с содержимым источника». Таким образом, это не влияет на метод saveAsTextFiles.Вы можете сделать это перед сохранением файла:
Здесь объясняется Aas: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696. html
источник
Из документации pyspark.sql.DataFrame.save (в настоящее время 1.3.1) вы можете указать
mode='overwrite'
при сохранении DataFrame:Я проверил, что это удалит даже оставшиеся файлы разделов. Итак, если вы изначально сказали 10 разделов / файлов, но затем перезаписали папку с помощью DataFrame, в котором было только 6 разделов, полученная папка будет иметь 6 разделов / файлов.
Дополнительные сведения о параметрах режима см. В документации Spark SQL .
источник
spark.hadoop.validateOutputSpecs
будет работать со всеми API Spark.spark.hadoop.validateOutputSpecs
меня почему- то не работало на 1.3, а вот это работает.save(... , mode=
маршрута вы можете перезаписать один набор файлов, добавить другой и т. д. в том же контексте Spark. Неspark.hadoop.validateOutputSpecs
ограничили бы вас только одним режимом для каждого контекста?df.write.mode('overwrite').parquet("/output/folder/path")
работает, если вы хотите перезаписать паркетный файл с помощью python. Это в искре 1.6.2. API может быть другим в более поздних версияхисточник
источник
df.write.mode(SaveMode.Overwrite)
Эта перегруженная версия сохранения меня работает функции :
yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf ("Перезаписать"))
В приведенном выше примере будет перезаписана существующая папка. Режим сохранения также может принимать эти параметры ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):
Добавление : режим добавления означает, что при сохранении DataFrame в источник данных, если данные / таблица уже существуют, ожидается, что содержимое DataFrame будет добавлено к существующим данным.
ErrorIfExists : режим ErrorIfExists означает, что при сохранении DataFrame в источник данных, если данные уже существуют, ожидается выброс исключения.
Игнорировать : режим игнорирования означает, что при сохранении DataFrame в источник данных, если данные уже существуют, ожидается, что операция сохранения не сохранит содержимое DataFrame и не изменит существующие данные.
источник
Если вы хотите использовать свой собственный формат вывода, вы также сможете добиться желаемого поведения с помощью RDD.
Взгляните на следующие классы: FileOutputFormat , FileOutputCommitter
В формате вывода файла у вас есть метод с именем checkOutputSpecs, который проверяет, существует ли выходной каталог. В FileOutputCommitter у вас есть commitJob, который обычно передает данные из временного каталога в его окончательное место.
Я еще не смог это проверить (сделал бы это, как только у меня будет несколько свободных минут), но теоретически: если я расширю FileOutputFormat и переопределю checkOutputSpecs на метод, который не генерирует исключение в каталоге, который уже существует, и отрегулирую commitJob моего настраиваемого коммиттера вывода для выполнения любой логики, которую я хочу (например, переопределить некоторые файлы, добавить другие), чем я смогу достичь желаемого поведения с помощью RDD.
Формат вывода передается в: saveAsNewAPIHadoopFile (который также вызывается методом saveAsTextFile для фактического сохранения файлов). Коммиттер вывода настраивается на уровне приложения.
источник