Как я могу изменить типы столбцов в DataFrame Spark SQL?

152

Предположим, я делаю что-то вроде:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)

df.show()
year make  model comment              blank
2012 Tesla S     No comment
1997 Ford  E350  Go get one now th...

Но я действительно хотел yearкак Int(и, возможно, преобразовать некоторые другие столбцы).

Лучшее, что я мог придумать, было

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]

что немного запутано.

Я из R, и я привык писать, например,

df2 <- df %>%
   mutate(year = year %>% as.integer,
          make = make %>% toupper)

Я, вероятно, что-то упускаю, так как должен быть лучший способ сделать это в Spark / Scala ...

kevinykuo
источник
Мне нравится этот способ spark.sql ("SELECT STRING (NULLIF (column, '')) как column_string")
Эрик Беллет,

Ответы:

141

Редактировать: новейшая версия

С spark 2.x вы можете использовать .withColumn. Проверьте документы здесь:

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset@withColumn(colName:String,col:org.apache.spark.sql.Column) : org.apache.spark.sql.DataFrame

Самый старый ответ

Начиная с версии Spark 1.4 вы можете применить метод cast с DataType к столбцу:

import org.apache.spark.sql.types.IntegerType
val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType))
    .drop("year")
    .withColumnRenamed("yearTmp", "year")

Если вы используете выражения SQL, вы также можете сделать:

val df2 = df.selectExpr("cast(year as int) year", 
                        "make", 
                        "model", 
                        "comment", 
                        "blank")

Для получения дополнительной информации проверьте документы: http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame

msemelman
источник
4
почему вы использовали withColumn с последующим отбрасыванием? Не проще ли просто использовать withColumn с оригинальным именем столбца?
Амеба Спугноза,
@AmebaSpugnosa Я думаю, что к тому времени, когда я использовал его, Spark потерпел крах, если у него были повторяющиеся имена столбцов. Не когда вы их создаете, а когда вы их используете.
msemelman
5
нет необходимости удалять столбец с последующим переименованием. Вы можете сделать в одну строкуdf.withColumn("ctr", temp("ctr").cast(DecimalType(decimalPrecision, decimalScale)))
ruhong
1
В этом случае создается целая новая копия фрейма данных только для преобразования столбца? Я что-то упускаю? Или, может быть, есть какая-то оптимизация за кулисами?
user1814008
5
Судя по документации на Spark 2.x, df.withColumn(..)можно добавить или заменить столбец в зависимости от colNameаргумента
Y2K-Shubham
89

[EDIT: март 2016: спасибо за голоса! Хотя на самом деле, это не самый лучший ответ, я думаю , что решения , основанные на withColumn, withColumnRenamedи castвыдвинутую msemelman, Мартин Senne и другие проще и чище].

Я думаю, что ваш подход в порядке, напомним, что Spark DataFrame- это (неизменяемый) RDD Rows, поэтому мы никогда не заменяем столбец, а просто создаем DataFrameкаждый раз новую схему.

Предполагая, что у вас есть оригинальный df со следующей схемой:

scala> df.printSchema
root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)

И некоторые UDF определены в одном или нескольких столбцах:

import org.apache.spark.sql.functions._

val toInt    = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour   = udf((t: String) => "%04d".format(t.toInt).take(2).toInt ) 
val days_since_nearest_holidays = udf( 
  (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
 )

Изменение типов столбцов или даже создание нового DataFrame из другого можно записать так:

val featureDf = df
.withColumn("departureDelay", toDouble(df("DepDelay")))
.withColumn("departureHour",  toHour(df("CRSDepTime")))
.withColumn("dayOfWeek",      toInt(df("DayOfWeek")))              
.withColumn("dayOfMonth",     toInt(df("DayofMonth")))              
.withColumn("month",          toInt(df("Month")))              
.withColumn("distance",       toDouble(df("Distance")))              
.withColumn("nearestHoliday", days_since_nearest_holidays(
              df("Year"), df("Month"), df("DayofMonth"))
            )              
.select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth", 
        "month", "distance", "nearestHoliday")            

что дает:

scala> df.printSchema
root
 |-- departureDelay: double (nullable = true)
 |-- departureHour: integer (nullable = true)
 |-- dayOfWeek: integer (nullable = true)
 |-- dayOfMonth: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- nearestHoliday: integer (nullable = true)

Это довольно близко к вашему собственному решению. Проще говоря, хранение изменений типа и других преобразований в качестве отдельных udf valэлементов делает код более читабельным и пригодным для повторного использования.

Свен
источник
26
Это не безопасно и не эффективно. Не безопасно, потому что одиночная NULLили неправильно сформированная запись приведет к краху всей работы. Не эффективный , потому что UDFs не является прозрачным для катализатора. Использование UDF для сложных операций - это нормально, но нет причин использовать их для приведения типов. Вот почему у нас есть castметод (см. Ответ Мартина Сенне ). Чтобы сделать Catalyst прозрачным, требуется больше работы, но базовая безопасность - это только вопрос «положи Tryи Optionработай».
zero323
Я не видел ничего связанного с преобразованием строки в дату, например "05-APR-2015"
пространство базы данных
3
Есть ли способ сократить ваш withColumn()раздел до общего, который проходит по всем столбцам?
Бурн
Спасибо zero323, прочитав это, я понял, почему здесь происходит сбой решения udf. Некоторые комментарии лучше, чем некоторые ответы на SO :)
Саймон Дирмайер
Есть ли способ, которым мы можем узнать поврежденную строку, означает записи, которые имеют столбцы неправильных типов данных во время приведения. Как функция приведения делает эти поля нулевыми
Etisha
65

Поскольку castоперация доступна для Spark Column(и, как я лично не одобряю udf, как предложено @ Svendна данном этапе), как насчет:

df.select( df("year").cast(IntegerType).as("year"), ... )

привести к запрошенному типу? В качестве аккуратного побочного эффекта станут значения, не подлежащие преобразованию / преобразованию в этом смысле null.

Если вам нужно это как вспомогательный метод , используйте:

object DFHelper{
  def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = {
    df.withColumn( cn, df(cn).cast(tpe) )
  }
}

который используется как:

import DFHelper._
val df2 = castColumnTo( df, "year", IntegerType )
Мартин Сенн
источник
2
Можете ли вы посоветовать мне, как поступить, если мне нужно привести и переименовать целую группу столбцов (у меня есть 50 столбцов, и я довольно новичок в scala, не уверен, каков наилучший способ приблизиться к нему, не создавая массового дублирования)? Некоторые столбцы должны оставаться String, некоторые должны быть преобразованы во Float.
Дмитрий Смирнов
как преобразовать строку в дату, например, «25-APR-2016» в столбце и «20160302»
пространство базы данных
@DmitrySmirnov Вы когда-нибудь получали ответ? У меня такой же вопрос. ;)
Эван Замир
@EvanZamir, к сожалению, нет, в итоге я выполнил несколько операций, чтобы использовать данные как rdd на других этапах. Интересно, стало ли это легче в эти дни :)
Дмитрий Смирнов
60

Во-первых , если вы хотите использовать тип, то это:

import org.apache.spark.sql
df.withColumn("year", $"year".cast(sql.types.IntegerType))

С тем же именем столбца столбец будет заменен новым. Вам не нужно добавлять и удалять шаги.

Во- вторых , о Scala против R .
Это код, который больше всего похож на RI:

val df2 = df.select(
   df.columns.map {
     case year @ "year" => df(year).cast(IntegerType).as(year)
     case make @ "make" => functions.upper(df(make)).as(make)
     case other         => df(other)
   }: _*
)

Хотя длина кода немного больше, чем у R. Это не имеет ничего общего с многословием языка. В R mutateэто специальная функция для данных R, а в Scala вы можете легко использовать ее благодаря своей выразительной силе.
Словом, он избегает конкретных решений, потому что языковой дизайн достаточно хорош, чтобы вы могли быстро и легко создать свой собственный язык домена.


примечание: df.columnsна удивление Array[String]вместо Array[Column], может быть, они хотят, чтобы он выглядел как датафрейм Python-панд.

Вэйчинг 林 煒 清
источник
1
Не могли бы вы дать эквивалент для pyspark?
Харит Вишвакарма
Я получаю "недопустимое начало определения" .withColumn ("age", $ "age" .cast (sql.types.DoubleType)) для моего поля "age". Любое предложение?
BlueDolphin
Нужно ли использовать .cache () для фрейма данных, если мы выполняем эти преобразования во многих столбцах по соображениям производительности, или это не требуется, поскольку Spark оптимизирует их?
Скьягини
Импортировать можно import org.apache.spark.sql.types._и тогда, а не sql.types.IntegerTypeпросто IntegerType.
nessa.gp
17

Вы можете использовать, selectExprчтобы сделать его немного чище:

df.selectExpr("cast(year as int) as year", "upper(make) as make",
    "model", "comment", "blank")
dnlbrky
источник
14

Java-код для изменения типа данных DataFrame с String на Integer

df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))

Он просто приведёт существующий тип данных String к Integer.

manishbelsare
источник
1
Там нет DataTypesв sql.types! это DataType. Более того, можно просто импортировать IntegerTypeи разыграть.
Эсан М. Кермани
@ EhsanM.Kermani на самом деле DatyaTypes.IntegerType является законной ссылкой.
Купитор
1
@Cupitor DataTypes.IntegerTypeбыл в режиме DeveloperAPI, и он стабилен в v.2.1.0
Ehsan M.
Это лучшее решение!
Саймон Дирмайер
8

Чтобы преобразовать год из строки в int, вы можете добавить следующую опцию в программу чтения csv: "inferSchema" -> "true", см. Документацию DataBricks.

Питер Роуз
источник
5
Это хорошо работает, но
суть в
@beefyhalo абсолютно точно, есть ли способ обойти это?
Ayush
6

Так что это действительно работает, только если у вас возникли проблемы с сохранением в драйвер jdbc, такой как sqlserver, но это действительно полезно для ошибок, с которыми вы столкнетесь с синтаксисом и типами.

import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.apache.spark.sql.jdbc.JdbcType
val SQLServerDialect = new JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR))
    case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
    case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
    case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
    case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL))
    case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
    case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
    case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
    //      case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
    case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
  }
}

JdbcDialects.registerDialect(SQLServerDialect)
Бен Джарман
источник
Можете ли вы помочь мне реализовать тот же код в Java? и как зарегистрировать customJdbcDialect в DataFrame
abhijitcaps
Приятно, я сделал то же самое с Vertica, но с тех пор, как спарк 2.1. JDbcUtil вам нужно реализовать только тот конкретный тип данных, который вам нужен. dialect.getJDBCType (dt) .orElse (getCommonJDBCType (dt)). getOrElse (выбросить новое IllegalArgumentException (s "Не удается получить тип JDBC для $ {dt.simpleString}"))
Арнон Родман
6

Создайте простой набор данных, содержащий пять значений, и преобразуйте его intв stringтип:

val df = spark.range(5).select( col("id").cast("string") )
user8106134
источник
6

Я думаю, что это намного более читабельно для меня.

import org.apache.spark.sql.types._
df.withColumn("year", df("year").cast(IntegerType))

Это преобразует ваш столбец года в IntegerTypeсоздание временных столбцов и удаление этих столбцов. Если вы хотите преобразовать в любой другой тип данных, вы можете проверить типы внутри org.apache.spark.sql.typesпакета.

Пиюш Патель
источник
5

ответы, предлагающие использовать cast, FYI, метод cast в spark 1.4.1 не работает.

например, кадр данных со строковым столбцом, имеющим значение "8182175552014127960" при приведении к bigint, имеет значение "8182175552014128100"

    df.show
+-------------------+
|                  a|
+-------------------+
|8182175552014127960|
+-------------------+

    df.selectExpr("cast(a as bigint) a").show
+-------------------+
|                  a|
+-------------------+
|8182175552014128100|
+-------------------+

Нам пришлось столкнуться с множеством проблем, прежде чем найти эту ошибку, потому что у нас были колонки bigint в производстве.

sauraI3h
источник
4
psst, обнови свою искру
msemelman
2
@msemelman смешно, чтобы из-за небольшой ошибки обновиться до новой версии spark в производстве.
sauraI3h
разве мы не всегда модернизируем все для маленьких ошибок? :)
Цезарсол
5
df.select($"long_col".cast(IntegerType).as("int_col"))
soulmachine
источник
4

Используя Spark Sql 2.4.0, вы можете сделать это:

spark.sql("SELECT STRING(NULLIF(column,'')) as column_string")
Эрик Беллет
источник
3

Вы можете использовать приведенный ниже код.

df.withColumn("year", df("year").cast(IntegerType))

Какой будет конвертировать год столбец в IntegerTypeколонке.

Адарш
источник
2

Этот метод удалит старый столбец и создаст новые столбцы с такими же значениями и новым типом данных. Мои оригинальные типы данных при создании DataFrame были: -

root
 |-- id: integer (nullable = true)
 |-- flag1: string (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag3: string (nullable = true)

После этого я запустил следующий код, чтобы изменить тип данных: -

df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3
df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)

После этого мой результат оказался:

root
 |-- id: integer (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag1: boolean (nullable = true)
 |-- flag3: boolean (nullable = true)
PirateJack
источник
Не могли бы вы предоставить свое решение здесь.
Аджай Хараде
1

Можно изменить тип данных столбца, используя приведение в spark sql. имя таблицы - таблица, и она имеет только два столбца: столбец1 и столбец2 и тип данных столбца1 должны быть изменены. ex-spark.sql («выберите приведение (column1 как Double) column1NewName, column2 из таблицы») Вместо двойного запишите свой тип данных.

Теджасви Шарма
источник
1

Если вам нужно переименовать десятки столбцов, заданных их именами, в следующем примере используется подход @dnlbrky и он применяется к нескольким столбцам одновременно:

df.selectExpr(df.columns.map(cn => {
    if (Set("speed", "weight", "height").contains(cn)) s"cast($cn as double) as $cn"
    else if (Set("isActive", "hasDevice").contains(cn)) s"cast($cn as boolean) as $cn"
    else cn
}):_*)

Неклассированные столбцы остаются без изменений. Все столбцы остаются в исходном порядке.

кубический салат
источник
1

Так много ответов, а не так много подробных объяснений.

Следующий синтаксис работает с помощью Блокнота данных в Spark 2.4

from pyspark.sql.functions import *
df = df.withColumn("COL_NAME", to_date(BLDFm["LOAD_DATE"], "MM-dd-yyyy"))

Обратите внимание, что вы должны указать формат записи, который у вас есть (в моем случае «MM-dd-yyyy»), и импорт является обязательным, поскольку to_date является функцией spark sql.

Также попробовал этот синтаксис, но получил нулевые значения вместо правильного приведения:

df = df.withColumn("COL_NAME", df["COL_NAME"].cast("Date"))

(Обратите внимание, что я должен был использовать скобки и кавычки, чтобы это было синтаксически правильным)


PS: Я должен признать, что это похоже на синтаксические джунгли, есть много возможных путей входа, и в официальных ссылках API отсутствуют надлежащие примеры.

Мехди ЛАМРАНИ
источник
1
Синтаксис джунглей. Да. Это мир Spark прямо сейчас.
conner.xyz
1

Другое решение заключается в следующем:

1) Держите "inferSchema" как ложное

2) Во время выполнения функций «Карта» в строке вы можете прочитать «asString» (row.getString ...)

//Read CSV and create dataset
Dataset<Row> enginesDataSet = sparkSession
            .read()
            .format("com.databricks.spark.csv")
            .option("header", "true")
            .option("inferSchema","false")
            .load(args[0]);

JavaRDD<Box> vertices = enginesDataSet
            .select("BOX","BOX_CD")
            .toJavaRDD()
            .map(new Function<Row, Box>() {
                @Override
                public Box call(Row row) throws Exception {
                    return new Box((String)row.getString(0),(String)row.get(1));
                }
            });
Vibha
источник
0
    val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as( "Data_Value_Std_Err")).rdd
    //Schema to be applied to the table
    val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType)

    val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates()
Аравинд Кришнакумар
источник
0

По-другому:

// Generate a simple dataset containing five values and convert int to string type

val df = spark.range(5).select( col("id").cast("string")).withColumnRenamed("id","value")
user8106134
источник
0

В случае, если вы хотите изменить несколько столбцов одного типа на другой без указания имен отдельных столбцов

/* Get names of all columns that you want to change type. 
In this example I want to change all columns of type Array to String*/
    val arrColsNames = originalDataFrame.schema.fields.filter(f => f.dataType.isInstanceOf[ArrayType]).map(_.name)

//iterate columns you want to change type and cast to the required type
val updatedDataFrame = arrColsNames.foldLeft(originalDataFrame){(tempDF, colName) => tempDF.withColumn(colName, tempDF.col(colName).cast(DataTypes.StringType))}

//display

updatedDataFrame.show(truncate = false)
Ravi
источник