Я работаю над фреймом данных с двумя столбцами, mvv и count.
+---+-----+
|mvv|count|
+---+-----+
| 1 | 5 |
| 2 | 9 |
| 3 | 3 |
| 4 | 1 |
Я хотел бы получить два списка, содержащие значения mvv и значение счета. Что-то типа
mvv = [1,2,3,4]
count = [5,9,3,1]
Итак, я попробовал следующий код: Первая строка должна возвращать список строк Python. Я хотел увидеть первое значение:
mvv_list = mvv_count_df.select('mvv').collect()
firstvalue = mvv_list[0].getInt(0)
Но я получаю сообщение об ошибке со второй строкой:
AttributeError: getInt
python
apache-spark
pyspark
spark-dataframe
A.Moussa
источник
источник
list(df.select('mvv').toPandas()['mvv'])
. Arrow был интегрирован в PySpark, чтоtoPandas
значительно ускорилось . Не используйте другие подходы, если вы используете Spark 2.3+. См. Мой ответ для получения дополнительных сведений о тестировании.Ответы:
Понимаете, почему то, что вы делаете, не работает. Во-первых, вы пытаетесь получить целое число из типа строки , результат вашей коллекции будет таким:
>>> mvv_list = mvv_count_df.select('mvv').collect() >>> mvv_list[0] Out: Row(mvv=1)
Если взять что-то вроде этого:
>>> firstvalue = mvv_list[0].mvv Out: 1
Вы получите
mvv
ценность. Если вам нужна вся информация о массиве, вы можете взять что-то вроде этого:>>> mvv_array = [int(row.mvv) for row in mvv_list.collect()] >>> mvv_array Out: [1,2,3,4]
Но если вы попробуете то же самое для другого столбца, вы получите:
>>> mvv_count = [int(row.count) for row in mvv_list.collect()] Out: TypeError: int() argument must be a string or a number, not 'builtin_function_or_method'
Это происходит потому, что
count
это встроенный метод. И столбец имеет то же имя, что иcount
. Чтобы решить эту проблему, измените имя столбцаcount
на_count
:>>> mvv_list = mvv_list.selectExpr("mvv as mvv", "count as _count") >>> mvv_count = [int(row._count) for row in mvv_list.collect()]
Но это обходное решение не требуется, поскольку вы можете получить доступ к столбцу, используя синтаксис словаря:
>>> mvv_array = [int(row['mvv']) for row in mvv_list.collect()] >>> mvv_count = [int(row['count']) for row in mvv_list.collect()]
И наконец-то заработает!
источник
select('count')
использование вот так:count_list = [int(i.count) for i in mvv_list.collect()]
я добавлю пример в ответ.[i.['count'] for i in mvv_list.collect()]
работает, чтобы сделать явным использование столбца с именем 'count', а неcount
функцииПосле одного лайнера вы получите список, который вам нужен.
mvv = mvv_count_df.select("mvv").rdd.flatMap(lambda x: x).collect()
источник
Это даст вам все элементы в виде списка.
mvv_list = list( mvv_count_df.select('mvv').toPandas()['mvv'] )
источник
Следующий код поможет вам
mvv_count_df.select('mvv').rdd.map(lambda row : row[0]).collect()
источник
По моим данным я получил эти тесты:
>>> data.select(col).rdd.flatMap(lambda x: x).collect()
0,52 с
>>> [row[col] for row in data.collect()]
0,271 с
>>> list(data.select(col).toPandas()[col])
0,427 с
Результат тот же
источник
toLocalIterator
вместоcollect
него, должно быть даже больше памяти[row[col] for row in data.toLocalIterator()]
Если вы получите сообщение об ошибке ниже:
Этот код решит ваши проблемы:
mvv_list = mvv_count_df.select('mvv').collect() mvv_array = [int(i.mvv) for i in mvv_list]
источник
Я провел сравнительный анализ, и
list(mvv_count_df.select('mvv').toPandas()['mvv'])
это самый быстрый метод. Я очень удивлен.Я использовал разные подходы для наборов данных со 100 тысячами / 100 миллионами строк, используя 5-узловой кластер i3.xlarge (каждый узел имеет 30,5 ГБ ОЗУ и 4 ядра) с Spark 2.4.5. Данные были равномерно распределены по 20 быстро сжатым файлам Parquet с одним столбцом.
Вот результаты тестирования (время выполнения в секундах):
+-------------------------------------------------------------+---------+-------------+ | Code | 100,000 | 100,000,000 | +-------------------------------------------------------------+---------+-------------+ | df.select("col_name").rdd.flatMap(lambda x: x).collect() | 0.4 | 55.3 | | list(df.select('col_name').toPandas()['col_name']) | 0.4 | 17.5 | | df.select('col_name').rdd.map(lambda row : row[0]).collect()| 0.9 | 69 | | [row[0] for row in df.select('col_name').collect()] | 1.0 | OOM | | [r[0] for r in mid_df.select('col_name').toLocalIterator()] | 1.2 | * | +-------------------------------------------------------------+---------+-------------+ * cancelled after 800 seconds
Золотые правила, которым необходимо следовать при сборе данных на узле драйвера:
toPandas
был значительно улучшен в Spark 2.3 . Вероятно, это не лучший подход, если вы используете версию Spark до 2.3.См. Здесь для получения более подробной информации / результатов тестирования.
источник
Возможное решение - использование
collect_list()
функции изpyspark.sql.functions
. Это объединит все значения столбцов в массив pyspark, который при сборе преобразуется в список Python:mvv_list = df.select(collect_list("mvv")).collect()[0][0] count_list = df.select(collect_list("count")).collect()[0][0]
источник
Давайте создадим рассматриваемый фрейм данных
df_test = spark.createDataFrame( [ (1, 5), (2, 9), (3, 3), (4, 1), ], ['mvv', 'count'] ) df_test.show()
Который дает
+---+-----+ |mvv|count| +---+-----+ | 1| 5| | 2| 9| | 3| 3| | 4| 1| +---+-----+
а затем примените rdd.flatMap (f) .collect (), чтобы получить список
test_list = df_test.select("mvv").rdd.flatMap(list).collect() print(type(test_list)) print(test_list)
который дает
<type 'list'> [1, 2, 3, 4]
источник