Обновить
Ответ на этот вопрос остается в силе и информативный, хотя вещи теперь лучше , так как 2.2 / 2.3, который добавляет встроенную поддержку энкодера для Set
, Seq
, Map
, Date
, Timestamp
, и BigDecimal
. Если вы придерживаетесь создания типов только с классами case и обычными типами Scala, у вас должно получиться только неявное в SQLImplicits
.
К сожалению, практически ничего не было добавлено, чтобы помочь с этим. Поиск @since 2.0.0
в Encoders.scala
или SQLImplicits.scala
находит вещи, в основном связанные с примитивными типами (и некоторую настройку case-классов). Итак, первое, что нужно сказать: в настоящее время нет действительно хорошей поддержки пользовательских кодировщиков классов . После всего этого последуют некоторые уловки, которые делают такую работу, на которую мы можем надеяться, учитывая то, что мы имеем в нашем распоряжении. Как предварительный отказ от ответственности: это не будет работать идеально, и я сделаю все возможное, чтобы все ограничения были ясными и предварительными.
В чем именно проблема
Если вы хотите создать набор данных, Spark "требуется кодировщик (для преобразования объекта JVM типа T во внутреннее представление Spark SQL и из него), который обычно создается автоматически с помощью имплицитов из SparkSession
или может быть создан явно путем вызова статических методов. на Encoders
"(взято из документов наcreateDataset
). Кодировщик примет форму, в Encoder[T]
которой T
указан тип, который вы кодируете. Первое предложение заключается в добавлении import spark.implicits._
(которое дает вам эти неявные кодировщики), а второе предложение заключается в явной передаче неявного кодировщика с использованием этого набора функций, связанных с кодировщиком.
Для обычных классов кодер недоступен, поэтому
import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
даст вам следующую неявную связанную ошибку времени компиляции:
Невозможно найти кодировщик для типа, хранящегося в наборе данных. Примитивные типы (Int, String и т. Д.) И типы продуктов (классы дел) поддерживаются путем импорта sqlContext.implicits._ Поддержка сериализации других типов будет добавлена в будущих выпусках.
Однако, если вы переносите любой тип, который вы только что использовали, чтобы получить вышеуказанную ошибку в каком-то расширяемом классе Product
, ошибка запутанно откладывается до времени выполнения, поэтому
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
Компилируется просто отлично, но не работает во время выполнения с
java.lang.UnsupportedOperationException: не найден кодировщик для MyObj
Причина этого заключается в том, что кодировщики, которые Spark создает с импликациями, фактически создаются только во время выполнения (с помощью scala relfection). В этом случае все проверки Spark во время компиляции Product
заключаются в том, что внешний класс расширяется (что делают все классы case) и понимает только во время выполнения, что он все еще не знает, что делать MyObj
(та же проблема возникает, если я попытался сделать a Dataset[(Int,MyObj)]
- Spark ждет, пока не закончится время выполнения MyObj
) Это основные проблемы, которые крайне необходимо исправить:
- некоторые классы, которые расширяют
Product
компиляцию, несмотря на то, что всегда терпят крах во время выполнения и
- нет способа передать пользовательские кодировщики для вложенных типов (у меня нет способа передать Spark кодировщик только для того
MyObj
, чтобы он потом знал, как кодировать Wrap[MyObj]
или (Int,MyObj)
).
Просто используйте kryo
Решение, которое все предлагают, это использовать kryo
кодировщик.
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
Это становится довольно утомительным быстро, хотя. Особенно, если ваш код манипулирует всеми видами наборов данных, объединяет, группирует и т. Д. В конечном итоге вы получаете кучу дополнительных последствий. Итак, почему бы просто не сделать неявное, которое делает все это автоматически?
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
org.apache.spark.sql.Encoders.kryo[A](ct)
И теперь, похоже, я могу сделать почти все, что захочу (приведенный ниже пример не будет работать там, spark-shell
где spark.implicits._
автоматически импортируется)
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i, d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
Или почти. Проблема в том, что использование kryo
приводит к тому, что Spark просто сохраняет каждую строку в наборе данных как плоский двоичный объект. Для map
, filter
, foreach
что достаточно, но для таких операций , как join
, Спарк действительно нуждается в них , чтобы быть разделены на столбцы. Осматривая схему для d2
или d3
, вы видите, что есть только один двоичный столбец:
d2.printSchema
// root
// |-- value: binary (nullable = true)
Частичное решение для кортежей
Таким образом, используя магию имплицитов в Scala (подробнее в 6.26.3 Разрешение перегрузки ), я могу создать серию имплицитов, которые будут работать как можно лучше, по крайней мере для кортежей, и будут хорошо работать с существующими имплицитами:
import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._ // we can still take advantage of all the old implicits
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
implicit def tuple2[A1, A2](
implicit e1: Encoder[A1],
e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
implicit def tuple3[A1, A2, A3](
implicit e1: Encoder[A1],
e2: Encoder[A2],
e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
// ... you can keep making these
Затем, вооружившись этими последствиями, я могу заставить свой пример работать, хотя и с некоторым переименованием столбцов.
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
Я еще не понял, как получить ожидаемые имена кортежей ( _1
, _2
, ...) по умолчанию без переименования их - если кто - то хочет поиграть с этим, это то , где имя "value"
получает введено и это то , где кортеж имена обычно добавляются. Тем не менее, ключевым моментом является то, что у меня теперь есть хорошая структурированная схема:
d4.printSchema
// root
// |-- _1: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
// |-- _2: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
Итак, в заключение, это обходной путь:
- позволяет нам получать отдельные столбцы для кортежей (так что мы можем снова присоединиться к кортежам, ура!)
- мы снова можем просто положиться на последствия (так что нет необходимости проходить
kryo
повсюду)
- почти полностью обратно совместим с
import spark.implicits._
(с некоторым переименованием)
- никак не соединит на
kyro
сериализовать двоичные столбцы, не говоря уже о тех , полях могут иметь
- имеет неприятный побочный эффект от переименования некоторых столбцов кортежа в «значение» (при необходимости это можно отменить, конвертировав
.toDF
, указав новые имена столбцов и вернув обратно в набор данных - и имена схем, похоже, сохраняются с помощью объединений где они больше всего нужны).
Частичное решение для классов в целом
Этот менее приятный и не имеет хорошего решения. Однако теперь, когда у нас есть решение для кортежей, описанное выше, я догадываюсь, что решение о неявном преобразовании из другого ответа тоже будет немного менее болезненным, поскольку вы можете преобразовывать свои более сложные классы в кортежи. Затем, после создания набора данных, вы, вероятно, переименуете столбцы, используя подход с фреймом данных. Если все пойдет хорошо, это действительно улучшение, так как теперь я могу выполнять соединения на полях моих классов. Если бы я только использовал один плоский двоичный kryo
сериализатор, это было бы невозможно.
Вот пример , который делает немного все: у меня есть класс , MyObj
который имеет поле типов Int
, java.util.UUID
и Set[String]
. Первый заботится о себе. Второе, хотя я мог бы использовать сериализацию с использованием, kryo
было бы более полезно, если бы оно хранилось как String
(так UUID
как обычно я хочу присоединиться к s). Третий действительно принадлежит двоичному столбцу.
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])
// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
Теперь я могу создать набор данных с хорошей схемой, используя этот механизм:
val d = spark.createDataset(Seq[MyObjEncoded](
new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]
И схема показывает мне столбцы с правильными именами и первыми двумя вещами, с которыми я могу объединиться.
d.printSchema
// root
// |-- i: integer (nullable = false)
// |-- u: string (nullable = true)
// |-- s: binary (nullable = true)
ExpressionEncoder
с использованием сериализации JSON? В моем случае я не могу сойти с рук с кортежами, и Крио дает мне двоичный столбец ..Использование универсальных кодеров.
На данный момент доступно два универсальных кодировщика,
kryo
иjavaSerialization
последний из них явно описан как:Предполагая следующий класс
Вы можете использовать эти кодировщики, добавив неявный кодировщик:
которые можно использовать вместе следующим образом:
Он хранит объекты в виде
binary
столбца, поэтому при преобразованииDataFrame
вы получаете следующую схему:Также возможно кодировать кортежи, используя
kryo
кодировщик для определенного поля:Обратите внимание, что здесь мы не зависим от неявных кодировщиков, но передаем кодировщик явно, так что это скорее всего не будет работать с
toDS
методом.Использование неявных преобразований:
Обеспечьте неявные преобразования между представлением, которое может быть закодировано, и пользовательским классом, например:
Смежные вопросы:
источник
Set
), которые я получаюException in thread "main" java.lang.UnsupportedOperationException: No Encoder found for Set[Bar]
.kryo[Set[Bar]]
точно так же, если в классе есть полеBar
, необходим кодировщик для всего объекта. Это очень грубые методы.Bar
вам нужен кодировщик для всего объекта». мой вопрос был, как закодировать этот "весь проект"?Вы можете использовать UDTRegistration, а затем Case Classes, Tuple и т. Д. - все работает корректно с вашим Определяемым пользователем типом!
Скажем, вы хотите использовать пользовательский Enum:
Зарегистрируйте это так:
Тогда используйте его!
Допустим, вы хотите использовать полиморфную запись:
... и использовать это так:
Вы можете написать пользовательский UDT, который кодирует все в байты (здесь я использую сериализацию Java, но, вероятно, лучше использовать инструмент Spark в контексте Kryo).
Сначала определите класс UDT:
Тогда зарегистрируйте это:
Тогда вы можете использовать это!
источник
Кодеры работают более или менее одинаково
Spark2.0
. ИKryo
до сих пор это рекомендуемыйserialization
выбор.Вы можете посмотреть на следующий пример с spark-shell
До сих пор] не было
appropriate encoders
в настоящем объеме, поэтому наши люди не были закодированы какbinary
ценности. Но это изменится, как только мы предоставим некоторыеimplicit
кодеры, использующиеKryo
сериализацию.источник
В случае класса Java Bean это может быть полезно
Теперь вы можете просто прочитать dataFrame как пользовательский DataFrame
Это создаст пользовательский кодер класса, а не двоичный.
источник
Мои примеры будут на Java, но я не думаю, что это будет сложно адаптироваться к Scala.
Я довольно успешно преобразования
RDD<Fruit>
вDataset<Fruit>
использовании spark.createDataset и Encoders.bean до тех пор , какFruit
это простой Java Bean .Шаг 1: Создайте простой Java Bean.
Я бы придерживался классов с примитивными типами и String как полей, прежде чем люди из DataBricks усилили свои кодировщики. Если у вас есть класс с вложенным объектом, создайте еще один простой Java Bean со всеми его полями, чтобы вы могли использовать преобразования RDD для сопоставления сложного типа с более простым. Конечно, это небольшая дополнительная работа, но я думаю, что это сильно поможет производительности при работе с плоской схемой.
Шаг 2: Получите ваш набор данных от RDD
И вуаля! Вспенить, промыть, повторить.
источник
Для тех, кто может в моей ситуации, я тоже выложу свой ответ.
Чтобы быть конкретным,
Я читал «Установить типизированные данные» из SQLContext. Таким образом, оригинальный формат данных - DataFrame.
val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()
+---+---+ | a| b| +---+---+ | 1|[1]| +---+---+
Затем преобразуйте его в RDD, используя rdd.map () с типом mutable.WrappedArray.
sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)
Результат:
(1,Set(1))
источник
В дополнение к уже высказанным предложениям, недавно я обнаружил еще один вариант - вы можете объявить свой пользовательский класс, включая черту
org.apache.spark.sql.catalyst.DefinedByConstructorParams
.Это работает, если у класса есть конструктор, который использует типы, которые может понять ExpressionEncoder, то есть примитивные значения и стандартные коллекции. Это может пригодиться, когда вы не можете объявить класс в качестве класса case, но не хотите использовать Kryo для его кодирования каждый раз, когда он включается в набор данных.
Например, я хотел объявить класс case, включающий вектор Breeze. Единственный кодировщик, который мог бы обрабатывать это, как правило, Kryo. Но если бы я объявил подкласс, который расширил Breeze DenseVector и DefinedByConstructorParams, ExpressionEncoder понял, что его можно сериализовать как массив Double.
Вот как я это объявил:
Теперь я могу использовать
SerializableDenseVector
в наборе данных (напрямую или как часть продукта), используя простой ExpressionEncoder и без Kryo. Он работает так же, как Breeze DenseVector, но сериализуется как массив [Double].источник