(Почему) нам нужно вызвать кэш или сохранить на RDD

171

Когда эластичный распределенный набор данных (RDD) создается из текстового файла или коллекции (или из другого RDD), нужно ли явно вызывать «cache» или «persist» для сохранения данных RDD в памяти? Или данные СДР по умолчанию хранятся в памяти распределенным способом?

val textFile = sc.textFile("/user/emp.txt")

Насколько я понимаю, после вышеуказанного шага textFile является СДР и доступен во всей / части памяти узла.

Если так, то зачем нам тогда вызывать «кеш» или «сохранять» в textFile RDD?

Рамана
источник

Ответы:

300

Большинство операций RDD ленивы. Думайте о СДР как о описании ряда операций. СДР не является данными. Итак, эта строка:

val textFile = sc.textFile("/user/emp.txt")

Это ничего не делает. Он создает RDD с надписью «нам нужно загрузить этот файл». Файл не загружен в этот момент.

Операции RDD, которые требуют наблюдения содержимого данных, не могут быть ленивыми. (Это называется действиями .) Например, RDD.countчтобы сообщить вам количество строк в файле, файл необходимо прочитать. Поэтому, если вы напишите textFile.count, в этот момент файл будет прочитан, строки будут подсчитаны, и счет будет возвращен.

Что если вы позвоните textFile.countснова? То же самое: файл будет прочитан и снова подсчитан. Ничего не хранится. СДР не является данными.

Так что же RDD.cacheделать? Если вы добавите textFile.cacheк приведенному выше коду:

val textFile = sc.textFile("/user/emp.txt")
textFile.cache

Это ничего не делает. RDD.cacheтоже ленивая операция. Файл до сих пор не прочитан. Но теперь СДР говорит «прочитай этот файл и затем кешируй содержимое». Если вы запустите textFile.countпервый раз, файл будет загружен, кэширован и подсчитан. Если вы позвоните textFile.countво второй раз, операция будет использовать кэш. Он просто возьмет данные из кеша и посчитает строки.

Поведение кэша зависит от доступной памяти. Если файл не помещается в памяти, например, тогда textFile.countоткатится к обычному поведению и перечитает файл.

Даниэль Дарабос
источник
4
Привет, Даниил, - когда вы вызываете кеш, означает ли это, что СДР не перезагружается из источника (например, текстового файла) - как вы можете быть уверены, что данные из текстового файла самые последние, когда они кэшируются? (выясняет ли это спарк, или это ручная операция для периодического unpersist (), чтобы гарантировать, что исходные данные будут пересчитаны позже в родословной?)
andrew.butkus
также - если вы должны периодически отменять действие, - если у вас есть кэшированный rdd, зависящий от другого кэшируемого RDD, следует ли отключать оба RDD, чтобы увидеть пересчитанные результаты?
andrew.butkus
21
Spark предполагает, что файл никогда не изменится. Он читает файл в произвольный момент времени и может перечитать его части по мере необходимости позже. (Например, если часть данных была извлечена из кэша.) Так что вам лучше сохранить ваши файлы неизменными! Просто создайте новый файл с новым именем, когда у вас появятся новые данные, а затем загрузите его как новый RDD. Если вы постоянно получаете новые данные, изучите Spark Streaming.
Даниэль Дарабос
10
Да. СДР являются неизменяемыми, поэтому каждый СДР предполагает, что его зависимости также неизменны. Spark Streaming позволяет настроить такие деревья, которые работают с потоком изменений. Но еще более простым решением является построение дерева в функции, которая принимает имя файла в качестве параметра. Затем просто вызовите функцию для нового файла и poof, вы получите новое дерево вычислений.
Даниэль Дарабос
1
@Humoyun: На вкладке «Хранилище» в Spark UI вы можете видеть, сколько кэшированных файлов хранится в каждом из них. Данные могут быть настолько большими, что только 40% их умещается в общей памяти, которую вы имеете для кэширования. Одним из вариантов в этом случае является использование perisistи выбор опции хранения, которая позволяет выливать данные кэша на диск.
Даниэль Дарабос
197

Я думаю, что вопрос будет лучше сформулирован так:

Когда нам нужно вызвать кэш или сохранить на RDD?

Спарк процессы ленивы, то есть ничего не произойдет, пока не потребуется. Чтобы быстро ответить на вопрос, после val textFile = sc.textFile("/user/emp.txt")выдачи данных ничего не происходит, создается только a HadoopRDD, используя файл в качестве источника.

Допустим, мы немного изменим эти данные:

val wordsRDD = textFile.flatMap(line => line.split("\\W"))

Опять же, ничего не происходит с данными. Теперь есть новый RDD, wordsRDDкоторый содержит ссылку testFileи функцию, которая будет применяться при необходимости.

Только когда действие вызывается над RDD, например wordsRDD.countцепочкой RDD, называемой lineage, будет выполнено. То есть данные, разбитые по разделам, будут загружены исполнителями кластера Spark, flatMapбудет применена функция и будет вычислен результат.

На линейной линии, как та, что в этом примере, cache()не требуется. Данные будут загружены исполнителям, все преобразования будут применены и, наконец, countбудут вычислены все данные в памяти - если данные помещаются в память.

cacheполезно, когда линия RDD разветвляется. Допустим, вы хотите отфильтровать слова из предыдущего примера в счетчик положительных и отрицательных слов. Вы можете сделать это так:

val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Здесь каждая ветвь выдает перезагрузку данных. Добавление явного cacheоператора гарантирует, что обработка, выполненная ранее, будет сохранена и повторно использована. Работа будет выглядеть так:

val textFile = sc.textFile("/user/emp.txt")
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
wordsRDD.cache()
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

По этой причине cacheговорят, что он «нарушает родословную», поскольку создает контрольную точку, которую можно повторно использовать для дальнейшей обработки.

Основное правило: Используйте, cacheкогда линия вашего RDD разветвляется или когда RDD используется несколько раз, как в цикле.

maasg
источник
1
Потрясающие. Спасибо. Еще один связанный вопрос. Когда мы кешируем или сохраняем, данные будут храниться в памяти исполнителя или в памяти рабочего узла. Если это память исполнителя, как Spark определяет, у какого исполнителя есть данные.
Рамана
1
@RamanaUppala используется память исполнителя. Часть памяти исполнителя, используемая для кэширования, контролируется конфигурацией spark.storage.memoryFraction. Что касается того, какой исполнитель имеет какие данные, СДР будет отслеживать свои разделы, которые распределены по исполнителям.
Maasg
5
@maasg Поправь меня, если я ошибаюсь, но не могу и cacheне persistмогу нарушить происхождение .
ноль 323
Где будут храниться wordsRDD, если бы в приведенном выше примере не было оператора .cache ()?
sun_dare
Что если до того, как эти два счета будут подсчитаны, мы объединяем две ветви обратно в один rdd и считаем? полезен ли кеш в этом случае?
Xiawei Zhang
30

Нужно ли явно вызывать «cache» или «persist» для сохранения данных RDD в памяти?

Да, только если это необходимо.

Данные СДР хранятся распределенным способом в памяти по умолчанию?

Нет!

И вот причины, почему:

  • Spark поддерживает два типа общих переменных: широковещательные переменные, которые можно использовать для кэширования значения в памяти на всех узлах, и аккумуляторы, которые являются переменными, которые только «добавляются», такие как счетчики и суммы.

  • СДР поддерживают два типа операций: преобразования, которые создают новый набор данных из существующего, и действия, которые возвращают значение программе драйвера после выполнения вычисления в наборе данных. Например, map - это преобразование, которое передает каждый элемент набора данных через функцию и возвращает новый RDD, представляющий результаты. С другой стороны, Reduce - это действие, которое объединяет все элементы RDD с использованием некоторой функции и возвращает конечный результат в программу драйвера (хотя есть также параллельный reduByKey, который возвращает распределенный набор данных).

  • Все преобразования в Spark ленивы, потому что они не вычисляют свои результаты сразу. Вместо этого они просто запоминают преобразования, примененные к некоторому базовому набору данных (например, к файлу). Преобразования вычисляются только тогда, когда действие требует возврата результата в программу драйвера. Этот дизайн позволяет Spark работать более эффективно - например, мы можем понять, что набор данных, созданный с помощью карты, будет использоваться для сокращения и возвращать только результат сокращения драйверу, а не больший набор отображаемых данных.

  • По умолчанию каждый преобразованный RDD может пересчитываться каждый раз, когда вы выполняете над ним действие. Однако вы также можете сохранить RDD в памяти, используя метод persist (или кэш), и в этом случае Spark будет хранить элементы в кластере для гораздо более быстрого доступа при следующем запросе. Существует также поддержка сохранения RDD на диске или репликации на нескольких узлах.

Для более подробной информации, пожалуйста, обратитесь к руководству по программированию Spark .

eliasah
источник
1
Это не ответило на мой вопрос.
Рамана
Что не отвечает на это?
Ильи
1
когда данные RDD хранятся в памяти по умолчанию, зачем нам вызывать Cache или Persist?
Рамана
По умолчанию RDD не хранятся в памяти, поэтому сохранение RDD позволяет Spark быстрее выполнять преобразование в кластере
eliasah
2
Это хороший ответ, я не знаю, почему за него проголосовали. Это нисходящий ответ, объясняющий, как RDD работают на основе концепций высокого уровня. Я добавил еще один ответ, идущий снизу вверх: начиная с «что делает эта строка». Может быть, легче следовать за кем-то, только начинающим со Spark.
Даниэль Дарабос
11

Ниже приведены три ситуации, которые вы должны кэшировать ваши RDD:

используя RDD много раз

выполнение нескольких действий на одном и том же СДР

для длинных цепочек (или очень дорогих) преобразований

rileyss
источник
7

Добавление еще одной причины для добавления (или временного добавления) cacheвызова метода.

для отладки проблем с памятью

При использовании cacheметода spark выдаст отладочную информацию о размере RDD. так что в интегрированном интерфейсе spark вы получите информацию об использовании памяти RDD. и это оказалось очень полезным для диагностики проблем с памятью.

Кованые изделия
источник