У меня есть датафрейм со следующим кодом:
def test(lat: Double, lon: Double) = {
println(s"testing ${lat / lon}")
Map("one" -> "one", "two" -> "two")
}
val testUDF = udf(test _)
df.withColumn("test", testUDF(col("lat"), col("lon")))
.withColumn("test1", col("test.one"))
.withColumn("test2", col("test.two"))
Теперь, проверяя журналы, я обнаружил, что для каждой строки UDF выполняется 3 раза. Если я добавлю «test3» из столбца «test.three», UDF будет выполнен еще раз.
Может кто-нибудь объяснить мне, почему?
Можно ли этого избежать должным образом (без кэширования данных после добавления «теста», даже если это работает)?
scala
apache-spark
apache-spark-sql
Rolintocour
источник
источник
Map
не Struct. Теперь вместо возврата Map, если UDF возвращает класс case, такой как Test (одна String, две: String), тогдаtest
это действительно Struct, но всегда есть столько исполнений UDF.Ответы:
Если вы хотите избежать нескольких обращений к udf (что полезно, особенно если udf является узким местом в вашей работе), вы можете сделать это следующим образом:
По сути, вы говорите Spark, что ваша функция не является детерминированной, и теперь Spark гарантирует, что она вызывается только один раз, потому что небезопасно вызывать ее несколько раз (каждый вызов может возвращать разные результаты).
Также имейте в виду, что этот трюк не бесплатен, выполняя это, вы накладываете некоторые ограничения на оптимизатор, одним из побочных эффектов которого является, например, то, что оптимизатор Spark не проталкивает фильтры через выражения, которые не являются детерминированными, поэтому вы становитесь ответственными за оптимальные положение фильтров в вашем запросе.
источник
asNondeterministic
силы UDF выполняются только один раз. Сexplode(array(myUdf($"id")))
решением, это все еще выполняется дважды.