Я начал использовать Spark SQL и DataFrames в Spark 1.4.0. Я хочу определить пользовательский разделитель в DataFrames в Scala, но не знаю, как это сделать.
Одна из таблиц данных, с которыми я работаю, содержит список транзакций по учетной записи, силимар к следующему примеру.
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
По крайней мере, первоначально большая часть вычислений будет происходить между транзакциями внутри учетной записи. Поэтому я бы хотел, чтобы данные были разделены, чтобы все транзакции для учетной записи находились в одном разделе Spark.
Но я не вижу способа это определить. В классе DataFrame есть метод под названием repartition (Int), в котором вы можете указать количество создаваемых разделов. Но я не вижу никакого доступного метода для определения настраиваемого разделителя для DataFrame, например, который может быть указан для RDD.
Исходные данные хранятся в Parquet. Я действительно видел, что при записи DataFrame в Parquet вы можете указать столбец для разделения, поэтому, предположительно, я мог бы сказать Parquet, чтобы он разделил его данные по столбцу «Учетная запись». Но могут быть миллионы учетных записей, и если я правильно понимаю Parquet, он создаст отдельный каталог для каждой учетной записи, так что это не звучит как разумное решение.
Есть ли способ заставить Spark разделить этот DataFrame так, чтобы все данные для учетной записи находились в одном разделе?
int(account/someInteger)
записям , вы, вероятно, сможете разделить их по и таким образом получить разумное количество учетных записей на каталог.partitionBy(Partitioner)
Искал эквивалент этого метода, но для DataFrames вместо RDD. Я теперь вижу , чтоpartitionBy
доступно только для парного РДА, не уверено , почему это так.Ответы:
Искра> = 2.3.0
SPARK-22614 предоставляет разделение диапазона.
SPARK-22389 предоставляет разделение по внешнему формату в Data Source API v2 .
Искра> = 1.6.0
В Spark> = 1.6 можно использовать разбиение по столбцам для запросов и кеширования. См: SPARK-11410 и СПАРК-4849 с использованием
repartition
метода:В отличие от
RDDs
SparkDataset
(включаяDataset[Row]
akaDataFrame
), на данный момент нельзя использовать собственный разделитель. Обычно вы можете решить эту проблему, создав столбец искусственного разделения, но это не даст вам такой же гибкости.Spark <1.6.0:
Одна вещь, которую вы можете сделать, - это предварительно разделить входные данные перед тем, как создавать
DataFrame
Поскольку для
DataFrame
создания из файлаRDD
требуется только простая фаза карты, необходимо сохранить существующий макет раздела *:Таким же образом можно переделать существующие
DataFrame
:Так что, похоже, это не невозможно. Остается вопрос, имеет ли это вообще смысл. Я буду утверждать, что в большинстве случаев это не так:
Переразбиение - дорогостоящий процесс. В типичном сценарии большую часть данных необходимо сериализовать, перемешать и десериализовать. С другой стороны, количество операций, для которых могут быть полезны предварительно разделенные данные, относительно невелико и дополнительно ограничено, если внутренний API не предназначен для использования этого свойства.
GROUP BY
- можно уменьшить объем памяти, занимаемой временными буферами **, но общая стоимость намного выше. Более или менее эквивалентноgroupByKey.mapValues(_.reduce)
(текущее поведение) vsreduceByKey
(предварительное разбиение). Вряд ли пригодится на практике.SqlContext.cacheTable
. Поскольку похоже, что используется кодирование длин серий, применениеOrderedRDDFunctions.repartitionAndSortWithinPartitions
может улучшить степень сжатия.Производительность сильно зависит от распределения ключей. Если он перекошен, это приведет к неоптимальному использованию ресурсов. В худшем случае закончить работу будет невозможно.
Связанные понятия
Разбиение с исходниками JDBC :
predicates
Аргумент поддержки источников данных JDBC . Его можно использовать следующим образом:Он создает один раздел JDBC для каждого предиката. Имейте в виду, что если наборы, созданные с использованием отдельных предикатов, не являются непересекающимися, вы увидите дубликаты в итоговой таблице.
partitionBy
метод вDataFrameWriter
:Spark
DataFrameWriter
предоставляетpartitionBy
метод, который можно использовать для «разделения» данных при записи. Он разделяет данные при записи, используя предоставленный набор столбцовЭто позволяет предикату выталкивать вниз при чтении для запросов на основе ключа:
но это не эквивалентно
DataFrame.repartition
. В частности, такие агрегаты, как:по-прежнему потребуется
TungstenExchange
:bucketBy
метод вDataFrameWriter
(Spark> = 2.0):bucketBy
имеет аналогичные приложения,partitionBy
но только для таблиц (saveAsTable
). Информация о сегментировании может использоваться для оптимизации объединений:* Под разметкой разделов я подразумеваю только распределение данных.
partitioned
RDD больше не имеет разделителя. ** Без предварительного прогноза. Если агрегирование охватывает только небольшое подмножество столбцов, вероятно, нет никакого выигрыша.источник
DataFrameWriter.partitionBy
логически не то же самое, чтоDataFrame.repartition
. Бывшее включение не перемешивается, оно просто разделяет вывод. Относительно первого вопроса. - данные сохраняются по разделам и перемешивания нет. Вы можете легко проверить это, прочитав отдельные файлы. Но одна Spark не может узнать об этом, действительно ли вы этого хотите.В Spark <1.6. Если вы создаете a
HiveContext
, а не обычный старый,SqlContext
вы можете использовать HiveQLDISTRIBUTE BY colX...
(гарантирует, что каждый из N редукторов получает неперекрывающиеся диапазоны x) &CLUSTER BY colX...
(ярлык для Распределить по и Сортировать по), например;Не уверен, как это сочетается с Spark DF api. Эти ключевые слова не поддерживаются в обычном SqlContext (обратите внимание, что вам не нужно иметь хранилище метаданных улья для использования HiveContext)
EDIT: Spark 1.6+ теперь имеет это в собственном API DataFrame
источник
Итак, для начала какой-то ответ :) - Вы не можете
Я не эксперт, но насколько я понимаю DataFrames, они не равны rdd, а в DataFrame нет такой вещи, как Partitioner.
Как правило, идея DataFrame состоит в том, чтобы предоставить другой уровень абстракции, который сам решает такие проблемы. Запросы к DataFrame переводятся в логический план, который в дальнейшем транслируется в операции с RDD. Предложенное вами разбиение, вероятно, будет применено автоматически или, по крайней мере, должно быть.
Если вы не доверяете SparkSQL в том, что он обеспечит какую-то оптимальную работу, вы всегда можете преобразовать DataFrame в RDD [Row], как это предлагается в комментариях.
источник
Используйте DataFrame, возвращаемый:
Нет явного способа использовать
partitionBy
в DataFrame, только в PairRDD, но когда вы сортируете DataFrame, он будет использовать его в своем LogicalPlan, и это поможет, когда вам нужно будет производить вычисления для каждой учетной записи.Я только что наткнулся на ту же самую проблему с фреймом данных, который я хочу разделить по учетной записи. Я предполагаю, что когда вы говорите «хотите разделить данные так, чтобы все транзакции для учетной записи находились в одном разделе Spark», вы хотите, чтобы это было масштабно и производительно, но ваш код не зависит от этого (например, при использовании
mapPartitions()
и т. д.), верно?источник
Я смог сделать это с помощью RDD. Но я не знаю, приемлемо ли для вас это решение. Как только у вас будет DF, доступный в качестве RDD, вы можете подать заявку
repartitionAndSortWithinPartitions
на выполнение настраиваемого перераспределения данных.Вот образец, который я использовал:
источник