Я новичок в Spark, и я пытаюсь прочитать данные CSV из файла с помощью Spark. Вот что я делаю:
sc.textFile('file.csv')
.map(lambda line: (line.split(',')[0], line.split(',')[1]))
.collect()
Я ожидал, что этот вызов даст мне список двух первых столбцов моего файла, но я получаю эту ошибку:
File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range
хотя в моем CSV-файле больше одного столбца.
python
csv
apache-spark
pyspark
Ядро
источник
источник
csv
библиотеку для обработки всех экранирований, потому что простое разделение запятой не сработает, если, скажем, в значениях есть запятые.","
.Spark 2.0.0+
Вы можете напрямую использовать встроенный источник данных csv:
или
без включения каких-либо внешних зависимостей.
Spark <2.0.0 :
Вместо ручного синтаксического анализа, который в общем случае далеко не тривиален, я бы рекомендовал
spark-csv
:Убедитесь в том , что Спарк CSV включен в пути (
--packages
,--jars
,--driver-class-path
)И загрузите свои данные следующим образом:
Он может обрабатывать загрузку, вывод схемы, отбрасывание искаженных строк и не требует передачи данных из Python в JVM.
Примечание :
Если вы знаете схему, лучше избегать вывода схемы и передавать ее
DataFrameReader
. Предположим, у вас есть три столбца - целые, двойные и строковые:источник
pyspark --packages com.databricks:spark-csv_2.11:1.4.0
(не забудьте изменить версии databricks / spark на те, которые вы установили).источник
И еще один вариант, который заключается в чтении файла CSV с помощью Pandas, а затем импорте Pandas DataFrame в Spark.
Например:
источник
Простое разделение запятыми также приведет к разделению запятых внутри полей (например
a,b,"1,2,3",c
), поэтому это не рекомендуется. Ответ zero323 хорош, если вы хотите использовать API DataFrames, но если вы хотите придерживаться базового Spark, вы можете проанализировать csvs в базовом Python с помощью модуля csv :РЕДАКТИРОВАТЬ: как @muon упомянул в комментариях, это будет обрабатывать заголовок как любую другую строку, поэтому вам нужно будет извлечь его вручную. Например,
header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
(убедитесь, что не изменилиheader
до оценки фильтра). Но на этом этапе вам, вероятно, лучше использовать встроенный парсер csv.источник
StringIO
.csv
может использовать любую итерацию b)__next__
не должен использоваться напрямую и завершится ошибкой в пустой строке. Взгляните на flatMap c) Было бы гораздо эффективнее использоватьmapPartitions
вместо инициализации считывателя на каждой строке :)rdd.mapPartitions(lambda x: csv.reader(x))
срабатывает, покаrdd.map(lambda x: csv.reader(x))
выдает ошибку? Я ожидал, что оба бросят одинаковоTypeError: can't pickle _csv.reader objects
. Также кажется, чтоmapPartitions
автоматически вызывает некоторый эквивалент "readlines" дляcsv.reader
объекта, где withmap
, мне нужно было__next__
явно вызвать, чтобы получить списки изcsv.reader
. 2) Откуда тутflatMap
взяться? Просто позвонить вmapPartitions
одиночку сработало для меня.rdd.mapPartitions(lambda x: csv.reader(x))
работает, потому чтоmapPartitions
ожидаетIterable
объект. Если вы хотите быть явным, вы могли бы понять понимание или выражение генератора.map
сам по себе не работает, потому что он не выполняет итерацию по объекту. Отсюда и мое предложение использовать,flatMap(lambda x: csv.reader([x]))
который будет перебирать читателя. НоmapPartitions
здесь намного лучше.Это в PYSPARK
Тогда вы можете проверить
источник
Если вы хотите загрузить csv как фрейм данных, вы можете сделать следующее:
У меня все сработало.
источник
Это соответствует тому, что изначально предлагал JP Mercier об использовании Pandas, но с серьезной модификацией: если вы читаете данные в Pandas кусками, они должны быть более гибкими. Это означает, что вы можете анализировать файл гораздо большего размера, чем Pandas может обрабатывать как единый фрагмент, и передавать его Spark в меньших размерах. (Это также отвечает на комментарий о том, почему нужно использовать Spark, если они все равно могут загружать все в Pandas.)
источник
Теперь есть еще один вариант для любого общего файла csv: https://github.com/seahboonsiew/pyspark-csv следующим образом:
Предположим, у нас есть следующий контекст
Сначала раздайте pyspark-csv.py исполнителям с помощью SparkContext
Чтение данных csv через SparkContext и преобразование их в DataFrame
источник
Если ваши данные csv не содержат символов новой строки ни в одном из полей, вы можете загрузить свои данные
textFile()
и проанализировать их.источник
Если у вас есть одна или несколько строк с меньшим или большим количеством столбцов, чем 2 в наборе данных, может возникнуть эта ошибка.
Я также новичок в Pyspark и пытаюсь прочитать файл CSV. Для меня работал следующий код:
В этом коде я использую набор данных из kaggle, ссылка: https://www.kaggle.com/carrie1/ecommerce-data
1. Без упоминания схемы:
Теперь проверьте столбцы: sdfData.columns
Результат будет:
Проверьте тип данных для каждого столбца:
Это даст фрейм данных со всеми столбцами с типом данных как StringType
2. Со схемой: если вы знаете схему или хотите изменить тип данных любого столбца в приведенной выше таблице, используйте это (скажем, у меня есть следующие столбцы и я хочу, чтобы они были с определенным типом данных для каждого из них)
Теперь проверьте схему на тип данных каждого столбца:
Отредактировано: мы также можем использовать следующую строку кода без явного упоминания схемы:
Результат:
Результат будет выглядеть так:
источник
При использовании
spark.read.csv
я считаю, что с помощью опцийescape='"'
иmultiLine=True
предоставляю наиболее согласованное решение для стандарта CSV , и в моем опыте работы лучше всего с CSV файлов , экспортированных из Google Таблиц.То есть,
источник
import pyspark as spark
?spark
уже инициализирован. В сценарии, представленном пользователемspark-submit
, вы можете создать его экземпляр какfrom pyspark.sql import SparkSession; spark = SparkSession.builder.getOrCreate()
.