Я хочу прочитать несколько текстовых файлов из местоположения hdfs и выполнить сопоставление с ним в итерации, используя spark.
JavaRDD<String> records = ctx.textFile(args[1], 1);
способен читать только один файл за раз.
Я хочу прочитать более одного файла и обработать их как один RDD. Как?
apache-spark
user3705662
источник
источник
Path
применяются все те же параметры.sc.wholeTextFiles
удобен для данных, которые не разделены строкойsc.textFile(multipleCommaSeparatedDirs,320)
он приводит к19430
общему количеству задач вместо320
... он ведет себя подобно тому,union
что также приводит к безумному количеству задач с очень низким параллелизмомwholeTextFiles
. Какой у вас вариант использования? Я могу придумать обходной путь, если вы используете то же количество разделов, что и файлы ...Используйте
union
следующим образом:Тогда
bigRdd
это СДР со всеми файлами.источник
Вы можете использовать один вызов textFile для чтения нескольких файлов. Scala:
источник
sc.textFile(files.mkString(","))
Вы можете использовать это
Сначала вы можете получить буфер / список путей S3:
Теперь передайте этот объект List следующему коду, обратите внимание: sc является объектом SQLContext
Теперь вы получили окончательный Унифицированный СДР, т.е. DF
Необязательно, и вы также можете перераспределить его в одном BigRDD
Перераспределение всегда работает: D
источник
В PySpark я нашел еще один полезный способ разбора файлов. Возможно, в Scala есть эквивалент, но мне не совсем удобно придумывать рабочий перевод. По сути, это вызов textFile с добавлением меток (в приведенном ниже примере ключ = имя файла, значение = 1 строка из файла).
«Помеченный» textFile
вход:
output: массив с каждой записью, содержащей кортеж, используя filename-as-key и со значением = каждая строка файла. (Технически, используя этот метод, вы также можете использовать другой ключ помимо фактического имени пути к файлу - возможно, хеш-представление для экономии памяти). то есть.
Вы также можете рекомбинировать в виде списка строк:
Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()
Или рекомбинируйте целые файлы обратно в одну строку (в этом примере результат совпадает с тем, что вы получаете от wholeTextFiles, но со строкой «file:», удаленной из пути к файлу.):
Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()
источник
Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)
я получил ИЭ ошибкиTypeError: 'PipelinedRDD' object is not iterable
. Насколько я понимаю, эта строка создает RDD, который является неизменным, поэтому мне было интересно, как вы смогли добавить его к другой переменной?ты можешь использовать
здесь вы получите путь к вашему файлу и его содержимое. так что вы можете выполнять любое действие целого файла за раз, что экономит накладные расходы
источник
Все ответы верны с
sc.textFile
Мне просто интересно, почему нет
wholeTextFiles
Например, в этом случае ...Одним из ограничений является то, что мы должны загружать небольшие файлы, иначе производительность будет плохой и может привести к OOM.
Примечание :
Дальнейшая ссылка для посещения
источник
sc.wholeTextFiles(folder).flatMap...
Есть прямое чистое доступное решение. Используйте метод wholeTextFiles (). Это займет каталог и сформирует пару ключ-значение. Возвращенная СДР будет парой СДР. Найдите ниже описание из документов Spark :
источник
ПОПРОБУЙТЕ ЭТОТ интерфейс, используемый для записи DataFrame во внешние системы хранения (например, файловые системы, хранилища ключей и т. Д.). Используйте DataFrame.write () для доступа к этому.
Новое в версии 1.4.
csv (путь, режим = Нет, сжатие = Нет, септ = Нет, кавычка = Нет, escape = Нет, заголовок = Нет, nullValue = Нет, escapeQuotes = Нет, quoteAll = Нет, dateFormat = Нет, timestampFormat = Нет) Сохраняет содержимое DataFrame в формате CSV по указанному пути.
Параметры: путь - путь в любом режиме файловой системы, поддерживаемом Hadoop, - определяет поведение операции сохранения, когда данные уже существуют.
append: добавить содержимое этого DataFrame к существующим данным. перезаписать: перезаписать существующие данные. игнорировать: игнорировать эту операцию, если данные уже существуют. ошибка (случай по умолчанию): выдает исключение, если данные уже существуют. сжатие - кодек сжатия, используемый при сохранении в файл. Это может быть одно из известных сокращенных имен без учета регистра (none, bzip2, gzip, lz4, snappy и deflate). sep - устанавливает отдельный символ в качестве разделителя для каждого поля и значения. Если None установлен, он использует значение по умолчанию,. цитата - устанавливает единственный символ, используемый для экранирования значений в кавычках, где разделитель может быть частью значения. Если None установлен, он использует значение по умолчанию ". Если вы хотите отключить кавычки, вам нужно установить пустую строку. Escape - устанавливает единственный символ, используемый для экранирования кавычек внутри уже заключенного в кавычки значения. Если None установлен , он использует значение по умолчанию, \ escapeQuotes - флаг, указывающий, должны ли значения, содержащие кавычки, всегда заключаться в кавычки. Если None установлен, он использует значение по умолчанию true, экранируя все значения, содержащие символ кавычки. quoteAll - Флаг, указывающий, должны ли все значения всегда заключаться в кавычки. Если None установлен, он использует значение по умолчанию false, только экранируя значения, содержащие символ кавычки. header - записывает имена столбцов в качестве первой строки. Если None установлен, он использует значение по умолчанию, false. nullValue - устанавливает строковое представление нулевого значения. Если None установлен, он использует значение по умолчанию, пустую строку. dateFormat - устанавливает строку, которая указывает формат даты. Пользовательские форматы даты следуют форматам в java.text.SimpleDateFormat. Это относится к типу даты. Если None установлен, он использует значение по умолчанию, yyyy-MM-dd. timestampFormat - устанавливает строку, которая указывает формат отметки времени. Пользовательские форматы даты следуют форматам в java.text.SimpleDateFormat. Это относится к метке времени. Если значение None установлено, используется значение по умолчанию: yyyy-MM-dd'T'HH: mm: ss.SSSZZ.
источник
источник