Я использую https://github.com/databricks/spark-csv , я пытаюсь написать один CSV, но не могу, он создает папку.
Нужна функция Scala, которая будет принимать такие параметры, как путь и имя файла, и записывать этот файл CSV.
источник
Я использую https://github.com/databricks/spark-csv , я пытаюсь написать один CSV, но не могу, он создает папку.
Нужна функция Scala, которая будет принимать такие параметры, как путь и имя файла, и записывать этот файл CSV.
Он создает папку с несколькими файлами, потому что каждый раздел сохраняется индивидуально. Если вам нужен единственный выходной файл (все еще в папке), вы можете repartition
(предпочтительно, если исходящие данные большие, но требуется перемешивание):
df
.repartition(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
или coalesce
:
df
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
кадр данных перед сохранением:
Все данные будут записаны в mydata.csv/part-00000
. Прежде чем использовать эту опцию , убедитесь, что вы понимаете, что происходит и какова стоимость передачи всех данных одному исполнителю . Если вы используете распределенную файловую систему с репликацией, данные будут передаваться несколько раз - сначала извлекаются одному работнику, а затем распределяются по узлам хранения.
В качестве альтернативы вы можете оставить свой код как есть и использовать инструменты общего назначения, такие как cat
или HDFS,getmerge
чтобы потом просто объединить все части.
.coalesce(1)
что в каталоге _porary указано какое-то исключение FileNotFoundException. Это все еще ошибка в Sparkcoalesce(1)
- высокая стоимость и, как правило, непрактичность.Если вы используете Spark с HDFS, я решил проблему, написав файлы csv в обычном режиме и используя HDFS для слияния. Я делаю это непосредственно в Spark (1.6):
Не могу вспомнить, где я научился этому трюку, но он может сработать для вас.
источник
Возможно, я немного опоздал с игрой здесь, но использую
coalesce(1)
илиrepartition(1)
могу работать для небольших наборов данных, но большие наборы данных все будут помещены в один раздел на одном узле. Это может привести к ошибкам OOM или, в лучшем случае, к медленной обработке.Я настоятельно рекомендую вам использовать
FileUtil.copyMerge()
функцию из Hadoop API. Это объединит выходные данные в один файл.РЕДАКТИРОВАТЬ - это эффективно передает данные драйверу, а не узлу-исполнителю.
Coalesce()
было бы хорошо, если бы у одного исполнителя было больше оперативной памяти, чем у драйвера.РЕДАКТИРОВАТЬ 2 :
copyMerge()
удаляется в Hadoop 3.0. См. Следующую статью о переполнении стека для получения дополнительной информации о том, как работать с последней версией: Как выполнять CopyMerge в Hadoop 3.0?источник
Если вы используете Databricks и можете уместить все данные в ОЗУ на одном работнике (и, следовательно, можете использовать
.coalesce(1)
), вы можете использовать dbfs для поиска и перемещения полученного файла CSV:Если ваш файл не помещается в оперативную память рабочего, вы можете рассмотреть предложение chaotic3quilibrium использовать FileUtils.copyMerge () . Я этого не делал и пока не знаю, возможно ли это, например, на S3.
Этот ответ основан на предыдущих ответах на этот вопрос, а также на моих собственных тестах предоставленного фрагмента кода. Изначально я разместил его в Databricks и переиздаю здесь.
Лучшая документация для рекурсивной опции dbfs rm, которую я нашел, находится на форуме Databricks .
источник
Решение, которое работает для S3, модифицированного из Minkymorgan.
Просто передайте путь к временному секционированному каталогу (с другим именем, чем конечный путь) как
srcPath
и единственный окончательный csv / txt какdestPath
Укажите также,deleteSource
если вы хотите удалить исходный каталог.источник
искры из
df.write()
API создаст несколько файлов часть внутри данного пути ... чтобы сила искры записи только одну часть использовать файлdf.coalesce(1).write.csv(...)
вместо ,df.repartition(1).write.csv(...)
как сливаются узкая трансформация , тогда как Передел широкий преобразование см Spark - Передел () против сливаются ()создаст папку в указанном пути к
part-0001-...-c000.csv
файлу с использованием одного файлаиметь удобное для пользователя имя файла
источник
df.toPandas().to_csv(path)
это, чтобы записать один CSV с вашим предпочтительным именем файлапереразбить / объединить в 1 раздел перед сохранением (вы все равно получите папку, но в ней будет один файл части)
источник
ты можешь использовать
rdd.coalesce(1, true).saveAsTextFile(path)
он будет хранить данные как одиночный файл по пути / part-00000
источник
Я решил использовать следующий подход (имя файла переименования hdfs): -
Шаг 1: - (Создать фрейм данных и записать в HDFS)
Шаг 2: - (Создать конфигурацию Hadoop)
Шаг 3: - (Получить путь в пути к папке hdfs)
Шаг 4: - (Получить имена файлов искр из папки hdfs)
setp5: - (создать изменяемый список scala, чтобы сохранить все имена файлов и добавить его в список)
Шаг 6: - (отфильтруйте порядок файлов _SUCESS из списка scala имен файлов)
шаг 7: - (преобразовать список scala в строку и добавить желаемое имя файла в строку папки hdfs, а затем применить переименование)
источник
Я использую это в Python, чтобы получить один файл:
источник
Этот ответ расширяет принятый ответ, дает больше контекста и предоставляет фрагменты кода, которые вы можете запустить в Spark Shell на своем компьютере.
Подробнее о принятом ответе
Принятый ответ может создать впечатление, что образец кода выводит один
mydata.csv
файл, а это не так. Продемонстрируем:Вот что получилось:
NB
mydata.csv
- это папка в принятом ответе - это не файл!Как вывести один файл с определенным именем
Мы можем использовать Spark-daria для записи одного
mydata.csv
файла.Это выведет файл следующим образом:
S3 пути
Вам нужно будет передать пути s3a,
DariaWriters.writeSingleFile
чтобы использовать этот метод в S3:См. Здесь для получения дополнительной информации.
Как избежать copyMerge
copyMerge был удален из Hadoop 3.
DariaWriters.writeSingleFile
Реализация используетfs.rename
, как описано здесь . Spark 3 по-прежнему использует Hadoop 2 , поэтому реализации copyMerge будут работать в 2020 году. Я не уверен, когда Spark обновится до Hadoop 3, но лучше избегать любого подхода copyMerge, который приведет к поломке вашего кода при обновлении Spark Hadoop.Исходный код
Поищите
DariaWriters
объект в исходном коде spark-daria, если хотите проверить реализацию.Реализация PySpark
С PySpark проще записать один файл, потому что вы можете преобразовать DataFrame в Pandas DataFrame, который по умолчанию записывается как один файл.
Ограничения
DariaWriters.writeSingleFile
Подход Scala иdf.toPandas()
Python подходить только работа для небольших наборов данных. Огромные наборы данных не могут быть записаны как отдельные файлы. Запись данных в один файл не оптимальна с точки зрения производительности, поскольку данные нельзя записывать параллельно.источник
используя Listbuffer, мы можем сохранять данные в один файл:
источник
Есть еще один способ использовать Java
источник