Странное поведение при вызове функции вне замыкания:
- когда функция находится в объекте, все работает
- когда функция находится в классе get:
Задача не сериализуема: java.io.NotSerializableException: тестирование
Проблема в том, что мне нужен мой код в классе, а не объект. Есть идеи, почему это происходит? Сериализуется ли объект Scala (по умолчанию?)?
Это пример рабочего кода:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
Это нерабочий пример:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
scala
serialization
apache-spark
typesafe
Nimrod007
источник
источник
Ответы:
СДР расширяют интерфейс Serialisable , так что это не то, что приводит к сбою вашей задачи. Теперь это не значит, что вы можете сериализовать
RDD
с Spark и избежатьNotSerializableException
Spark - это механизм распределенных вычислений, и его основной абстракцией является устойчивый распределенный набор данных ( RDD ), который можно рассматривать как распределенную коллекцию. По сути, элементы RDD распределены по узлам кластера, но Spark абстрагирует это от пользователя, позволяя пользователю взаимодействовать с RDD (коллекцией), как если бы он был локальным.
Чтобы не попасть в слишком много деталей, но при запуске различных преобразований на РДУ (
map
,flatMap
,filter
и другие), код преобразования (закрытие) является:Конечно, вы можете запустить это локально (как в вашем примере), но все эти фазы (кроме доставки по сети) все же происходят. [Это позволяет выявлять любые ошибки еще до развертывания в производство]
Во втором случае происходит то, что вы вызываете метод, определенный в классе,
testing
внутри функции карты. Spark видит это, и поскольку методы не могут быть сериализованы сами по себе, Spark пытается сериализовать весьtesting
класс, так что код все равно будет работать при выполнении в другой JVM. У вас есть две возможности:Либо вы делаете тестирование классов сериализуемым, так что весь класс может быть сериализован Spark:
или вы создаете
someFunc
функцию вместо метода (функции - это объекты в Scala), так что Spark сможет ее сериализовать:Подобная, но не та же проблема с сериализацией классов может быть вам интересна, и вы можете прочитать об этом в этой презентации Spark Summit 2013 .
В качестве примечания, вы можете переписать
rddList.map(someFunc(_))
наrddList.map(someFunc)
, они точно так же. Обычно второй вариант предпочтительнее, так как он менее подробный и понятный для чтения.РЕДАКТИРОВАТЬ (2015-03-15): SPARK-5307 представил SerializationDebugger и Spark 1.3.0 является первой версией, которая его использует. Он добавляет путь сериализации в NotSerializableException . Когда встречается исключение NotSerializableException, отладчик посещает граф объектов, чтобы найти путь к объекту, который не может быть сериализован, и создает информацию, чтобы помочь пользователю найти объект.
В случае OP это то, что выводится на стандартный вывод:
источник
val test = new Test with Serializable
Великий ответ Греги объясняет, почему оригинальный код не работает, и два способа решения проблемы. Однако это решение не очень гибкое; рассмотрим случай, когда ваше закрытие включает вызов метода для некласса,
Serializable
который вы не можете контролировать. Вы не можете ни добавитьSerializable
тег к этому классу, ни изменить базовую реализацию, чтобы изменить метод в функцию.Nilesh предлагает для этого отличный обходной путь, но решение может быть сделано как более кратким, так и общим:
Эта функция-сериализатор может затем использоваться для автоматической упаковки замыканий и вызовов методов:
Эта методика также имеет то преимущество, что не требует дополнительных зависимостей Shark для доступа
KryoSerializationWrapper
, поскольку Chill в Twitter уже задействован ядром Spark.источник
Полный доклад, полностью объясняющий проблему, который предлагает отличный способ смены парадигмы, чтобы избежать этих проблем сериализации: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- leaks-no-ws.md
Ответ, получивший наибольшее количество голосов, в основном предлагает отказаться от целой языковой функции, которая больше не использует методы, а использует только функции. Действительно, в функциональном программировании следует избегать методов в классах, но превращение их в функции не решает проблему проектирования здесь (см. Ссылку выше).
В качестве быстрого решения в этой конкретной ситуации вы можете просто использовать
@transient
аннотацию, чтобы запретить сериализацию ошибочного значения (здесьSpark.ctx
это пользовательский класс, а не Spark, который следует из имен OP):Вы также можете реструктурировать код так, чтобы rddList жил где-то еще, но это также неприятно.
Будущее, вероятно, споры
В будущем Scala будет включать в себя такие вещи, как «споры», которые должны позволить нам точно контролировать зерно, что точно и не затягивается закрытием. Кроме того, это должно превратить все ошибки случайного извлечения несериализуемых типов (или любых нежелательных значений) в ошибки компиляции, а не сейчас, что является ужасными исключениями времени выполнения / утечками памяти.
http://docs.scala-lang.org/sips/pending/spores.html
Совет по сериализации Kryo
При использовании kyro сделайте так, чтобы регистрация была необходима, это будет означать, что вы получаете ошибки вместо утечек памяти:
«Наконец, я знаю, что у kryo есть kryo.setRegistrationOptional (true), но мне очень трудно пытаться выяснить, как его использовать. Когда эта опция включена, кажется, что kryo по-прежнему генерирует исключения, если я не зарегистрировался классы «.
Стратегия регистрации классов с крио
Конечно, это только дает вам контроль уровня типа, а не контроль уровня значения.
... больше идей
источник
Я решил эту проблему, используя другой подход. Вам просто нужно сериализовать объекты перед прохождением через замыкание, а затем десериализовать. Этот подход просто работает, даже если ваши классы не Serializable, потому что он использует Kryo за кулисами. Все, что вам нужно, это немного карри. ;)
Вот пример того, как я это сделал:
Не стесняйтесь делать Blah настолько сложным, насколько вам нужно, класс, объект-компаньон, вложенные классы, ссылки на несколько сторонних библиотек.
KryoSerializationWrapper ссылается на: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
источник
KryoSerializationWrapper
вы обнаружите, что это заставляет Spark думать, что это действительно такjava.io.Serializable
- он просто сериализует объект внутренне, используя Kryo - быстрее, проще. И я не думаю, что он имеет дело со статическим экземпляром - он просто десериализует значение при вызове value.apply ().Я столкнулся с аналогичной проблемой, и то , что я понял из ответа Грега в это
Ваш метод doIT пытается сериализовать метод someFunc (_) , но так как метод не сериализуем, он пытается сериализовать тестирование класса, которое снова не сериализуемо.
Поэтому, чтобы ваш код работал, вы должны определить someFunc внутри метода doIT . Например:
И если в картину входит несколько функций, то все эти функции должны быть доступны родительскому контексту.
источник
Я не совсем уверен, что это относится к Scala, но в Java я решил проблему
NotSerializableException
путем рефакторинга своего кода, чтобы замыкание не обращалось к несериализуемомуfinal
полю.источник
FileWriter
этоfinal
поле внешнего класса, вы не можете это сделать. НоFileWriter
может быть построен изString
или илиFile
, оба из которых являютсяSerializable
. Поэтому рефакторинг вашего кода для создания локальногоFileWriter
на основе имени файла из внешнего класса.К вашему сведению, в Spark 2.4 многие из вас, вероятно, столкнутся с этой проблемой. Сериализация Kryo стала лучше, но во многих случаях вы не можете использовать spark.kryo.unsafe = true или наивный сериализатор kryo.
Для быстрого исправления попробуйте изменить следующее в конфигурации Spark
ИЛИ
Я изменяю пользовательские преобразования RDD, с которыми я сталкиваюсь или пишу лично, используя явные переменные широковещания и используя новый встроенный API twitter-chill, преобразовывая их
rdd.map(row =>
вrdd.mapPartitions(partition => {
функции.пример
Старый (не-великий) путь
Альтернативный (лучший) способ
Этот новый способ будет вызывать широковещательную переменную только один раз на раздел, что лучше. Вам все еще нужно будет использовать сериализацию Java, если вы не регистрируете классы.
источник