Как распечатать содержимое RDD?

124

Я пытаюсь распечатать содержимое коллекции на консоли Spark.

У меня тип:

linesWithSessionId: org.apache.spark.rdd.RDD[String] = FilteredRDD[3]

И я использую команду:

scala> linesWithSessionId.map(line => println(line))

Но это напечатано:

res1: org.apache.spark.rdd.RDD [Unit] = MappedRDD [4] на карте по адресу: 19

Как мне записать RDD на консоль или сохранить его на диск, чтобы я мог просматривать его содержимое?

голубое небо
источник
1
Здравствуй! читали ли вы комментарии к принятому вами ответу? Кажется, это вводит в заблуждение
dk14
2
@ dk14 согласился, я переназначил принятый ответ
blue-sky
RDD отнесены к категории граждан второго сорта, вам следует использовать DataFrame и showметод.
Thomas Decaux

Ответы:

235

Если вы хотите просмотреть содержимое RDD, можно использовать collect():

myRDD.collect().foreach(println)

Однако это плохая идея, когда в RDD миллиарды строк. Используйте, take()чтобы распечатать всего несколько:

myRDD.take(n).foreach(println)
Oussama
источник
1
Если я использую foreach в RDD (который имеет миллионы строк) для записи содержимого в HDFS в виде одного файла, будет ли он работать без проблем в кластере?
Shankar
Причина, по которой я не использую saveAsTextFileRDD, заключается в том, что мне нужно записать содержимое RDD в более чем один файл, поэтому я используюforeach
Шанкар
Если вы хотите сохранить в одном файле, вы можете объединить RDD в один раздел перед вызовом saveAsTextFile, но это опять же может вызвать проблемы. Я думаю, что лучший вариант - записать несколько файлов в HDFS, а затем использовать hdfs dfs --getmerge для объединения файлов
Усама
вы сказали, что при использовании foreach на RDD он будет сохраняться в оперативной памяти драйвера, правильно ли это утверждение? потому что я понял, что foreach будет работать на каждом рабочем [кластере], а не на драйвере.
Shankar
saveAsTextFile будет записывать по одному файлу на раздел, что вам и нужно (несколько файлов). В противном случае, как предлагает Усама, вы можете выполнить rdd.coalesce (1) .saveAsTextFile (), чтобы получить один файл. Если в RDD слишком мало разделов на ваш вкус, вы можете попробовать rdd.repartition (N) .saveAsTextFile ()
foghorn
49

mapФункция является преобразованием , а это значит , что искра не будет на самом деле оценить вашу RDD , пока вы не запустите действие на него.

Чтобы распечатать его, вы можете использовать foreach(что является действием):

linesWithSessionId.foreach(println)

Чтобы записать его на диск, вы можете использовать одну из saveAs...функций (статических действий) из RDD API.

fedragon
источник
6
Возможно, вам нужно упомянуть, collectчтобы RDD можно было распечатать в консоли.
zsxwing
1
foreachсам сначала «материализует» RDD, а затем запускается printlnдля каждого элемента, так что в collectэтом нет необходимости (хотя вы, конечно, можете использовать его) ...
fedragon
5
На самом деле без collect () до foreach я ничего не вижу на консоли.
Витторио Коццолино
3
На самом деле он отлично работает в моей оболочке Spark, даже в 1.2.0. Но я думаю, что знаю, откуда эта путаница: в исходном вопросе задавался вопрос, как распечатать RDD на консоли Spark (= оболочка), поэтому я предположил, что он будет запускать локальное задание, и в этом случае все foreachработает нормально. Если вы выполняете задание в кластере и хотите распечатать свой rdd, вам следует collect(как указано в других комментариях и ответах), чтобы оно было отправлено драйверу перед printlnвыполнением. И использование того, takeчто предлагает Усама, может быть хорошей идеей, если ваш RDD слишком велик.
fedragon
6
Приведенный выше ответ плохой. Вы должны отказаться от этого. Foreach не будет печатать на консоли, он будет печатать на ваших рабочих узлах. Если у вас только один узел, то будет работать foreach. Но если у вас всего один узел, то зачем вы используете искру? Просто используйте SQL awk, Grep или что-то более простое. Так что я думаю, что единственный верный ответ - собирать. Если сбор слишком большой для вас, и вы хотите использовать только образец, используйте функции take, head или simillar, как описано ниже.
eshalev
12

Если вы запускаете это в кластере println, печать обратно в ваш контекст не выполняется . Вам необходимо перенести RDDданные в вашу сессию. Для этого вы можете принудительно поместить его в локальный массив, а затем распечатать:

linesWithSessionId.toArray().foreach(line => println(line))
Ной
источник
12

Вы можете преобразовать ваш RDDв a DataFramethen show()it.

// For implicit conversion from RDD to DataFrame
import spark.implicits._

fruits = sc.parallelize([("apple", 1), ("banana", 2), ("orange", 17)])

// convert to DF then show it
fruits.toDF().show()

Это покажет верхние 20 строк ваших данных, поэтому размер ваших данных не должен быть проблемой.

+------+---+                                                                    
|    _1| _2|
+------+---+
| apple|  1|
|banana|  2|
|orange| 17|
+------+---+
Wesam
источник
1
Я думаю, что этоimport spark.implicits._
Райан Хартман
Какая библиотека здесь использовалась? Я никогда не могу обнаружить ни , toDFни spark.implicits._в искровой области.
Сергей
1

Вероятно, существует множество архитектурных различий между myRDD.foreach(println)и myRDD.collect().foreach(println)(не только «сбор», но и другие действия). Одно из различий, которое я заметил, заключается в том, что при выполнении myRDD.foreach(println)вывод будет в случайном порядке. Например: если мой rdd поступает из текстового файла, в котором каждая строка имеет номер, вывод будет иметь другой порядок. Но когда я это сделал myRDD.collect().foreach(println), порядок остается таким же, как и в текстовом файле.

Каран Гупта
источник
1

В питоне

   linesWithSessionIdCollect = linesWithSessionId.collect()
   linesWithSessionIdCollect

Будет распечатано все содержимое СДР.

Ниранджан Молькери
источник
1
Спасибо, но я пометил этот вопрос с помощью scala, а не python
blue-sky
1
c.take(10)

и более новая версия Spark будет хорошо отображать таблицу.

Харви
источник
1

Вместо того, чтобы печатать каждый раз, вы можете;

[1] Создайте общий метод печати внутри Spark Shell.

def p(rdd: org.apache.spark.rdd.RDD[_]) = rdd.foreach(println)

[2] Или, что еще лучше, используя неявные выражения, вы можете добавить функцию в класс RDD, чтобы распечатать его содержимое.

implicit class Printer(rdd: org.apache.spark.rdd.RDD[_]) {
    def print = rdd.foreach(println)
}

Пример использования:

val rdd = sc.parallelize(List(1,2,3,4)).map(_*2)

p(rdd) // 1
rdd.print // 2

Вывод:

2
6
4
8

Важный

Это имеет смысл только в том случае, если вы работаете в локальном режиме и с небольшим набором данных. В противном случае вы либо не сможете увидеть результаты на клиенте, либо закончится память из-за большого набора данных.

noego
источник
0

Вы также можете сохранить как файл: rdd.saveAsTextFile("alicia.txt")

Томас Деко
источник
0

В синтаксисе java:

rdd.collect().forEach(line -> System.out.println(line));
ForeverLearner
источник