Большинство операций RDD ленивы. Думайте о СДР как о описании ряда операций. СДР не является данными. Итак, эта строка:
val textFile = sc.textFile("/user/emp.txt")
Это ничего не делает. Он создает RDD с надписью «нам нужно загрузить этот файл». Файл не загружен в этот момент.
Операции RDD, которые требуют наблюдения содержимого данных, не могут быть ленивыми. (Это называется действиями .) Например, RDD.count
чтобы сообщить вам количество строк в файле, файл необходимо прочитать. Поэтому, если вы напишите textFile.count
, в этот момент файл будет прочитан, строки будут подсчитаны, и счет будет возвращен.
Что если вы позвоните textFile.count
снова? То же самое: файл будет прочитан и снова подсчитан. Ничего не хранится. СДР не является данными.
Так что же RDD.cache
делать? Если вы добавите textFile.cache
к приведенному выше коду:
val textFile = sc.textFile("/user/emp.txt")
textFile.cache
Это ничего не делает. RDD.cache
тоже ленивая операция. Файл до сих пор не прочитан. Но теперь СДР говорит «прочитай этот файл и затем кешируй содержимое». Если вы запустите textFile.count
первый раз, файл будет загружен, кэширован и подсчитан. Если вы позвоните textFile.count
во второй раз, операция будет использовать кэш. Он просто возьмет данные из кеша и посчитает строки.
Поведение кэша зависит от доступной памяти. Если файл не помещается в памяти, например, тогда textFile.count
откатится к обычному поведению и перечитает файл.
perisist
и выбор опции хранения, которая позволяет выливать данные кэша на диск.Я думаю, что вопрос будет лучше сформулирован так:
Когда нам нужно вызвать кэш или сохранить на RDD?
Спарк процессы ленивы, то есть ничего не произойдет, пока не потребуется. Чтобы быстро ответить на вопрос, после
val textFile = sc.textFile("/user/emp.txt")
выдачи данных ничего не происходит, создается только aHadoopRDD
, используя файл в качестве источника.Допустим, мы немного изменим эти данные:
Опять же, ничего не происходит с данными. Теперь есть новый RDD,
wordsRDD
который содержит ссылкуtestFile
и функцию, которая будет применяться при необходимости.Только когда действие вызывается над RDD, например
wordsRDD.count
цепочкой RDD, называемой lineage, будет выполнено. То есть данные, разбитые по разделам, будут загружены исполнителями кластера Spark,flatMap
будет применена функция и будет вычислен результат.На линейной линии, как та, что в этом примере,
cache()
не требуется. Данные будут загружены исполнителям, все преобразования будут применены и, наконец,count
будут вычислены все данные в памяти - если данные помещаются в память.cache
полезно, когда линия RDD разветвляется. Допустим, вы хотите отфильтровать слова из предыдущего примера в счетчик положительных и отрицательных слов. Вы можете сделать это так:Здесь каждая ветвь выдает перезагрузку данных. Добавление явного
cache
оператора гарантирует, что обработка, выполненная ранее, будет сохранена и повторно использована. Работа будет выглядеть так:По этой причине
cache
говорят, что он «нарушает родословную», поскольку создает контрольную точку, которую можно повторно использовать для дальнейшей обработки.Основное правило: Используйте,
cache
когда линия вашего RDD разветвляется или когда RDD используется несколько раз, как в цикле.источник
spark.storage.memoryFraction
. Что касается того, какой исполнитель имеет какие данные, СДР будет отслеживать свои разделы, которые распределены по исполнителям.cache
неpersist
могу нарушить происхождение .Нужно ли явно вызывать «cache» или «persist» для сохранения данных RDD в памяти?
Да, только если это необходимо.
Данные СДР хранятся распределенным способом в памяти по умолчанию?
Нет!
И вот причины, почему:
Spark поддерживает два типа общих переменных: широковещательные переменные, которые можно использовать для кэширования значения в памяти на всех узлах, и аккумуляторы, которые являются переменными, которые только «добавляются», такие как счетчики и суммы.
СДР поддерживают два типа операций: преобразования, которые создают новый набор данных из существующего, и действия, которые возвращают значение программе драйвера после выполнения вычисления в наборе данных. Например, map - это преобразование, которое передает каждый элемент набора данных через функцию и возвращает новый RDD, представляющий результаты. С другой стороны, Reduce - это действие, которое объединяет все элементы RDD с использованием некоторой функции и возвращает конечный результат в программу драйвера (хотя есть также параллельный reduByKey, который возвращает распределенный набор данных).
Все преобразования в Spark ленивы, потому что они не вычисляют свои результаты сразу. Вместо этого они просто запоминают преобразования, примененные к некоторому базовому набору данных (например, к файлу). Преобразования вычисляются только тогда, когда действие требует возврата результата в программу драйвера. Этот дизайн позволяет Spark работать более эффективно - например, мы можем понять, что набор данных, созданный с помощью карты, будет использоваться для сокращения и возвращать только результат сокращения драйверу, а не больший набор отображаемых данных.
По умолчанию каждый преобразованный RDD может пересчитываться каждый раз, когда вы выполняете над ним действие. Однако вы также можете сохранить RDD в памяти, используя метод persist (или кэш), и в этом случае Spark будет хранить элементы в кластере для гораздо более быстрого доступа при следующем запросе. Существует также поддержка сохранения RDD на диске или репликации на нескольких узлах.
Для более подробной информации, пожалуйста, обратитесь к руководству по программированию Spark .
источник
Ниже приведены три ситуации, которые вы должны кэшировать ваши RDD:
источник
Добавление еще одной причины для добавления (или временного добавления)
cache
вызова метода.для отладки проблем с памятью
При использовании
cache
метода spark выдаст отладочную информацию о размере RDD. так что в интегрированном интерфейсе spark вы получите информацию об использовании памяти RDD. и это оказалось очень полезным для диагностики проблем с памятью.источник