Apache Spark: влияние перераспределения, сортировки и кэширования на соединение

10

Я исследую поведение Спарка, когда присоединяю стол к себе. Я использую Databricks.

Мой глупый сценарий:

  1. Прочитать внешнюю таблицу как фрейм данных A (лежащие в основе файлы в дельта-формате)

  2. Определите фрейм данных B как фрейм данных A с выбранными только определенными столбцами

  3. Соедините кадры данных A и B в столбце 1 и столбце 2

(Да, это не имеет особого смысла, я просто экспериментирую, чтобы понять основную механику Spark)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))

b = a.select("column1", "column2", "columnA")

c= a.join(b, how="left", on = ["column1", "column2"])

Моей первой попыткой было запустить код как есть (попытка 1). Затем я попытался перераспределить и кэшировать (попытка 2)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()

Наконец, я перераспределил, отсортировал и кэшировал

 a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()

Соответствующие сгенерированные рисунки прилагаются.

Мои вопросы:

  1. Почему при попытке 1 таблица выглядит кэшированной, хотя она не была явно указана.

  2. Почему за InMemoreTableScan всегда следует другой узел этого типа.

  3. Почему при попытке 3 кэширование происходит в два этапа?

  4. Почему при попытке 3 WholeStageCodegen следует одному (и только одному) InMemoreTableScan.

попытка 1

попытка 2

введите описание изображения здесь

Dawid
источник
Я подозреваю, что DataFrame Reader автоматически кэширует данные, когда источником является внешняя таблица. У меня есть похожая ситуация, когда я читаю данные из таблицы базы данных, в то время как в состоянии загружается, вкладка «SQL» в «Детализированном интерфейсе приложения» показывает мне количество загружаемых строк, но ни один файл еще не был сохранен в указанном месте , Я предполагаю, что он знает счетчик, потому что он где-то кэширует данные, и это то, что появляется на DAG. Если вы читаете данные из текстового файла локально, вы не увидите состояние кэша.
Салим

Ответы:

4

То, что вы наблюдаете в этих трех планах, представляет собой смесь среды выполнения DataBricks и Spark.

Прежде всего, при запуске DataBricks 3.3+ кэширование автоматически включается для всех файлов паркета. Соответствующий конфиг для этого: spark.databricks.io.cache.enabled true

Для вашего второго запроса InMemoryTableScan выполняется дважды, потому что именно тогда, когда вызывалось соединение, spark пытался вычислить набор данных A и набор данных B параллельно. Предполагая, что разные исполнители получили назначенные выше задачи, оба должны будут сканировать таблицу из (DataBricks) кеша.

Для третьего InMemoryTableScan не относится к самому кешированию. Это просто означает, что независимо от того, какой катализатор плана был сформирован, он несколько раз сканировал кэшированную таблицу.

PS: я не могу представить себе пункт 4 :)

Ашвжит Сингх
источник