Spark Dataframe различает столбцы с повторяющимся именем

83

Итак, как я знаю в Spark Dataframe, несколько столбцов могут иметь то же имя, что и на снимке ниже:

[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=147031, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=149231, f=SparseVector(5, {0: 0.0, 1: 0.0032, 2: 0.2451, 3: 0.0, 4: 0.0042}))
]

Вышеупомянутый результат создается путем соединения с фреймом данных к самому себе, вы можете видеть, что есть 4столбцы с двумя aи f.

Проблема в том, что когда я пытаюсь выполнить больше вычислений со aстолбцом, я не могу найти способ выбрать a, я попытался, df[0]и df.select('a')оба вернули мне сообщение об ошибке ниже:

AnalysisException: Reference 'a' is ambiguous, could be: a#1333L, a#1335L.

Есть ли способ в Spark API, чтобы я снова мог отличить столбцы от повторяющихся имен? или, может быть, как-нибудь разрешить мне изменить имена столбцов?

резек
источник

Ответы:

61

Я бы порекомендовал вам изменить имена столбцов для вашего join.

df1.select(col("a") as "df1_a", col("f") as "df1_f")
   .join(df2.select(col("a") as "df2_a", col("f") as "df2_f"), col("df1_a" === col("df2_a"))

В результате DataFrameбудетschema

(df1_a, df1_f, df2_a, df2_f)
Гленни Хеллес Синдхольт
источник
5
Возможно, вам придется исправить свой ответ, поскольку кавычки между именами столбцов неправильно настроены.
Самех Шараф
2
@SamehSharaf Я полагаю, что вы проголосовали против моего ответа? Но ответ на самом деле на 100% правильный - я просто использую scala '-shorthand для выбора столбца, поэтому на самом деле проблем с кавычками нет.
Glennie Helles Sindholt
31
@GlennieHellesSindholt, справедливо. Это сбивает с толку, потому что ответ помечен как pythonи pyspark.
Хорхе Лейтао
Что, если каждый фрейм данных содержит более 100 столбцов, и нам просто нужно переименовать одно имя столбца, которое будет таким же? Конечно, нельзя вручную ввести все эти имена столбцов в предложении select
bikashg
6
В этом случае вы могли бы пойти сdf1.withColumnRenamed("a", "df1_a")
Гленни Хеллес Синдхольт
100

Начнем с некоторых данных:

from pyspark.mllib.linalg import SparseVector
from pyspark.sql import Row

df1 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=125231, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
])

df2 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
])

Есть несколько способов решить эту проблему. Прежде всего, вы можете однозначно ссылаться на столбцы дочерней таблицы, используя родительские столбцы:

df1.join(df2, df1['a'] == df2['a']).select(df1['f']).show(2)

##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

Вы также можете использовать псевдонимы таблиц:

from pyspark.sql.functions import col

df1_a = df1.alias("df1_a")
df2_a = df2.alias("df2_a")

df1_a.join(df2_a, col('df1_a.a') == col('df2_a.a')).select('df1_a.f').show(2)

##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

Наконец, вы можете программно переименовать столбцы:

df1_r = df1.select(*(col(x).alias(x + '_df1') for x in df1.columns))
df2_r = df2.select(*(col(x).alias(x + '_df2') for x in df2.columns))

df1_r.join(df2_r, col('a_df1') == col('a_df2')).select(col('f_df1')).show(2)

## +--------------------+
## |               f_df1|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
ноль323
источник
7
Спасибо за редактирование за то, что вы показали так много способов получить правильный столбец в этих неоднозначных случаях, я действительно думаю, что ваши примеры должны быть включены в руководство по программированию Spark. Я многому научился!
resc
небольшая поправка: df2_r = **df2** .select(*(col(x).alias(x + '_df2') for x in df2.columns))вместо df2_r = df1.select(*(col(x).alias(x + '_df2') for x in df2.columns)). В остальном, хорошие вещи
Vzzarr
Я согласен с тем, что это должно быть частью руководства по программированию Spark. Чистое золото. Я смог наконец распутать источник неоднозначности, выбрав столбцы по старым именам перед объединением. Решение программного добавления суффиксов к именам столбцов перед объединением всей неоднозначности.
Пабло Адамес
26

Есть более простой способ, чем писать псевдонимы для всех столбцов, к которым вы присоединяетесь:

df1.join(df2,['a'])

Это работает, если ключ, к которому вы присоединяетесь, одинаков в обеих таблицах.

См. Https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html.

Пол Бендевис
источник
4
это фактический ответ на Spark 2+
Мэтт
2
А для Scala: df1.join (df2, Seq ("a"))
mauriciojost
1
страница перемещена по адресу
bogdan.rusu
7

Вы можете использовать def drop(col: Column)метод, чтобы удалить дублированный столбец, например:

DataFrame:df1

+-------+-----+
| a     | f   |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+

DataFrame:df2

+-------+-----+
| a     | f   |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+

когда я присоединяюсь к df1 с df2, DataFrame будет выглядеть следующим образом:

val newDf = df1.join(df2,df1("a")===df2("a"))

DataFrame:newDf

+-------+-----+-------+-----+
| a     | f   | a     | f   |
+-------+-----+-------+-----+
|107831 | ... |107831 | ... |
|107831 | ... |107831 | ... |
+-------+-----+-------+-----+

Теперь мы можем использовать def drop(col: Column)метод для удаления дублированного столбца 'a' или 'f', как показано ниже:

val newDfWithoutDuplicate = df1.join(df2,df1("a")===df2("a")).drop(df2("a")).drop(df2("f"))
StrongYoung
источник
Будет ли этот подход работать, если вы выполняете внешнее соединение и два столбца имеют разные значения?
prafi
Возможно, вы не захотите отбрасывать разные отношения с одной и той же схемой.
thebluephantom
5

Покопавшись в Spark API, я обнаружил, что могу сначала aliasсоздать псевдоним для исходного фрейма данных, а затем withColumnRenamedвручную переименовать каждый столбец в псевдониме, это joinне приведет к дублированию имени столбца.

Более подробную информацию можно найти ниже в Spark Dataframe API :

pyspark.sql.DataFrame.alias

pyspark.sql.DataFrame.withColumnRenamed

Однако я думаю, что это лишь неприятный обходной путь, и мне интересно, есть ли лучший способ ответить на мой вопрос.

резек
источник
4

Предположим, что DataFrames, к которым вы хотите присоединиться, - это df1 и df2, и вы присоединяетесь к ним в столбце 'a', тогда у вас есть 2 метода

Способ 1

df1.join (df2, 'a', 'left_outer')

Это отличный метод, и он настоятельно рекомендуется.

Способ 2

df1.join (df2, df1.a == df2.a, 'left_outer'). drop (df2.a)

тайфун
источник
4

Вот как мы можем объединить два Dataframes с одинаковыми именами столбцов в PySpark.

df = df1.join(df2, ['col1','col2','col3'])

Если вы сделаете printSchema()это после этого, вы увидите, что повторяющиеся столбцы были удалены.

Нихил Редидж
источник
1

Возможно, это не лучший подход, но если вы хотите переименовать повторяющиеся столбцы (после соединения), вы можете сделать это с помощью этой крошечной функции.

def rename_duplicate_columns(dataframe):
    columns = dataframe.columns
    duplicate_column_indices = list(set([columns.index(col) for col in columns if columns.count(col) == 2]))
    for index in duplicate_column_indices:
        columns[index] = columns[index]+'2'
    dataframe = dataframe.toDF(*columns)
    return dataframe
Акаши
источник
1

если только ключевой столбец одинаков в обеих таблицах, попробуйте использовать следующий способ (подход 1):

left. join(right , 'key', 'inner')

а не ниже (подход 2):

left. join(right , left.key == right.key, 'inner')

Плюсы использования подхода 1:

  • 'ключ' будет отображаться только один раз в окончательном фрейме данных
  • простой в использовании синтаксис

Минусы использования подхода 1:

  • только помощь с ключевой колонкой
  • Сценарии, в которых используется левое соединение, если вы планируете использовать нулевой счетчик правого ключа, это не сработает. В этом случае необходимо переименовать один из ключей, как указано выше.
Маниш Сингла
источник
0

Если у вас более сложный вариант использования, чем описано в ответе Гленни Хеллеса Синдхольта, например, у вас есть другие / несколько имен столбцов без соединения, которые также совпадают и вы хотите различать их при выборе, лучше использовать псевдонимы, например:

df3 = df1.select("a", "b").alias("left")\
   .join(df2.select("a", "b").alias("right"), ["a"])\
   .select("left.a", "left.b", "right.b")

df3.columns
['a', 'b', 'b']
Вассерманн
источник