Как загрузить локальный файл в sc.textFile вместо HDFS

100

Я следую отличному руководству по искрам

поэтому я пытаюсь загрузить в 46:00:00, README.mdно не могу то, что я делаю, это:

$ sudo docker run -i -t -h sandbox sequenceiq/spark:1.1.0 /etc/bootstrap.sh -bash
bash-4.1# cd /usr/local/spark-1.1.0-bin-hadoop2.4
bash-4.1# ls README.md
README.md
bash-4.1# ./bin/spark-shell
scala> val f = sc.textFile("README.md")
14/12/04 12:11:14 INFO storage.MemoryStore: ensureFreeSpace(164073) called with curMem=0, maxMem=278302556
14/12/04 12:11:14 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 160.2 KB, free 265.3 MB)
f: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at <console>:12
scala> val wc = f.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://sandbox:9000/user/root/README.md
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)

как я могу это загрузить README.md?

Джас
источник

Ответы:

177

Попробуй явно указать sc.textFile("file:///path to the file/"). Ошибка возникает, когда настроена среда Hadoop.

SparkContext.textFile вызывает внутренние вызовы org.apache.hadoop.mapred.FileInputFormat.getSplits, которые, в свою очередь, используются, org.apache.hadoop.fs.getDefaultUriесли схема отсутствует. Этот метод считывает параметр "fs.defaultFS" из Hadoop conf. Если вы устанавливаете переменную среды HADOOP_CONF_DIR, параметр обычно устанавливается как «hdfs: // ...»; в противном случае «файл: //».

Suztomo
источник
Вы случайно не знаете, как это сделать с помощью Java? Я не вижу способа. Очень расстраивает то, что нет простого способа указать путь для загрузки файла из простой файловой системы.
Брэд Эллис
отвечаю сам. Есть переключатель --file, который вы передаете с помощью spark-submit. Таким образом, путь к файлу может быть жестко задан или, однако, ваша конфигурация настроена для приложения, но вы также указываете этот путь. когда вы отправляете, чтобы исполнители могли видеть путь.
Брэд Эллис
24

ответ гонбе отличный. Но все же хочу отметить, что file:///= ~/../../, а не $SPARK_HOME. Надеюсь, это поможет сэкономить время для таких новичков, как я.

Zaxliu
источник
4
file:///- это корневая папка файловой системы, которую видит исполняющая JVM, а не на два уровня выше домашней папки. Формат URI , как определено в RFC 8089 является file://hostname/absolute/path. В локальном случае hostnameкомпонент (полномочия) пуст.
Христо Илиев
18

Хотя Spark поддерживает загрузку файлов из локальной файловой системы, он требует, чтобы файлы были доступны по одному и тому же пути на всех узлах кластера.

Некоторые сетевые файловые системы, такие как NFS, AFS и слой NFS MapR, доступны пользователю как обычная файловая система.

Если ваши данные уже находятся в одной из этих систем, вы можете использовать их в качестве входных данных, просто указав file: // path; Spark будет обрабатывать это, пока файловая система смонтирована по одному и тому же пути на каждом узле. Каждый узел должен иметь одинаковый путь

 rdd = sc.textFile("file:///path/to/file")

Если ваш файл еще не на всех узлах в кластере, вы можете загрузить его локально в драйвер, не проходя через Spark, а затем вызвать parallelize для распространения содержимого среди рабочих.

Позаботьтесь о том, чтобы наперед указать file: // и использовать «/» или «\» в зависимости от ОС.

Акланк Джайн
источник
1
Есть ли способ, которым Spark автоматически копирует данные из своего каталога $ SPARK_HOME на все вычислительные узлы. Или вам нужно сделать это вручную?
Матиас
где исходный код искры обрабатывает различные форматы файловой системы?
Saher Ahwal 07
12

Вам просто нужно указать путь к файлу как «файл: /// каталог / файл»

пример:

val textFile = sc.textFile("file:///usr/local/spark/README.md")
Хамди Шареф
источник
12

Внимание:

Убедитесь, что вы запускаете искру в локальном режиме при загрузке данных из local ( sc.textFile("file:///path to the file/")), иначе вы получите такую ​​ошибку Caused by: java.io.FileNotFoundException: File file:/data/sparkjob/config2.properties does not exist. Исполнители Becasuse, которые работают на разных воркерах, не найдут этот файл по его локальному пути.

Матиджи66
источник
11

Если файл находится на вашем главном узле Spark (например, в случае использования AWS EMR), то сначала запустите искровую оболочку в локальном режиме.

$ spark-shell --master=local
scala> val df = spark.read.json("file:///usr/lib/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

Кроме того, вы можете сначала скопировать файл в HDFS из локальной файловой системы, а затем запустить Spark в режиме по умолчанию (например, YARN в случае использования AWS EMR), чтобы прочитать файл напрямую.

$ hdfs dfs -mkdir -p /hdfs/spark/examples
$ hadoop fs -put /usr/lib/spark/examples/src/main/resources/people.json /hdfs/spark/examples
$ hadoop fs -ls /hdfs/spark/examples
Found 1 items
-rw-r--r--   1 hadoop hadoop         73 2017-05-01 00:49 /hdfs/spark/examples/people.json

$ spark-shell
scala> val df = spark.read.json("/hdfs/spark/examples/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
Джоардер Камаль
источник
9

У меня на рабочем столе есть файл NewsArticle.txt.

В Spark я набрал:

val textFile= sc.textFile(“file:///C:/Users/582767/Desktop/NewsArticle.txt”)

Мне нужно было изменить все символы \ на / для пути к файлу.

Чтобы проверить, сработало ли это, я набрал:

textFile.foreach(println)

Я использую Windows 7, и у меня не установлен Hadoop.

Ген
источник
5

Это обсуждалось в списке рассылки Spark, пожалуйста, обратитесь к этому письму .

Вы должны hadoop fs -put <localsrc> ... <dst>скопировать файл в hdfs:

${HADOOP_COMMON_HOME}/bin/hadoop fs -put /path/to/README.md README.md
Нан Сяо
источник
5

Это случилось со мной со Spark 2.3 с Hadoop, также установленным в общем домашнем каталоге пользователя «hadoop». Поскольку и Spark, и Hadoop были установлены в одном общем каталоге, Spark по умолчанию рассматривает схему как hdfsи начинает поиск входных файлов под hdfs, как указано fs.defaultFSв Hadoop core-site.xml. В таких случаях нам нужно явно указать схему как file:///<absoloute path to file>.

Бинита Бхарати
источник
0

Это решение этой ошибки, которую я получал в кластере Spark, который размещен в Azure в кластере Windows:

Загрузите необработанный файл HVAC.csv, проанализируйте его с помощью функции

data = sc.textFile("wasb:///HdiSamples/SensorSampleData/hvac/HVAC.csv")

Мы используем (wasb: ///), чтобы разрешить Hadoop доступ к файлу хранилища блога Azure, а три косой черты являются относительной ссылкой на папку контейнера запущенного узла.

Например: если путь к вашему файлу в проводнике на панели управления кластера Spark:

sflcc1 \ sflccspark1 \ HdiSamples \ SensorSampleData \ hvac

Путь описывается следующим образом: sflcc1: имя учетной записи хранения. sflccspark: имя узла кластера.

Таким образом, мы ссылаемся на имя текущего узла кластера с помощью трех относительных косых черт.

Надеюсь это поможет.

Мостафа
источник
0

Если вы пытаетесь прочитать файл из HDFS. пытаюсь установить путь в SparkConf

 val conf = new SparkConf().setMaster("local[*]").setAppName("HDFSFileReader")
 conf.set("fs.defaultFS", "hdfs://hostname:9000")
Вияан Джиингаде
источник
Добавьте в код отступ в 4 пробела / табуляции, чтобы он был отформатирован как код. С уважением
ЯковЛ
0

Вам не нужно использовать sc.textFile (...) для преобразования локальных файлов в фреймы данных. Один из вариантов - прочитать локальный файл построчно, а затем преобразовать его в набор данных Spark. Вот пример для Windows-машины на Java:

StructType schemata = DataTypes.createStructType(
            new StructField[]{
                    createStructField("COL1", StringType, false),
                    createStructField("COL2", StringType, false),
                    ...
            }
    );

String separator = ";";
String filePath = "C:\\work\\myProj\\myFile.csv";
SparkContext sparkContext = new SparkContext(new SparkConf().setAppName("MyApp").setMaster("local"));
JavaSparkContext jsc = new JavaSparkContext (sparkContext );
SQLContext sqlContext = SQLContext.getOrCreate(sparkContext );

List<String[]> result = new ArrayList<>();
try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
    String line;
    while ((line = br.readLine()) != null) {
      String[] vals = line.split(separator);
      result.add(vals);
    }
 } catch (Exception ex) {
       System.out.println(ex.getMessage());
       throw new RuntimeException(ex);
  }
  JavaRDD<String[]> jRdd = jsc.parallelize(result);
  JavaRDD<Row> jRowRdd = jRdd .map(RowFactory::create);
  Dataset<Row> data = sqlContext.createDataFrame(jRowRdd, schemata);

Теперь вы можете использовать фрейм данных dataв своем коде.

Андрушенко Александр
источник
0

Я попробовал следующее, и это сработало из моей локальной файловой системы .. В основном искра может читать с локального пути, пути HDFS и AWS S3

listrdd=sc.textFile("file:////home/cloudera/Downloads/master-data/retail_db/products")
BigData-Guru
источник
-6

пытаться

val f = sc.textFile("./README.md")
Сумья Симанта
источник
scala> val f = sc.textFile("./README.md") 14/12/04 12:54:33 INFO storage.MemoryStore: ensureFreeSpace(81443) called with curMem=164073, maxMem=278302556 14/12/04 12:54:33 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 79.5 KB, free 265.2 MB) f: org.apache.spark.rdd.RDD[String] = ./README.md MappedRDD[5] at textFile at <console>:12 scala> val wc = f.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://sandbox:9000/user/root/README.md at
Jas
Можете ли вы сделать что-нибудь pwdв оболочке bashbash-4.1#
Soumya Simanta
bash-4.1 # pwd /usr/local/spark-1.1.0-bin-hadoop2.4
Jas
У меня это работает на искре без hadoop / hdfs. Однако, похоже, это не работает для OP, поскольку это дало им дамп ошибки.
Пол