Spark: UDF исполняется много раз

9

У меня есть датафрейм со следующим кодом:

def test(lat: Double, lon: Double) = {
  println(s"testing ${lat / lon}")
  Map("one" -> "one", "two" -> "two")
}

val testUDF = udf(test _)

df.withColumn("test", testUDF(col("lat"), col("lon")))
  .withColumn("test1", col("test.one"))
  .withColumn("test2", col("test.two"))

Теперь, проверяя журналы, я обнаружил, что для каждой строки UDF выполняется 3 раза. Если я добавлю «test3» из столбца «test.three», UDF будет выполнен еще раз.

Может кто-нибудь объяснить мне, почему?

Можно ли этого избежать должным образом (без кэширования данных после добавления «теста», даже если это работает)?

Rolintocour
источник
Что вы имеете в виду? Вы вызываете тестовую функцию три раза. Вот почему это выполняется три раза. Не уверен, почему ты делаешь это UDF. Почему бы просто не сделать карту вали?
user4601931
Это всего лишь пример, демонстрирующий поведение искры. Для меня «тест» - это новый столбец, который содержит структуру, поэтому доступ к любой части структуры не должен снова выполнять UDF. Как я не прав?
Rolintocour
Я попытался напечатать схему, тип данных "test" - это Mapне Struct. Теперь вместо возврата Map, если UDF возвращает класс case, такой как Test (одна String, две: String), тогда testэто действительно Struct, но всегда есть столько исполнений UDF.
Rolintocour
связанные: stackoverflow.com/questions/40320563/…
Рафаэль Рот
кеширование должно работать согласно этому ответу: stackoverflow.com/a/40962714/1138523
Рафаэль Рот

Ответы:

5

Если вы хотите избежать нескольких обращений к udf (что полезно, особенно если udf является узким местом в вашей работе), вы можете сделать это следующим образом:

val testUDF = udf(test _).asNondeterministic()

По сути, вы говорите Spark, что ваша функция не является детерминированной, и теперь Spark гарантирует, что она вызывается только один раз, потому что небезопасно вызывать ее несколько раз (каждый вызов может возвращать разные результаты).

Также имейте в виду, что этот трюк не бесплатен, выполняя это, вы накладываете некоторые ограничения на оптимизатор, одним из побочных эффектов которого является, например, то, что оптимизатор Spark не проталкивает фильтры через выражения, которые не являются детерминированными, поэтому вы становитесь ответственными за оптимальные положение фильтров в вашем запросе.

Дэвид Врба
источник
отлично! этот ответ также принадлежит здесь: stackoverflow.com/questions/40320563/…
Рафаэль Рот
В моем случае asNondeterministicсилы UDF выполняются только один раз. С explode(array(myUdf($"id")))решением, это все еще выполняется дважды.
Rolintocour