Вы не можете добавить произвольный столбец DataFrame
в Spark. Новые столбцы можно создавать только с помощью литералов (другие типы литералов описаны в разделе Как добавить постоянный столбец в Spark DataFrame? )
from pyspark.sql.functions import lit
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
## +---+---+-----+---+
## | x1| x2| x3| x4|
## +---+---+-----+---+
## | 1| a| 23.0| 0|
## | 3| B|-23.0| 0|
## +---+---+-----+---+
преобразование существующего столбца:
from pyspark.sql.functions import exp
df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()
## +---+---+-----+---+--------------------+
## | x1| x2| x3| x4| x5|
## +---+---+-----+---+--------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9|
## | 3| B|-23.0| 0|1.026187963170189...|
## +---+---+-----+---+--------------------+
включены с использованием join
:
from pyspark.sql.functions import exp
lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
.join(lookup, col("x1") == col("k"), "leftouter")
.drop("k")
.withColumnRenamed("v", "x6"))
## +---+---+-----+---+--------------------+----+
## | x1| x2| x3| x4| x5| x6|
## +---+---+-----+---+--------------------+----+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|
## | 3| B|-23.0| 0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+
или создается функцией / udf:
from pyspark.sql.functions import rand
df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()
## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+-----+---+--------------------+----+-------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617|
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
С точки зрения производительности встроенные функции ( pyspark.sql.functions
), которые сопоставляются с выражением Catalyst, обычно предпочтительнее, чем пользовательские функции Python.
Если вы хотите добавить содержимое произвольного RDD в виде столбца, вы можете
Чтобы добавить столбец с помощью UDF:
источник
Для Spark 2.0
источник
df = df.select('*', (df.age + 10).alias('agePlusTen'))
вы фактически добавите произвольный столбец, поскольку @ zero323 предупреждал нас, что выше было невозможно, если только что-то не так с этим в Spark, в Pandas это стандартный способ ..df.select('*', df.age + 10, df.age + 20)
Есть несколько способов добавить новый столбец в pySpark.
Давайте сначала создадим простой DataFrame.
Теперь попробуем удвоить значение столбца и сохранить его в новом столбце. PFB несколько разных подходов, чтобы добиться того же.
Дополнительные примеры и объяснения по функциям Spark DataFrame вы можете найти в моем блоге .
Надеюсь, это поможет.
источник
Вы можете определить новое
udf
при добавленииcolumn_name
:источник
источник
StringType()
.Я хотел бы предложить обобщенный пример очень похожего варианта использования:
Пример использования: у меня есть CSV, состоящий из:
Мне нужно выполнить некоторые преобразования, и окончательный CSV должен выглядеть как
Мне нужно это сделать, потому что это схема, определенная какой-то моделью, и мне нужно, чтобы мои окончательные данные были совместимы с SQL Bulk Inserts и другими подобными вещами.
так:
1) Я прочитал исходный csv с помощью spark.read и назвал его «df».
2) Я что-то делаю с данными.
3) Я добавляю пустые столбцы с помощью этого скрипта:
Таким образом, вы можете структурировать свою схему после загрузки csv (также будет работать для переупорядочения столбцов, если вам нужно сделать это для многих таблиц).
источник
Самый простой способ добавить столбец - использовать withColumn. Поскольку фрейм данных создается с использованием sqlContext, вы должны указать схему или по умолчанию он может быть доступен в наборе данных. Если схема указана, рабочая нагрузка становится утомительной при каждом изменении.
Ниже приведен пример, который вы можете рассмотреть:
источник
Мы можем добавить дополнительные столбцы в DataFrame напрямую, выполнив следующие шаги:
источник