Apache Spark: карта против mapPartitions?

133

В чем разница между RDD map и mapPartitionsметодом? И ведет flatMapсебя как mapили нравитсяmapPartitions ? Спасибо.

(править) то есть в чем разница (семантически или с точки зрения исполнения) между

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }

И:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }
Николас Уайт
источник
3
Прочитав ответ ниже, вы можете взглянуть на [этот опыт], которым поделился тот, кто его действительно использовал. ( Bzhangusc.wordpress.com/2014/06/19/… ) bzhangusc.wordpress.com/2014/06/19 /…
Abhidemon

Ответы:

121

В чем разница между картой RDD и методом mapPartitions?

Метод карта преобразует каждый элемент исходного RDD в единый элемент результата RDD путем применения функции. mapPartitions преобразует каждый раздел исходного RDD в несколько элементов результата (возможно, ни одного).

И flatMap ведет себя как map или как mapPartitions?

Также flatMap работает с одним элементом (as map) и производит несколько элементов результата (as mapPartitions).

Алексей романов
источник
3
Спасибо - так карта вызывает перемешивание (или как-то иначе меняет количество разделов)? Перемещает ли он данные между узлами? Я использовал mapPartitions, чтобы избежать перемещения данных между узлами, но не был уверен, что flapMap будет делать это.
Николас Уайт
Если вы посмотрите на исходный код - github.com/apache/incubator-spark/blob/... и github.com/apache/incubator-spark/blob/... - как mapи flatMapимеют точно такие же разделы, что и родитель.
Алексей Романов
13
В качестве примечания, в презентации, представленной докладчиком на Саммите Spark в Сан-Франциско в 2013 году (goo.gl/JZXDCR), подчеркивается, что задачи с высокими накладными расходами на запись лучше выполняются с mapPartition, чем с преобразованием карты. Согласно презентации, это связано с дороговизной создания новой задачи.
Mikel Urkia
1
Я вижу обратное - даже с очень небольшими операциями быстрее вызывать mapPartitions и выполнять итерацию, чем вызывать map. Я предполагаю, что это просто накладные расходы на запуск языкового движка, который будет обрабатывать задачу карты. (Я использую R, что может иметь больше накладных расходов на запуск.) Если вы выполняете несколько операций, то mapPartitions кажется немного быстрее - я предполагаю, что это потому, что он считывает RDD только один раз. Даже если RDD кэшируется в ОЗУ, это значительно сокращает накладные расходы на преобразование типов.
Боб,
3
mapв основном берет вашу функцию fи передает ее в iter.map(f). Так что в основном это удобный метод, который обертывает mapPartitions. Я был бы удивлен, если бы было преимущество в производительности в любом случае для задания преобразования чистого стиля карты (то есть, когда функция идентична), если вам нужно создать некоторые объекты для обработки, если эти объекты могут использоваться совместно, тогда mapPartitionsбыло бы выгодно.
NightWolf
129

Настоятельный НАКОНЕЧНИК :

Всякий раз, когда у вас есть тяжелая инициализация, которая должна выполняться один раз для многих RDDэлементов, а не один раз для каждого RDDэлемента, и если эта инициализация, такая как создание объектов из сторонней библиотеки, не может быть сериализована (чтобы Spark мог передать ее через кластер в рабочие узлы), используйте mapPartitions()вместо map(). mapPartitions()предусматривает, что инициализация выполняется один раз для каждой рабочей задачи / потока / раздела, а не один раз для каждого RDDэлемента данных, например: см. ниже.

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close() // close dbconnection here
  newPartition.iterator // create a new iterator
})

Q2. ведет flatMapсебя как карта или как mapPartitions?

Да. см. пример 2 flatmap.. это не требует пояснений.

Q1. В чем разница между RDD mapиmapPartitions

mapвыполняет функцию, используемую на уровне каждого элемента, в то время как mapPartitionsвыполняет функцию на уровне раздела.

Пример сценария : если у нас есть 100 КБ элементов в конкретномRDDразделе, тогда мы 100 КБ запускаем функцию, используемую преобразованием отображения, когда используемmap.

И наоборот, если мы используем, mapPartitionsто мы вызовем конкретную функцию только один раз, но мы передадим все 100K записей и вернем все ответы за один вызов функции.

Произойдет прирост производительности, поскольку mapмного раз работает с определенной функцией, особенно если функция каждый раз делает что-то дорогостоящее, чего не нужно было бы делать, если бы мы передали все элементы сразу (в случае mappartitions).

карта

Применяет функцию преобразования к каждому элементу RDD и возвращает результат как новый RDD.

Варианты листинга

def map [U: ClassTag] (f: T => U): RDD [U]

Пример :

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
 val b = a.map(_.length)
 val c = a.zip(b)
 c.collect
 res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

mapPartitions

Это специализированная карта, которая вызывается только один раз для каждого раздела. Все содержимое соответствующих разделов доступно в виде последовательного потока значений через входной аргумент (Iterarator [T]). Пользовательская функция должна возвращать еще один Iterator [U]. Объединенные итераторы результатов автоматически преобразуются в новый СДР. Обратите внимание, что кортежи (3,4) и (6,7) отсутствуют в следующем результате из-за выбранного нами разбиения.

preservesPartitioningуказывает, сохраняет ли функция ввода разделитель, что должно быть, falseесли только это не RDD пары и функция ввода не изменяет ключи.

Варианты листинга

def mapPartitions [U: ClassTag] (f: Iterator [T] => Iterator [U], pre sizesPartitioning: Boolean = false): RDD [U]

Пример 1

val a = sc.parallelize(1 to 9, 3)
 def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
   var res = List[(T, T)]()
   var pre = iter.next
   while (iter.hasNext)
   {
     val cur = iter.next;
     res .::= (pre, cur)
     pre = cur;
   }
   res.iterator
 }
 a.mapPartitions(myfunc).collect
 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

Пример 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
 def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
   var res = List[Int]()
   while (iter.hasNext) {
     val cur = iter.next;
     res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
   }
   res.iterator
 }
 x.mapPartitions(myfunc).collect
 // some of the number are not outputted at all. This is because the random number generated for it is zero.
 res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

Вышеупомянутая программа также может быть написана с использованием flatMap следующим образом.

Пример 2 с использованием плоской карты

val x  = sc.parallelize(1 to 10, 3)
 x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

 res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

Вывод :

mapPartitionsпреобразование происходит быстрее, чем mapпоскольку оно вызывает вашу функцию один раз / раздел, а не один раз / элемент ..

Дополнительная литература: foreach против foreachPartitions Когда использовать Что?

Рам Гадиярам
источник
4
Я знаю, что вы можете использовать mapили mapPartitionsдля достижения того же результата (см. Два примера в вопросе); этот вопрос о том, почему вы предпочли бы один путь другому. Комментарии в другом ответе действительно полезны! Кроме того , вы не упомянули , что mapи flatMapпередать falseк preservesPartitioning, и каковы последствия, которые.
Николас Уайт
2
функция, выполняемая каждый раз, по сравнению с функцией, выполняемой один раз, для разделения - это ссылка, которую я пропустил. Имея доступ к нескольким записям данных одновременно с mapPartition - бесценная вещь. ценю ответ
точка с запятой и клейкая лента
1
Есть сценарий, где mapлучше чем mapPartitions? Если mapPartitionsэто так хорошо, почему это не реализация карты по умолчанию?
ruhong
1
@oneleggedmule: оба предназначены для разных требований, которые мы должны использовать с умом, если вы создаете экземпляры ресурсов, таких как подключения к базе данных (как показано в приведенном выше примере), которые являются дорогостоящими, тогда сопоставления - правильный подход, поскольку одно подключение на раздел. также saveAsTextFile, используемые для внутреннего использования сопоставления см.
Ram Ghadiyaram
@oneleggedmule С моей точки зрения, map () легче понять и изучить, и это также общий метод для многих разных языков. Это может быть проще в использовании, чем mapPartitions (), если кто-то не знаком с этим специфическим методом Spark вначале. Если нет разницы в производительности, я предпочитаю использовать map ().
Raymond Chen
15

Карта :

  1. Он обрабатывает одну строку за раз, очень похоже на метод map () в MapReduce.
  2. Вы возвращаетесь из трансформации после каждой строки.

MapPartitions

  1. Он обрабатывает весь раздел за один раз.
  2. Вы можете вернуться из функции только один раз после обработки всего раздела.
  3. Все промежуточные результаты необходимо хранить в памяти, пока вы не обработаете весь раздел.
  4. Предоставляет вам как функции setup () map () и cleanup () MapReduce.

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/

KrazyGautam
источник
относительно 2 - если вы выполняете преобразования итератора в итератор, а не материализуете итератор в какую-либо коллекцию, вам не нужно будет хранить весь раздел в памяти, фактически, таким образом искра сможет выливать части раздела на диск.
ilcord
4
Вам не нужно держать в памяти весь раздел, а результат. Вы не можете вернуть результат, пока не обработаете весь раздел
KrazyGautam