Я хотел бы прочитать CSV в искре и преобразовать его как DataFrame и сохранить в HDFS с помощью df.registerTempTable("table_name")
Я пытался:
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
Ошибка, которую я получил:
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Какая команда предназначена для загрузки CSV-файла как DataFrame в Apache Spark?
Ответы:
spark-csv является частью основных функций Spark и не требует отдельной библиотеки. Так что вы могли бы просто сделать, например,
df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
В scala (это работает для любого формата с указанием разделителя "," для csv, "\ t" для tsv и т. Д.)
val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .load("csvfile.csv")
источник
Анализируйте CSV и загружайте как DataFrame / DataSet с помощью Spark 2.x
Сначала инициализируйте
SparkSession
объект по умолчанию, он будет доступен в оболочках какspark
val spark = org.apache.spark.sql.SparkSession.builder .master("local") # Change it as per your cluster .appName("Spark CSV Reader") .getOrCreate;
1. Делайте это программно.
val df = spark.read .format("csv") .option("header", "true") //first line in file has headers .option("mode", "DROPMALFORMED") .load("hdfs:///csv/file/dir/file.csv")
Обновление: добавление всех параметров отсюда на случай, если ссылка будет неработоспособна в будущем
2. Вы также можете использовать этот способ SQL
val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")
Зависимости :
"org.apache.spark" % "spark-core_2.11" % 2.0.0, "org.apache.spark" % "spark-sql_2.11" % 2.0.0,
Версия Spark <2.0
val df = sqlContext.read .format("com.databricks.spark.csv") .option("header", "true") .option("mode", "DROPMALFORMED") .load("csv/file/path");
Зависимости:
"org.apache.spark" % "spark-sql_2.10" % 1.6.0, "com.databricks" % "spark-csv_2.10" % 1.6.0, "com.univocity" % "univocity-parsers" % LATEST,
источник
spark-core_2.11
иspark-sql_2.11
от2.0.1
версии хорошо. Если возможно, добавьте сообщение об ошибке.spark.read.format("csv").option("delimiter ", "|") ...
programmatic way
- это оставить.format("csv")
и заменить.load(...
на.csv(...
.option
Метод относится к классу DataFrameReader, возвращаемыйread
метод, гдеload
иcsv
методы возвращают dataframe поэтому не может иметь варианты присваиваемых после того как они называются. Этот ответ довольно подробный, но вы должны ссылаться на документацию, чтобы люди могли видеть все другие доступные параметры CSV spark.apache.org/docs/latest/api/scala/… *): org.apache.spark.sql.DataFrameДля него Hadoop - 2.6, а Spark - 1.6, и без пакета «databricks».
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}; import org.apache.spark.sql.Row; val csv = sc.textFile("/path/to/file.csv") val rows = csv.map(line => line.split(",").map(_.trim)) val header = rows.first val data = rows.filter(_(0) != header(0)) val rdd = data.map(row => Row(row(0),row(1).toInt)) val schema = new StructType() .add(StructField("id", StringType, true)) .add(StructField("val", IntegerType, true)) val df = sqlContext.createDataFrame(rdd, schema)
источник
Ниже описано, как читать CSV в Spark 2.0.
val conf = new SparkConf().setMaster("local[2]").setAppName("my app") val sc = new SparkContext(conf) val sparkSession = SparkSession.builder .config(conf = conf) .appName("spark session example") .getOrCreate() val path = "/Users/xxx/Downloads/usermsg.csv" val base_df = sparkSession.read.option("header","true"). csv(path)
источник
spark.read.csv(path)
иspark.read.format("csv").load(path)
?В Java 1.8 Этот фрагмент кода отлично работает для чтения файлов CSV.
POM.xml
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>2.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.8</version> </dependency> <dependency> <groupId>com.databricks</groupId> <artifactId>spark-csv_2.10</artifactId> <version>1.4.0</version> </dependency>
Ява
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local"); // create Spark Context SparkContext context = new SparkContext(conf); // create spark Session SparkSession sparkSession = new SparkSession(context); Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv"); //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv"); System.out.println("========== Print Schema ============"); df.printSchema(); System.out.println("========== Print Data =============="); df.show(); System.out.println("========== Print title =============="); df.select("title").show();
источник
Есть много проблем с синтаксическим анализом файла CSV, он продолжает увеличиваться, если размер файла больше, если в значениях столбца есть неанглийские / escape / разделительные / другие символы, которые могут вызвать ошибки синтаксического анализа.
Тогда волшебство заключается в используемых опциях. Те, которые сработали для меня и, надеюсь, должны охватывать большинство крайних случаев, приведены в коде ниже:
### Create a Spark Session spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate() ### Note the options that are used. You may have to tweak these in case of error html_df = spark.read.csv(html_csv_file_path, header=True, multiLine=True, ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True, encoding="UTF-8", sep=',', quote='"', escape='"', maxColumns=2, inferSchema=True)
Надеюсь, это поможет. Для получения дополнительной информации см .: Использование PySpark 2 для чтения CSV с исходным кодом HTML.
Примечание. Приведенный выше код взят из Spark 2 API, где API для чтения файлов CSV поставляется в комплекте со встроенными пакетами Spark, которые можно установить.
Примечание. PySpark - это оболочка Python для Spark, которая использует тот же API, что и Scala / Java.
источник
Пример Penny's Spark 2 - это способ сделать это в spark2. Есть еще одна хитрость: создайте этот заголовок, выполнив первоначальное сканирование данных, установив для параметра
inferSchema
значениеtrue
Итак, если предположить, что
spark
вы настроили сеанс Spark , это операция по загрузке в индексный файл CSV всех изображений Landsat, которые amazon размещены на S3./* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ val csvdata = spark.read.options(Map( "header" -> "true", "ignoreLeadingWhiteSpace" -> "true", "ignoreTrailingWhiteSpace" -> "true", "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ", "inferSchema" -> "true", "mode" -> "FAILFAST")) .csv("s3a://landsat-pds/scene_list.gz")
Плохая новость: это вызывает сканирование файла; для чего-то большого, такого как этот заархивированный файл CSV размером 20 + МБ, который может занять 30 секунд при длительном соединении. Имейте это в виду: вам лучше вручную кодировать схему после того, как вы ее получите.
(фрагмент кода Apache Software License 2.0 лицензирован, чтобы избежать всякой двусмысленности; что-то, что я сделал в качестве демонстрационного / интеграционного теста интеграции S3)
источник
Если вы создаете jar со scala 2.11 и Apache 2.0 или выше.
Нет необходимости создавать объект
sqlContext
илиsparkContext
. Достаточно одногоSparkSession
объекта для удовлетворения всех потребностей.Ниже приведен мой код, который отлично работает:
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession} import org.apache.log4j.{Level, LogManager, Logger} object driver { def main(args: Array[String]) { val log = LogManager.getRootLogger log.info("**********JAR EXECUTION STARTED**********") val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate() val df = spark.read.format("csv") .option("header", "true") .option("delimiter","|") .option("inferSchema","true") .load("d:/small_projects/spark/test.pos") df.show() } }
Если вы работаете в кластере, просто измените
.master("local")
на.master("yarn")
при определенииsparkBuilder
объекта.Об этом говорится в Spark Doc: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html
источник
Добавьте следующие зависимости Spark в файл POM:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.2.0</version> </dependency>
// Конфигурация Spark:
val spark = SparkSession.builder (). master ("local"). appName ("Образец приложения"). getOrCreate ()
// Чтение csv файла:
val df = spark.read.option ("заголовок", "истина"). csv ("FILE_PATH")
// Отображение вывода
df.show ()
источник
В Spark 2.4+, если вы хотите загрузить csv из локального каталога, вы можете использовать 2 сеанса и загрузить их в куст. Первый сеанс должен быть создан с помощью master () config как «local [*]», а второй сеанс - с «yarn» и включенными Hive.
Ниже сработал для меня.
import org.apache.log4j.{Level, Logger} import org.apache.spark._ import org.apache.spark.rdd._ import org.apache.spark.sql._ object testCSV { def main(args: Array[String]) { Logger.getLogger("org").setLevel(Level.ERROR) val spark_local = SparkSession.builder().appName("CSV local files reader").master("local[*]").getOrCreate() import spark_local.implicits._ spark_local.sql("SET").show(100,false) val local_path="/tmp/data/spend_diversity.csv" // Local file val df_local = spark_local.read.format("csv").option("inferSchema","true").load("file://"+local_path) // "file://" is mandatory df_local.show(false) val spark = SparkSession.builder().appName("CSV HDFS").config("spark.sql.warehouse.dir", "/apps/hive/warehouse").enableHiveSupport().getOrCreate() import spark.implicits._ spark.sql("SET").show(100,false) val df = df_local df.createOrReplaceTempView("lcsv") spark.sql(" drop table if exists work.local_csv ") spark.sql(" create table work.local_csv as select * from lcsv ") }
Когда побежал с
spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jar
ним все прошло нормально и создал таблицу в улье.источник
Для чтения из относительного пути в системе используйте метод System.getProperty для получения текущего каталога и дальнейшего использования для загрузки файла с использованием относительного пути.
scala> val path = System.getProperty("user.dir").concat("/../2015-summary.csv") scala> val csvDf = spark.read.option("inferSchema","true").option("header", "true").csv(path) scala> csvDf.take(3)
искра: 2.4.4 Scala: 2.11.12
источник
Формат файла по умолчанию - Parquet с spark.read .. и чтение файла csv, поэтому вы получаете исключение. Укажите формат csv с api, который вы пытаетесь использовать
источник
Попробуйте это, если используете Spark 2.0+
For non-hdfs file: df = spark.read.csv("file:///csvfile.csv") For hdfs file: df = spark.read.csv("hdfs:///csvfile.csv") For hdfs file (with different delimiter than comma: df = spark.read.option("delimiter","|")csv("hdfs:///csvfile.csv")
Примечание: - это работает для любого файла с разделителями. Просто используйте опцию («разделитель»,), чтобы изменить значение.
Надеюсь, это будет полезно.
источник
Благодаря встроенному в Spark csv это можно легко сделать с помощью нового объекта SparkSession для Spark> 2.0.
val df = spark. read. option("inferSchema", "false"). option("header","true"). option("mode","DROPMALFORMED"). option("delimiter", ";"). schema(dataSchema). csv("/csv/file/dir/file.csv") df.show() df.printSchema()
Вы можете установить различные параметры.
header
: включает ли ваш файл строку заголовка вверхуinferSchema
: хотите ли вы автоматически выводить схему или нет. По умолчаниюtrue
. Я всегда предпочитаю предоставлять схему, чтобы гарантировать правильные типы данных.mode
: режим синтаксического анализа, РАЗРЕШЕННЫЙ, ОТКАЗАННЫЙ или ОТКАЗНЫЙdelimiter
: для указания разделителя по умолчанию используется запятая (',')источник