Как определить разбиение DataFrame?

129

Я начал использовать Spark SQL и DataFrames в Spark 1.4.0. Я хочу определить пользовательский разделитель в DataFrames в Scala, но не знаю, как это сделать.

Одна из таблиц данных, с которыми я работаю, содержит список транзакций по учетной записи, силимар к следующему примеру.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

По крайней мере, первоначально большая часть вычислений будет происходить между транзакциями внутри учетной записи. Поэтому я бы хотел, чтобы данные были разделены, чтобы все транзакции для учетной записи находились в одном разделе Spark.

Но я не вижу способа это определить. В классе DataFrame есть метод под названием repartition (Int), в котором вы можете указать количество создаваемых разделов. Но я не вижу никакого доступного метода для определения настраиваемого разделителя для DataFrame, например, который может быть указан для RDD.

Исходные данные хранятся в Parquet. Я действительно видел, что при записи DataFrame в Parquet вы можете указать столбец для разделения, поэтому, предположительно, я мог бы сказать Parquet, чтобы он разделил его данные по столбцу «Учетная запись». Но могут быть миллионы учетных записей, и если я правильно понимаю Parquet, он создаст отдельный каталог для каждой учетной записи, так что это не звучит как разумное решение.

Есть ли способ заставить Spark разделить этот DataFrame так, чтобы все данные для учетной записи находились в одном разделе?

грабли
источник
проверьте эту ссылку stackoverflow.com/questions/23127329/…
Abhishek Choudhary
Если вы можете указать Parquet на разделение по учетным int(account/someInteger)записям , вы, вероятно, сможете разделить их по и таким образом получить разумное количество учетных записей на каталог.
Пол
1
@ABC: Я видел эту ссылку. partitionBy(Partitioner)Искал эквивалент этого метода, но для DataFrames вместо RDD. Я теперь вижу , что partitionByдоступно только для парного РДА, не уверено , почему это так.
rake
@ Пол: Я подумал о том, что ты описываешь. Несколько вещей меня сдерживали:
rake
продолжение .... (1) То есть "Паркет-перегородка". Мне не удалось найти никаких документов, в которых говорилось бы, что Spark-partitioning действительно будет использовать Parquet-partitioning. (2) Если я понимаю документацию Parquet, мне нужно определить новое поле «foo», тогда каждая директория Parquet будет иметь имя вроде «foo = 123». Но если я построю запрос, включающий AccountID , как Spark / hive / parquet узнает, что существует какая-либо связь между foo и AccountID ?
рейк

Ответы:

177

Искра> = 2.3.0

SPARK-22614 предоставляет разделение диапазона.

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389 предоставляет разделение по внешнему формату в Data Source API v2 .

Искра> = 1.6.0

В Spark> = 1.6 можно использовать разбиение по столбцам для запросов и кеширования. См: SPARK-11410 и СПАРК-4849 с использованием repartitionметода:

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

В отличие от RDDsSpark Dataset(включая Dataset[Row]aka DataFrame), на данный момент нельзя использовать собственный разделитель. Обычно вы можете решить эту проблему, создав столбец искусственного разделения, но это не даст вам такой же гибкости.

Spark <1.6.0:

Одна вещь, которую вы можете сделать, - это предварительно разделить входные данные перед тем, как создавать DataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

Поскольку для DataFrameсоздания из файла RDDтребуется только простая фаза карты, необходимо сохранить существующий макет раздела *:

assert(df.rdd.partitions == partitioned.partitions)

Таким же образом можно переделать существующие DataFrame:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

Так что, похоже, это не невозможно. Остается вопрос, имеет ли это вообще смысл. Я буду утверждать, что в большинстве случаев это не так:

  1. Переразбиение - дорогостоящий процесс. В типичном сценарии большую часть данных необходимо сериализовать, перемешать и десериализовать. С другой стороны, количество операций, для которых могут быть полезны предварительно разделенные данные, относительно невелико и дополнительно ограничено, если внутренний API не предназначен для использования этого свойства.

    • присоединяется в некоторых сценариях, но для этого потребуется внутренняя поддержка,
    • вызовы оконных функций с соответствующим разделителем. То же, что и выше, только для определения одного окна. Однако он уже разделен внутри, поэтому предварительное разделение может быть избыточным,
    • простое агрегирование с помощью GROUP BY- можно уменьшить объем памяти, занимаемой временными буферами **, но общая стоимость намного выше. Более или менее эквивалентно groupByKey.mapValues(_.reduce)(текущее поведение) vs reduceByKey(предварительное разбиение). Вряд ли пригодится на практике.
    • сжатие данных с помощью SqlContext.cacheTable. Поскольку похоже, что используется кодирование длин серий, применение OrderedRDDFunctions.repartitionAndSortWithinPartitionsможет улучшить степень сжатия.
  2. Производительность сильно зависит от распределения ключей. Если он перекошен, это приведет к неоптимальному использованию ресурсов. В худшем случае закончить работу будет невозможно.

  3. Весь смысл использования декларативного API высокого уровня - изолировать себя от деталей реализации низкого уровня. Как уже упоминалось @dwysakowicz и @RomiKuntsman , оптимизация - это работа Catalyst Optimizer . Это довольно сложный зверь, и я действительно сомневаюсь, что вы сможете легко улучшить его, не углубляясь в его внутренности.

Связанные понятия

Разбиение с исходниками JDBC :

predicatesАргумент поддержки источников данных JDBC . Его можно использовать следующим образом:

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

Он создает один раздел JDBC для каждого предиката. Имейте в виду, что если наборы, созданные с использованием отдельных предикатов, не являются непересекающимися, вы увидите дубликаты в итоговой таблице.

partitionByметод вDataFrameWriter :

Spark DataFrameWriterпредоставляет partitionByметод, который можно использовать для «разделения» данных при записи. Он разделяет данные при записи, используя предоставленный набор столбцов

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

Это позволяет предикату выталкивать вниз при чтении для запросов на основе ключа:

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

но это не эквивалентно DataFrame.repartition. В частности, такие агрегаты, как:

val cnts = df1.groupBy($"k").sum()

по-прежнему потребуется TungstenExchange:

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketByметод вDataFrameWriter (Spark> = 2.0):

bucketByимеет аналогичные приложения, partitionByно только для таблиц ( saveAsTable). Информация о сегментировании может использоваться для оптимизации объединений:

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* Под разметкой разделов я подразумеваю только распределение данных. partitionedRDD больше не имеет разделителя. ** Без предварительного прогноза. Если агрегирование охватывает только небольшое подмножество столбцов, вероятно, нет никакого выигрыша.

zero323
источник
@bychance Да и нет. Макет данных будет сохранен, но, AFAIK, он не даст вам таких преимуществ, как обрезка разделов.
zero323
@ zero323 Спасибо, есть ли способ проверить распределение разделов файла паркета, чтобы убедиться, что df.save.write действительно сохраняет макет? И если я сделаю df.repartition ("A"), затем сделаю df.write.repartitionBy ("B"), физическая структура папок будет разделена на B, и в каждой папке значений B будет ли он по-прежнему сохранять раздел A?
bychance
2
@bychance DataFrameWriter.partitionByлогически не то же самое, что DataFrame.repartition. Бывшее включение не перемешивается, оно просто разделяет вывод. Относительно первого вопроса. - данные сохраняются по разделам и перемешивания нет. Вы можете легко проверить это, прочитав отдельные файлы. Но одна Spark не может узнать об этом, действительно ли вы этого хотите.
zero323
11

В Spark <1.6. Если вы создаете a HiveContext, а не обычный старый, SqlContextвы можете использовать HiveQL DISTRIBUTE BY colX... (гарантирует, что каждый из N редукторов получает неперекрывающиеся диапазоны x) & CLUSTER BY colX...(ярлык для Распределить по и Сортировать по), например;

df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

Не уверен, как это сочетается с Spark DF api. Эти ключевые слова не поддерживаются в обычном SqlContext (обратите внимание, что вам не нужно иметь хранилище метаданных улья для использования HiveContext)

EDIT: Spark 1.6+ теперь имеет это в собственном API DataFrame

Ночной волк
источник
1
Сохраняются ли разделы при сохранении фрейма данных?
Sim
как вы контролируете, сколько разделов вы можете иметь в примере hive ql? например, в подходе парного RDD вы можете сделать это для создания 5 разделов: val partitioner = new HashPartitioner (5)
Минни
хорошо, нашел ответ, это можно сделать так: sqlContext.setConf ("spark.sql.shuffle.partitions", "5") Я не смог отредактировать предыдущий комментарий, так как пропустил ограничение в 5 минут
Минни
7

Итак, для начала какой-то ответ :) - Вы не можете

Я не эксперт, но насколько я понимаю DataFrames, они не равны rdd, а в DataFrame нет такой вещи, как Partitioner.

Как правило, идея DataFrame состоит в том, чтобы предоставить другой уровень абстракции, который сам решает такие проблемы. Запросы к DataFrame переводятся в логический план, который в дальнейшем транслируется в операции с RDD. Предложенное вами разбиение, вероятно, будет применено автоматически или, по крайней мере, должно быть.

Если вы не доверяете SparkSQL в том, что он обеспечит какую-то оптимальную работу, вы всегда можете преобразовать DataFrame в RDD [Row], как это предлагается в комментариях.

Давид Высакович
источник
7

Используйте DataFrame, возвращаемый:

yourDF.orderBy(account)

Нет явного способа использовать partitionBy в DataFrame, только в PairRDD, но когда вы сортируете DataFrame, он будет использовать его в своем LogicalPlan, и это поможет, когда вам нужно будет производить вычисления для каждой учетной записи.

Я только что наткнулся на ту же самую проблему с фреймом данных, который я хочу разделить по учетной записи. Я предполагаю, что когда вы говорите «хотите разделить данные так, чтобы все транзакции для учетной записи находились в одном разделе Spark», вы хотите, чтобы это было масштабно и производительно, но ваш код не зависит от этого (например, при использовании mapPartitions()и т. д.), верно?

Роми Кунцман
источник
3
Как насчет того, зависит ли ваш код от этого, потому что вы используете mapPartitions?
NightWolf
2
Вы можете преобразовать DataFrame в RDD, а затем разделить его (например, используя aggregatByKey () и передать настраиваемый Partitioner)
Роми Кунцман,
5

Я смог сделать это с помощью RDD. Но я не знаю, приемлемо ли для вас это решение. Как только у вас будет DF, доступный в качестве RDD, вы можете подать заявкуrepartitionAndSortWithinPartitions на выполнение настраиваемого перераспределения данных.

Вот образец, который я использовал:

class DatePartitioner(partitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = {
    val start_time: Long = key.asInstanceOf[Long]
    Objects.hash(Array(start_time)) % partitions
  }

  override def numPartitions: Int = partitions
}

myRDD
  .repartitionAndSortWithinPartitions(new DatePartitioner(24))
  .map { v => v._2 }
  .toDF()
  .write.mode(SaveMode.Overwrite)
разработчик
источник