Я исследую поведение Спарка, когда присоединяю стол к себе. Я использую Databricks.
Мой глупый сценарий:
Прочитать внешнюю таблицу как фрейм данных A (лежащие в основе файлы в дельта-формате)
Определите фрейм данных B как фрейм данных A с выбранными только определенными столбцами
Соедините кадры данных A и B в столбце 1 и столбце 2
(Да, это не имеет особого смысла, я просто экспериментирую, чтобы понять основную механику Spark)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
b = a.select("column1", "column2", "columnA")
c= a.join(b, how="left", on = ["column1", "column2"])
Моей первой попыткой было запустить код как есть (попытка 1). Затем я попытался перераспределить и кэшировать (попытка 2)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()
Наконец, я перераспределил, отсортировал и кэшировал
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()
Соответствующие сгенерированные рисунки прилагаются.
Мои вопросы:
Почему при попытке 1 таблица выглядит кэшированной, хотя она не была явно указана.
Почему за InMemoreTableScan всегда следует другой узел этого типа.
Почему при попытке 3 кэширование происходит в два этапа?
Почему при попытке 3 WholeStageCodegen следует одному (и только одному) InMemoreTableScan.
Ответы:
То, что вы наблюдаете в этих трех планах, представляет собой смесь среды выполнения DataBricks и Spark.
Прежде всего, при запуске DataBricks 3.3+ кэширование автоматически включается для всех файлов паркета. Соответствующий конфиг для этого:
spark.databricks.io.cache.enabled true
Для вашего второго запроса InMemoryTableScan выполняется дважды, потому что именно тогда, когда вызывалось соединение, spark пытался вычислить набор данных A и набор данных B параллельно. Предполагая, что разные исполнители получили назначенные выше задачи, оба должны будут сканировать таблицу из (DataBricks) кеша.
Для третьего InMemoryTableScan не относится к самому кешированию. Это просто означает, что независимо от того, какой катализатор плана был сформирован, он несколько раз сканировал кэшированную таблицу.
PS: я не могу представить себе пункт 4 :)
источник