Как преобразовать RDD ( org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]) в Dataframe org.apache.spark.sql.DataFrame. Я преобразовал фрейм данных в rdd, используя .rdd. После обработки я хочу вернуть его в фрейм данных. Как я могу это сделать ?...
Как преобразовать RDD ( org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]) в Dataframe org.apache.spark.sql.DataFrame. Я преобразовал фрейм данных в rdd, используя .rdd. После обработки я хочу вернуть его в фрейм данных. Как я могу это сделать ?...
У меня есть DataFrame, сгенерированный следующим образом: df.groupBy($"Hour", $"Category") .agg(sum($"value") as "TotalValue") .sort($"Hour".asc, $"TotalValue".desc)) Результаты выглядят так: +----+--------+----------+ |Hour|Category|TotalValue| +----+--------+----------+ | 0| cat26| 30.9| | 0|...
Я хочу добавить столбец в DataFrameс произвольным значением (то же самое для каждой строки). Я получаю ошибку, когда использую withColumnследующее: dt.withColumn('new_column', 10).head(5) --------------------------------------------------------------------------- AttributeError Traceback (most...
Я пробовал, df.orderBy("col1").show(10)но сортировка по возрастанию. df.sort("col1").show(10)также сортирует в порядке убывания. Я посмотрел на stackoverflow, и все ответы, которые я нашел, были устаревшими или относились к RDD . Я хотел бы использовать собственный фрейм данных в...
В чем разница между RDD map и mapPartitionsметодом? И ведет flatMapсебя как mapили нравитсяmapPartitions ? Спасибо. (править) то есть в чем разница (семантически или с точки зрения исполнения) между def map[A, B](rdd: RDD[A], fn: (A => B)) (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {...
У меня есть Spark DataFrame (с использованием PySpark 1.5.1), и я хотел бы добавить новый столбец. Я безуспешно пробовал следующее: type(randomed_hours) # => list # Create in Python and transform to RDD new_col = pd.DataFrame(randomed_hours, columns=['new_col']) spark_new_col =...
Я начал использовать Spark SQL и DataFrames в Spark 1.4.0. Я хочу определить пользовательский разделитель в DataFrames в Scala, но не знаю, как это сделать. Одна из таблиц данных, с которыми я работаю, содержит список транзакций по учетной записи, силимар к следующему примеру. Account Date Type...
Как увеличить объем памяти, доступной для узлов исполнителя Apache Spark? У меня есть файл размером 2 ГБ, который подходит для загрузки в Apache Spark. На данный момент я запускаю apache spark на 1 машине, поэтому драйвер и исполнитель находятся на одной машине. В аппарате 8 ГБ памяти. Когда я...
Я пытаюсь распечатать содержимое коллекции на консоли Spark. У меня тип: linesWithSessionId: org.apache.spark.rdd.RDD[String] = FilteredRDD[3] И я использую команду: scala> linesWithSessionId.map(line => println(line)) Но это напечатано: res1: org.apache.spark.rdd.RDD [Unit] = MappedRDD [4]...
Как объединить два столбца в фрейме данных Apache Spark? Есть ли в Spark SQL какая-либо функция, которую мы можем
Это копия чьего-то другого вопроса на другом форуме, на который так и не ответили, поэтому я подумал, что снова задам его здесь, так как у меня такая же проблема. (См. Http://geekple.com/blogs/feeds/Xgzu7/posts/351703064084736 ) У меня правильно установлен Spark на моем компьютере, и я могу без...
Я новичок в Spark, и я пытаюсь прочитать данные CSV из файла с помощью Spark. Вот что я делаю: sc.textFile('file.csv') .map(lambda line: (line.split(',')[0], line.split(',')[1])) .collect() Я ожидал, что этот вызов даст мне список двух первых столбцов моего файла, но я получаю эту ошибку: File...
Я использую https://github.com/databricks/spark-csv , я пытаюсь написать один CSV, но не могу, он создает папку. Нужна функция Scala, которая будет принимать такие параметры, как путь и имя файла, и записывать этот файл CSV....
У меня есть приложение для потоковой передачи искр, которое создает набор данных каждую минуту. Мне нужно сохранить / перезаписать результаты обработанных данных. Когда я пытался перезаписать набор данных, org.apache.hadoop.mapred.FileAlreadyExistsException останавливает выполнение. Я установил...
Я работаю над фреймом данных с двумя столбцами, mvv и count. +---+-----+ |mvv|count| +---+-----+ | 1 | 5 | | 2 | 9 | | 3 | 3 | | 4 | 1 | Я хотел бы получить два списка, содержащие значения mvv и значение счета. Что-то типа mvv = [1,2,3,4] count = [5,9,3,1] Итак, я попробовал следующий код: Первая...
Я пытаюсь отфильтровать фрейм данных PySpark, который имеет Noneзначение строки: df.select('dt_mvmt').distinct().collect() [Row(dt_mvmt=u'2016-03-27'), Row(dt_mvmt=u'2016-03-28'), Row(dt_mvmt=u'2016-03-29'), Row(dt_mvmt=None), Row(dt_mvmt=u'2016-03-30'), Row(dt_mvmt=u'2016-03-31')] и я могу...
У меня есть работающее приложение Spark, где оно занимает все ядра, а другим моим приложениям не будет выделено никаких ресурсов. Я провел небольшое исследование, и люди предложили использовать YARN kill или / bin / spark-class, чтобы убить команду. Однако я использую версию CDH, а / bin /...
Прямо сейчас я должен использовать, df.count > 0чтобы проверить DataFrame, пуст или нет. Но это неэффективно. Есть ли лучший способ сделать это? Спасибо. PS: я хочу проверить, пуст ли он, чтобы я сохранял только, DataFrameесли он не пустой...
У меня есть dataframe со столбцом как String. Я хотел изменить тип столбца на тип Double в PySpark. Я сделал следующее: toDoublefunc = UserDefinedFunction(lambda x: x,DoubleType()) changedTypedf = joindf.withColumn("label",toDoublefunc(joindf['show'])) Просто хотел знать, правильный ли это способ...
Я следую отличному руководству по искрам поэтому я пытаюсь загрузить в 46:00:00, README.mdно не могу то, что я делаю, это: $ sudo docker run -i -t -h sandbox sequenceiq/spark:1.1.0 /etc/bootstrap.sh -bash bash-4.1# cd /usr/local/spark-1.1.0-bin-hadoop2.4 bash-4.1# ls README.md README.md bash-4.1#...