Как сохранить DataFrame прямо в Hive?

85

Можно ли сохранить DataFrameв Spark прямо в Hive?

Я попытался преобразовать DataFrameв Rddтекстовый файл, а затем сохранить его и загрузить в куст. Но мне интересно, могу ли я напрямую сохранить dataframeв улей

Гурав
источник

Ответы:

118

Вы можете создать временную таблицу в памяти и сохранить их в таблице кустов с помощью sqlContext.

Допустим, ваш фрейм данных - myDf. Вы можете создать одну временную таблицу, используя,

myDf.createOrReplaceTempView("mytempTable") 

Затем вы можете использовать простой оператор hive для создания таблицы и выгрузки данных из вашей временной таблицы.

sqlContext.sql("create table mytable as select * from mytempTable");
Винай Кумар
источник
2
это обошло стороной ошибки чтения паркета, которые я получал при использовании write.saveAsTable в Spark 2.0
ski_squaw
2
Да, однако мы можем использовать разделение по фрейму данных перед созданием временной таблицы. @chhantyal
Vinay Kumar
1
Как вам удалось смешать и сопоставить temporaryстол со hiveстолом? При этом show tablesвключаются только hiveтаблицы для моей spark 2.3.0установки
StephenBoesch
1
эта временная таблица будет сохранена в контексте вашего улья и никоим образом не принадлежит к таблицам улья.
Vinay Kumar
1
привет @VinayKumar, почему вы говорите: «Если вы используете saveAsTable (это больше похоже на сохранение вашего фрейма данных), вы должны убедиться, что у вас достаточно памяти, выделенной для вашего искрового приложения». не могли бы вы объяснить этот момент?
enneppi
28

Используйте DataFrameWriter.saveAsTable. ( df.write.saveAsTable(...)) См. Руководство по Spark SQL и DataFrame .

Даниэль Дарабос
источник
4
saveAsTable не создает Hive-совместимые таблицы. Лучшее решение, которое я нашел, - это Винай Кумар.
Rhat
@Jacek: Я сам добавил это примечание, потому что считаю, что мой ответ неверен. Я бы удалил его, кроме того, что он принят. Как вы думаете, записка неправильная?
Даниэль Дарабос
Да. Заметка была неправильной, поэтому я удалил ее. «Пожалуйста, поправьте меня, если я ошибаюсь» применимо здесь :)
Яцек Ласковски
1
это df.write().saveAsTable(tableName) также будет записывать потоковые данные в таблицу?
user1870400
1
нет, вы не можете сохранить потоковые данные с помощью saveAsTable, их даже нет в api
Брайан
21

Я не вижу df.write.saveAsTable(...)устаревших в документации Spark 2.0. У нас это сработало на Amazon EMR. Мы прекрасно могли считывать данные из S3 в фрейм данных, обрабатывать их, создавать таблицу из результатов и читать их с помощью MicroStrategy. Ответ Vinays также сработал.

Alex
источник
5
Кто-то отметил этот ответ как некачественный из-за длины и содержания. Если честно, наверное, было бы лучше в качестве комментария. Я предполагаю, что он действовал в течение двух лет, и некоторые люди сочли его полезным, так что может быть хорошо оставить все как есть?
serakfalcon
Я согласен, комментарий был бы лучшим выбором. Урок усвоен :-)
Alex
15

вам нужно иметь / создать HiveContext

import org.apache.spark.sql.hive.HiveContext;

HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());

Затем напрямую сохраните фрейм данных или выберите столбцы для хранения в виде таблицы улья.

df - это фрейм данных

df.write().mode("overwrite").saveAsTable("schemaName.tableName");

или же

df.select(df.col("col1"),df.col("col2"), df.col("col3")) .write().mode("overwrite").saveAsTable("schemaName.tableName");

или же

df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");

SaveModes: Append / Ignore / Overwrite / ErrorIfExists.

Я добавил сюда определение HiveContext из документации Spark,

В дополнение к базовому SQLContext вы также можете создать HiveContext, который обеспечивает расширенный набор функций, предоставляемых базовым SQLContext. Дополнительные функции включают возможность писать запросы с использованием более полного анализатора HiveQL, доступ к пользовательским функциям Hive и возможность чтения данных из таблиц Hive. Чтобы использовать HiveContext, вам не нужно иметь существующую настройку Hive, и все источники данных, доступные для SQLContext, по-прежнему доступны. HiveContext упаковывается отдельно, чтобы избежать включения всех зависимостей Hive в сборку Spark по умолчанию.


в Spark версии 1.6.2 использование "dbName.tableName" дает такую ​​ошибку:

org.apache.spark.sql.AnalysisException: указание имени базы данных или других квалификаторов не допускается для временных таблиц. Если в имени таблицы есть точки (.), Укажите имя таблицы с помощью обратных кавычек ().

Анандкумар
источник
Это вторая команда: 'df.select (df.col ("col1"), df.col ("col2"), df.col ("col3")) .write (). Mode ("overwrite"). SaveAsTable ("schemaName.tableName"); ' требовать, чтобы выбранные столбцы, которые вы собираетесь перезаписать, уже существовали в таблице? Итак, у вас есть существующая таблица, и вы только перезаписываете существующие столбцы 1,2,3 новыми данными из вашего df в искре? это правильно истолковано?
dieHellste
3
df.write().mode...необходимо изменить наdf.write.mode...
user 923227
8

Сохранение в Hive - это просто вопрос использования write()метода вашего SQLContext:

df.write.saveAsTable(tableName)

См. Https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameWriter.html#saveAsTable(java.lang.String)

Начиная с Spark 2.2: используйте DataSet вместо DataFrame.

Рактотпал Бордолой
источник
Кажется, у меня есть ошибка, в которой говорится, что задание прервано. Я пробовал следующий код pyspark_df.write.mode ("перезапись"). SaveAsTable ("InjuryTab2")
Sade
Привет! почему это? From Spark 2.2: use DataSet instead DataFrame.
onofricamila 06
3

Извините, что написал поздно, но я не вижу принятого ответа.

df.write().saveAsTableбудет выбрасывать AnalysisExceptionи не совместим со столом HIVE.

Сохранение DF как df.write().format("hive")должно помочь!

Однако, если это не сработает, то, судя по предыдущим комментариям и ответам, на мой взгляд, это лучшее решение (хотя открыто для предложений).

Лучший подход - явно создать таблицу HIVE (включая таблицу PARTITIONED),

def createHiveTable: Unit ={
spark.sql("CREATE TABLE $hive_table_name($fields) " +
  "PARTITIONED BY ($partition_column String) STORED AS $StorageType")
}

сохранить DF как временную таблицу,

df.createOrReplaceTempView("$tempTableName")

и вставить в таблицу РАЗДЕЛЕННЫЙ УЛЕЙ:

spark.sql("insert into table default.$hive_table_name PARTITION($partition_column) select * from $tempTableName")
spark.sql("select * from default.$hive_table_name").show(1000,false)

Offcourse последний столбец в DF будет PARTITION COLUMN таким образом , создать Hive таблицы соответственно!

Прокомментируйте, если это работает! или нет.


--ОБНОВИТЬ--

df.write()
  .partitionBy("$partition_column")
  .format("hive")
  .mode(SaveMode.append)
  .saveAsTable($new_table_name_to_be_created_in_hive)  //Table should not exist OR should be a PARTITIONED table in HIVE
DrthSprk_
источник
1

Вот версия PySpark для создания таблицы Hive из паркетного файла. Возможно, вы сгенерировали файлы Parquet с использованием предполагаемой схемы и теперь хотите отправить определение в хранилище метаданных Hive. Вы также можете отправить определение в систему, например AWS Glue или AWS Athena, а не только в хранилище метаданных Hive. Здесь я использую spark.sql для создания / создания постоянной таблицы.

   # Location where my parquet files are present.
    df = spark.read.parquet("s3://my-location/data/")
    cols = df.dtypes
    buf = []
    buf.append('CREATE EXTERNAL TABLE test123 (')
    keyanddatatypes =  df.dtypes
    sizeof = len(df.dtypes)
    print ("size----------",sizeof)
    count=1;
    for eachvalue in keyanddatatypes:
        print count,sizeof,eachvalue
        if count == sizeof:
            total = str(eachvalue[0])+str(' ')+str(eachvalue[1])
        else:
            total = str(eachvalue[0]) + str(' ') + str(eachvalue[1]) + str(',')
        buf.append(total)
        count = count + 1

    buf.append(' )')
    buf.append(' STORED as parquet ')
    buf.append("LOCATION")
    buf.append("'")
    buf.append('s3://my-location/data/')
    buf.append("'")
    buf.append("'")
    ##partition by pt
    tabledef = ''.join(buf)

    print "---------print definition ---------"
    print tabledef
    ## create a table using spark.sql. Assuming you are using spark 2.1+
    spark.sql(tabledef);
картик
источник
1

Для внешних таблиц Hive я использую эту функцию в PySpark:

def save_table(sparkSession, dataframe, database, table_name, save_format="PARQUET"):
    print("Saving result in {}.{}".format(database, table_name))
    output_schema = "," \
        .join(["{} {}".format(x.name.lower(), x.dataType) for x in list(dataframe.schema)]) \
        .replace("StringType", "STRING") \
        .replace("IntegerType", "INT") \
        .replace("DateType", "DATE") \
        .replace("LongType", "INT") \
        .replace("TimestampType", "INT") \
        .replace("BooleanType", "BOOLEAN") \
        .replace("FloatType", "FLOAT")\
        .replace("DoubleType","FLOAT")
    output_schema = re.sub(r'DecimalType[(][0-9]+,[0-9]+[)]', 'FLOAT', output_schema)

    sparkSession.sql("DROP TABLE IF EXISTS {}.{}".format(database, table_name))

    query = "CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} ({}) STORED AS {} LOCATION '/user/hive/{}/{}'" \
        .format(database, table_name, output_schema, save_format, database, table_name)
    sparkSession.sql(query)
    dataframe.write.insertInto('{}.{}'.format(database, table_name),overwrite = True)
Теневой штурмовик
источник
1

В моем случае это нормально работает:

from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()
hive.setDatabase("DatabaseName")
df = spark.read.format("csv").option("Header",True).load("/user/csvlocation.csv")
df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table",<tablename>).save()

Готово!!

Вы можете читать данные, позволяя указать как «Сотрудник»

hive.executeQuery("select * from Employee").show()

Для получения дополнительных сведений используйте этот URL-адрес: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/integrating-hive/content/hive-read-write-operations.html

MD Rijwan
источник
0

Вы можете использовать библиотеку искры Hortonworks следующим образом

import com.hortonworks.hwc.HiveWarehouseSession

df.write
  .format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")
  .mode("append")
  .option("table", "myDatabase.myTable")
  .save()
Майк
источник
-1

Если вы хотите создать таблицу кустов (которой не существует) из фрейма данных (иногда это не удается создать с помощью DataFrameWriter.saveAsTable). StructType.toDDLпоможет перечислить столбцы в виде строки.

val df = ...

val schemaStr = df.schema.toDDL # This gives the columns 
spark.sql(s"""create table hive_table ( ${schemaStr})""")

//Now write the dataframe to the table
df.write.saveAsTable("hive_table")

hive_tableбудет создан в пространстве по умолчанию, поскольку мы не предоставили никакой базы данных по адресу spark.sql(). stg.hive_tableможно использовать для создания hive_tableв stgбазе данных.

Mrsrinivas
источник
Подробный пример можно найти здесь: stackoverflow.com/a/56833395/1592191
mrsrinivas