Как хранить пользовательские объекты в наборе данных?

149

В соответствии с введением наборов данных Spark :

В преддверии Spark 2.0 мы планируем несколько интересных улучшений в наборах данных, в частности: ... Пользовательские кодировщики - в то время как в настоящее время мы автоматически генерируем кодировщики для широкого спектра типов, мы хотели бы открыть API для пользовательских объектов.

и пытается сохранить пользовательский тип в приведении Datasetк следующей ошибке, такой как:

Невозможно найти кодировщик для типа, хранящегося в наборе данных. Примитивные типы (Int, String и т. Д.) И типы продуктов (классы дел) поддерживаются путем импорта sqlContext.implicits._ Поддержка сериализации других типов будет добавлена ​​в будущих выпусках.

или:

Java.lang.UnsupportedOperationException: не найден кодировщик для ....

Существуют ли обходные пути?


Обратите внимание, что этот вопрос существует только в качестве отправной точки для ответа сообщества Wiki. Не стесняйтесь обновлять / улучшать как вопрос, так и ответ.

zero323
источник

Ответы:

240

Обновить

Ответ на этот вопрос остается в силе и информативный, хотя вещи теперь лучше , так как 2.2 / 2.3, который добавляет встроенную поддержку энкодера для Set, Seq, Map, Date, Timestamp, и BigDecimal. Если вы придерживаетесь создания типов только с классами case и обычными типами Scala, у вас должно получиться только неявное в SQLImplicits.


К сожалению, практически ничего не было добавлено, чтобы помочь с этим. Поиск @since 2.0.0в Encoders.scalaили SQLImplicits.scalaнаходит вещи, в основном связанные с примитивными типами (и некоторую настройку case-классов). Итак, первое, что нужно сказать: в настоящее время нет действительно хорошей поддержки пользовательских кодировщиков классов . После всего этого последуют некоторые уловки, которые делают такую ​​работу, на которую мы можем надеяться, учитывая то, что мы имеем в нашем распоряжении. Как предварительный отказ от ответственности: это не будет работать идеально, и я сделаю все возможное, чтобы все ограничения были ясными и предварительными.

В чем именно проблема

Если вы хотите создать набор данных, Spark "требуется кодировщик (для преобразования объекта JVM типа T во внутреннее представление Spark SQL и из него), который обычно создается автоматически с помощью имплицитов из SparkSessionили может быть создан явно путем вызова статических методов. на Encoders"(взято из документов наcreateDataset ). Кодировщик примет форму, в Encoder[T]которой Tуказан тип, который вы кодируете. Первое предложение заключается в добавлении import spark.implicits._(которое дает вам эти неявные кодировщики), а второе предложение заключается в явной передаче неявного кодировщика с использованием этого набора функций, связанных с кодировщиком.

Для обычных классов кодер недоступен, поэтому

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

даст вам следующую неявную связанную ошибку времени компиляции:

Невозможно найти кодировщик для типа, хранящегося в наборе данных. Примитивные типы (Int, String и т. Д.) И типы продуктов (классы дел) поддерживаются путем импорта sqlContext.implicits._ Поддержка сериализации других типов будет добавлена ​​в будущих выпусках.

Однако, если вы переносите любой тип, который вы только что использовали, чтобы получить вышеуказанную ошибку в каком-то расширяемом классе Product, ошибка запутанно откладывается до времени выполнения, поэтому

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

Компилируется просто отлично, но не работает во время выполнения с

java.lang.UnsupportedOperationException: не найден кодировщик для MyObj

Причина этого заключается в том, что кодировщики, которые Spark создает с импликациями, фактически создаются только во время выполнения (с помощью scala relfection). В этом случае все проверки Spark во время компиляции Productзаключаются в том, что внешний класс расширяется (что делают все классы case) и понимает только во время выполнения, что он все еще не знает, что делать MyObj(та же проблема возникает, если я попытался сделать a Dataset[(Int,MyObj)]- Spark ждет, пока не закончится время выполнения MyObj) Это основные проблемы, которые крайне необходимо исправить:

  • некоторые классы, которые расширяют Productкомпиляцию, несмотря на то, что всегда терпят крах во время выполнения и
  • нет способа передать пользовательские кодировщики для вложенных типов (у меня нет способа передать Spark кодировщик только для того MyObj, чтобы он потом знал, как кодировать Wrap[MyObj]или (Int,MyObj)).

Просто используйте kryo

Решение, которое все предлагают, это использовать kryoкодировщик.

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

Это становится довольно утомительным быстро, хотя. Особенно, если ваш код манипулирует всеми видами наборов данных, объединяет, группирует и т. Д. В конечном итоге вы получаете кучу дополнительных последствий. Итак, почему бы просто не сделать неявное, которое делает все это автоматически?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

И теперь, похоже, я могу сделать почти все, что захочу (приведенный ниже пример не будет работать там, spark-shellгде spark.implicits._автоматически импортируется)

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

Или почти. Проблема в том, что использование kryoприводит к тому, что Spark просто сохраняет каждую строку в наборе данных как плоский двоичный объект. Для map, filter, foreachчто достаточно, но для таких операций , как join, Спарк действительно нуждается в них , чтобы быть разделены на столбцы. Осматривая схему для d2или d3, вы видите, что есть только один двоичный столбец:

d2.printSchema
// root
//  |-- value: binary (nullable = true)

Частичное решение для кортежей

Таким образом, используя магию имплицитов в Scala (подробнее в 6.26.3 Разрешение перегрузки ), я могу создать серию имплицитов, которые будут работать как можно лучше, по крайней мере для кортежей, и будут хорошо работать с существующими имплицитами:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

Затем, вооружившись этими последствиями, я могу заставить свой пример работать, хотя и с некоторым переименованием столбцов.

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

Я еще не понял, как получить ожидаемые имена кортежей ( _1, _2, ...) по умолчанию без переименования их - если кто - то хочет поиграть с этим, это то , где имя "value"получает введено и это то , где кортеж имена обычно добавляются. Тем не менее, ключевым моментом является то, что у меня теперь есть хорошая структурированная схема:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

Итак, в заключение, это обходной путь:

  • позволяет нам получать отдельные столбцы для кортежей (так что мы можем снова присоединиться к кортежам, ура!)
  • мы снова можем просто положиться на последствия (так что нет необходимости проходить kryoповсюду)
  • почти полностью обратно совместим с import spark.implicits._(с некоторым переименованием)
  • никак не соединит на kyroсериализовать двоичные столбцы, не говоря уже о тех , полях могут иметь
  • имеет неприятный побочный эффект от переименования некоторых столбцов кортежа в «значение» (при необходимости это можно отменить, конвертировав .toDF, указав новые имена столбцов и вернув обратно в набор данных - и имена схем, похоже, сохраняются с помощью объединений где они больше всего нужны).

Частичное решение для классов в целом

Этот менее приятный и не имеет хорошего решения. Однако теперь, когда у нас есть решение для кортежей, описанное выше, я догадываюсь, что решение о неявном преобразовании из другого ответа тоже будет немного менее болезненным, поскольку вы можете преобразовывать свои более сложные классы в кортежи. Затем, после создания набора данных, вы, вероятно, переименуете столбцы, используя подход с фреймом данных. Если все пойдет хорошо, это действительно улучшение, так как теперь я могу выполнять соединения на полях моих классов. Если бы я только использовал один плоский двоичный kryoсериализатор, это было бы невозможно.

Вот пример , который делает немного все: у меня есть класс , MyObjкоторый имеет поле типов Int, java.util.UUIDи Set[String]. Первый заботится о себе. Второе, хотя я мог бы использовать сериализацию с использованием, kryoбыло бы более полезно, если бы оно хранилось как String(так UUIDкак обычно я хочу присоединиться к s). Третий действительно принадлежит двоичному столбцу.

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

Теперь я могу создать набор данных с хорошей схемой, используя этот механизм:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

И схема показывает мне столбцы с правильными именами и первыми двумя вещами, с которыми я могу объединиться.

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)
Алек
источник
Можно ли создать собственный класс ExpressionEncoderс использованием сериализации JSON? В моем случае я не могу сойти с рук с кортежами, и Крио дает мне двоичный столбец ..
Алексей Святковский
1
@ AlexeyS Я так не думаю. Но зачем тебе это? Почему вы не можете оставить без внимания последнее решение, которое я предлагаю? Если вы можете поместить свои данные в JSON, вы сможете извлечь поля и поместить их в класс кейсов ...
Алек
1
К сожалению, суть этого ответа в том, что не существует решения, которое работает.
Баол
@baol Вроде. Но помните, как сложно то, что делает Спарк. Система типов Scala просто недостаточно мощна, чтобы «выводить» кодеры, которые рекурсивно проходят через поля. Честно говоря, я просто удивлен, что никто не сделал макрос для аннотации для этого. Похоже, естественное (но сложное) решение.
Alec
1
@combinatorist Насколько я понимаю, наборы данных и кадры данных (но не RDD, поскольку им не нужны кодеры!) эквивалентны с точки зрения производительности. Не стоит недооценивать безопасность типов данных! Тот факт, что Spark внутренне использует массу отражений, приведений и т. Д., Не означает, что вам не следует заботиться о безопасности типов интерфейса, который открыт. Но это заставляет меня чувствовать себя лучше, когда я создаю свои собственные безопасные от типов функции, основанные на наборе данных, которые используют внутренние рамки данных.
Алек
32
  1. Использование универсальных кодеров.

    На данный момент доступно два универсальных кодировщика, kryoи javaSerializationпоследний из них явно описан как:

    крайне неэффективно и должно использоваться только в качестве крайней меры.

    Предполагая следующий класс

    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }

    Вы можете использовать эти кодировщики, добавив неявный кодировщик:

    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
      org.apache.spark.sql.Encoders.kryo[Bar]
    }

    которые можно использовать вместе следующим образом:

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._
    
        val ds = Seq(new Bar(1)).toDS
        ds.show
    
        sc.stop()
      }
    }

    Он хранит объекты в виде binaryстолбца, поэтому при преобразовании DataFrameвы получаете следующую схему:

    root
     |-- value: binary (nullable = true)

    Также возможно кодировать кортежи, используя kryoкодировщик для определенного поля:

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]

    Обратите внимание, что здесь мы не зависим от неявных кодировщиков, но передаем кодировщик явно, так что это скорее всего не будет работать с toDSметодом.

  2. Использование неявных преобразований:

    Обеспечьте неявные преобразования между представлением, которое может быть закодировано, и пользовательским классом, например:

    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }

Смежные вопросы:

оборота ноль323
источник
Решение 1, кажется, не работает для типизированных коллекций (по крайней мере Set), которые я получаю Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for Set[Bar].
Виктор П.
@VictorP. Боюсь, что в этом случае вам понадобится кодировщик для определенного типа ( kryo[Set[Bar]]точно так же, если в классе есть поле Bar, необходим кодировщик для всего объекта. Это очень грубые методы.
zero323
@ zero323 У меня такая же проблема. Можете ли вы привести пример кода, как кодировать весь проект? Большое спасибо!
Рок
@Rock Я не уверен, что вы подразумеваете под "целым проектом"
ноль323
@ zero323 за ваш комментарий: «если класс содержит поле, Barвам нужен кодировщик для всего объекта». мой вопрос был, как закодировать этот "весь проект"?
Рок
9

Вы можете использовать UDTRegistration, а затем Case Classes, Tuple и т. Д. - все работает корректно с вашим Определяемым пользователем типом!

Скажем, вы хотите использовать пользовательский Enum:

trait CustomEnum { def value:String }
case object Foo extends CustomEnum  { val value = "F" }
case object Bar extends CustomEnum  { val value = "B" }
object CustomEnum {
  def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}

Зарегистрируйте это так:

// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}

// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)

Тогда используйте его!

case class UsingCustomEnum(id:Int, en:CustomEnum)

val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())

Допустим, вы хотите использовать полиморфную запись:

trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly

... и использовать это так:

case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

Вы можете написать пользовательский UDT, который кодирует все в байты (здесь я использую сериализацию Java, но, вероятно, лучше использовать инструмент Spark в контексте Kryo).

Сначала определите класс UDT:

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  val kryo = new Kryo()

  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)

    bos.toByteArray
  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}

Тогда зарегистрируйте это:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)

Тогда вы можете использовать это!

// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()
ChoppyTheLumberjack
источник
1
Я не вижу, где используется ваш крио (в CustomPolyUDT)
mathieu
Я пытаюсь определить UDT в моем проекте, и я получаю эту ошибку "Symbol UserDefinedType недоступен с этого места". Любая помощь ?
Риджо Джозеф
Привет @RijoJoseph. Вам нужно сделать пакет org.apache.spark в вашем проекте и поместить в него свой код UDT.
ChoppyTheLumberjack
6

Кодеры работают более или менее одинаково Spark2.0. И Kryoдо сих пор это рекомендуемый serializationвыбор.

Вы можете посмотреть на следующий пример с spark-shell

scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> case class NormalPerson(name: String, age: Int) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class NormalPerson

scala> case class ReversePerson(name: Int, age: String) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class ReversePerson

scala> val normalPersons = Seq(
 |   NormalPerson("Superman", 25),
 |   NormalPerson("Spiderman", 17),
 |   NormalPerson("Ironman", 29)
 | )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))

scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds1.show()
+---------+---+
|     name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
|  Ironman| 29|
+---------+---+

scala> ds2.show()
+----+---------+
|name|      age|
+----+---------+
|  25| Superman|
|  17|Spiderman|
|  29|  Ironman|
+----+---------+

scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.

До сих пор] не было appropriate encodersв настоящем объеме, поэтому наши люди не были закодированы как binaryценности. Но это изменится, как только мы предоставим некоторые implicitкодеры, использующие Kryoсериализацию.

// Provide Encoders

scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]

scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]

// Ecoders will be used since they are now present in Scope

scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]

scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]

// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

scala> ds4.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

// Our instances still work as expected    

scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.

scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.
sarveshseri
источник
3

В случае класса Java Bean это может быть полезно

import spark.sqlContext.implicits._
import org.apache.spark.sql.Encoders
implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])

Теперь вы можете просто прочитать dataFrame как пользовательский DataFrame

dataFrame.as[MyClass]

Это создаст пользовательский кодер класса, а не двоичный.

Акаш Махаджан
источник
1

Мои примеры будут на Java, но я не думаю, что это будет сложно адаптироваться к Scala.

Я довольно успешно преобразования RDD<Fruit>в Dataset<Fruit>использовании spark.createDataset и Encoders.bean до тех пор , как Fruitэто простой Java Bean .

Шаг 1: Создайте простой Java Bean.

public class Fruit implements Serializable {
    private String name  = "default-fruit";
    private String color = "default-color";

    // AllArgsConstructor
    public Fruit(String name, String color) {
        this.name  = name;
        this.color = color;
    }

    // NoArgsConstructor
    public Fruit() {
        this("default-fruit", "default-color");
    }

    // ...create getters and setters for above fields
    // you figure it out
}

Я бы придерживался классов с примитивными типами и String как полей, прежде чем люди из DataBricks усилили свои кодировщики. Если у вас есть класс с вложенным объектом, создайте еще один простой Java Bean со всеми его полями, чтобы вы могли использовать преобразования RDD для сопоставления сложного типа с более простым. Конечно, это небольшая дополнительная работа, но я думаю, что это сильно поможет производительности при работе с плоской схемой.

Шаг 2: Получите ваш набор данных от RDD

SparkSession spark = SparkSession.builder().getOrCreate();
JavaSparkContext jsc = new JavaSparkContext();

List<Fruit> fruitList = ImmutableList.of(
    new Fruit("apple", "red"),
    new Fruit("orange", "orange"),
    new Fruit("grape", "purple"));
JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList);


RDD<Fruit> fruitRDD = fruitJavaRDD.rdd();
Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class);
Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);

И вуаля! Вспенить, промыть, повторить.

Джимми Да
источник
Я бы предложил указать, что для простых структур вам лучше хранить их в собственных типах Spark, а не сериализовать их в BLOB-объекты. Они лучше работают через шлюз Python, более прозрачны в Parquet и даже могут быть отлиты к конструкциям одинаковой формы.
metasim
1

Для тех, кто может в моей ситуации, я тоже выложу свой ответ.

Чтобы быть конкретным,

  1. Я читал «Установить типизированные данные» из SQLContext. Таким образом, оригинальный формат данных - DataFrame.

    val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()

    +---+---+ | a| b| +---+---+ | 1|[1]| +---+---+

  2. Затем преобразуйте его в RDD, используя rdd.map () с типом mutable.WrappedArray.

    sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)

    Результат:

    (1,Set(1))

Taeheon Kwon
источник
0

В дополнение к уже высказанным предложениям, недавно я обнаружил еще один вариант - вы можете объявить свой пользовательский класс, включая черту org.apache.spark.sql.catalyst.DefinedByConstructorParams.

Это работает, если у класса есть конструктор, который использует типы, которые может понять ExpressionEncoder, то есть примитивные значения и стандартные коллекции. Это может пригодиться, когда вы не можете объявить класс в качестве класса case, но не хотите использовать Kryo для его кодирования каждый раз, когда он включается в набор данных.

Например, я хотел объявить класс case, включающий вектор Breeze. Единственный кодировщик, который мог бы обрабатывать это, как правило, Kryo. Но если бы я объявил подкласс, который расширил Breeze DenseVector и DefinedByConstructorParams, ExpressionEncoder понял, что его можно сериализовать как массив Double.

Вот как я это объявил:

class SerializableDenseVector(values: Array[Double]) extends breeze.linalg.DenseVector[Double](values) with DefinedByConstructorParams
implicit def BreezeVectorToSerializable(bv: breeze.linalg.DenseVector[Double]): SerializableDenseVector = bv.asInstanceOf[SerializableDenseVector]

Теперь я могу использовать SerializableDenseVectorв наборе данных (напрямую или как часть продукта), используя простой ExpressionEncoder и без Kryo. Он работает так же, как Breeze DenseVector, но сериализуется как массив [Double].

Matt
источник