Разница между reduce и foldLeft / fold в функциональном программировании (особенно в Scala и Scala API)?

Ответы:

260

уменьшить vs foldLeft

Большое большое различие, не упомянутое ни в одном другом ответе на переполнение стека, относящемся к этой теме, заключается в том, что reduceдолжен быть дан коммутативный моноид , то есть операция, которая является как коммутативной, так и ассоциативной. Это означает, что операцию можно распараллелить.

Это различие очень важно для больших данных / MPP / распределенных вычислений, и является единственной причиной, по которой оно reduceвообще существует. Коллекция может быть разделена на части, и они reduceмогут работать с каждым фрагментом, а затем reduceмогут работать с результатами каждого фрагмента - на самом деле уровень фрагментации не должен останавливаться на один уровень. Мы тоже можем нарезать каждый кусок. Вот почему суммирование целых чисел в списке составляет O (log N), если дано бесконечное количество процессоров.

Если вы просто посмотрите на подписи, у них нет причин для reduceсуществования, потому что вы можете достичь всего, что можете, reduceс помощью foldLeft. Функциональность foldLeftбольше, чем функциональность reduce.

Но вы не можете распараллелить a foldLeft, поэтому его время выполнения всегда O (N) (даже если вы используете коммутативный моноид). Это связано с тем, что предполагается, что операция не является коммутативным моноидом, и поэтому накопленное значение будет вычислено посредством серии последовательных агрегатов.

foldLeftне предполагает коммутативности или ассоциативности. Это ассоциативность, которая дает возможность разбить коллекцию, и ее коммутативность, которая упрощает кумуляцию, потому что порядок не важен (поэтому не имеет значения, в каком порядке агрегировать каждый результат из каждого из фрагментов). Строго говоря, коммутативность не требуется для распараллеливания, например для алгоритмов распределенной сортировки, она просто упрощает логику, потому что вам не нужно упорядочивать свои фрагменты.

Если вы посмотрите документацию Spark, в reduceней конкретно написано «... коммутативный и ассоциативный бинарный оператор»

http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD

Вот доказательство того, что reduceэто НЕ частный случайfoldLeft

scala> val intParList: ParSeq[Int] = (1 to 100000).map(_ => scala.util.Random.nextInt()).par

scala> timeMany(1000, intParList.reduce(_ + _))
Took 462.395867 milli seconds

scala> timeMany(1000, intParList.foldLeft(0)(_ + _))
Took 2589.363031 milli seconds

уменьшить vs свернуть

Теперь это то, где он становится немного ближе к математическим корням FP /, и его немного сложнее объяснить. Reduce формально определяется как часть парадигмы MapReduce, которая имеет дело с беспорядочными коллекциями (мультимножествами), Fold формально определяется в терминах рекурсии (см. Катаморфизм) и, таким образом, предполагает структуру / последовательность коллекций.

В foldScalding нет метода, потому что в рамках (строгой) модели программирования Map Reduce мы не можем определить, foldпотому что чанки не имеют упорядочения и foldтребуют только ассоциативности, а не коммутативности.

Проще говоря, reduceработает без порядка кумуляции, foldтребует порядка кумуляции, и именно такой порядок кумуляции требует нулевого значения, а НЕ существования нулевого значения, которое их отличает. Строго говоря, он reduce должен работать с пустой коллекцией, потому что его нулевое значение можно вывести, взяв произвольное значение xи затем решив x op y = x, но это не работает с некоммутативной операцией, поскольку могут существовать левое и правое нулевые значения, которые отличаются (т.е. x op y != y op x). Конечно, Scala не пытается выяснить, что это за нулевое значение, поскольку для этого потребуется выполнить некоторую математику (которая, вероятно, не поддается вычислению), поэтому просто выдает исключение.

Кажется (как это часто бывает в этимологии), этот первоначальный математический смысл был утерян, поскольку единственное очевидное различие в программировании - это подпись. В результате reduceон стал синонимом foldMapReduce, а не сохранил его первоначальное значение. Теперь эти термины часто используются как взаимозаменяемые и в большинстве реализаций ведут себя одинаково (игнорируя пустые коллекции). Странность усугубляется особенностями, как в Spark, которые мы сейчас рассмотрим.

Так Спарк действительно есть fold, но порядок , в котором результаты суб ( по одному для каждого раздела) объединяются (на момент написания) тот же порядок , в котором задачи выполняются - и , таким образом , не детерминированным. Спасибо @CafeFeed за указание на foldиспользование runJob, которое после прочтения кода я понял, что он недетерминирован. Дальнейшая путаница возникает из-за того, что у Spark есть treeReduceно нет treeFold.

Вывод

Существует разница между reduceи foldдаже при применении к непустым последовательностям. Первый определяется как часть парадигмы программирования MapReduce для коллекций с произвольным порядком ( http://theory.stanford.edu/~sergei/papers/soda10-mrc.pdf ), и следует предполагать, что операторы не только коммутативны, но и являются коммутативными. ассоциативно для получения детерминированных результатов. Последний определяется в терминах катоморфизмов и требует, чтобы коллекции имели понятие последовательности (или определялись рекурсивно, как связанные списки), поэтому не требуют коммутативных операторов.

На практике из-за нематематической природы программирования reduceи, foldкак правило, ведут себя одинаково, либо правильно (как в Scala), либо неправильно (как в Spark).

Дополнительно: мое мнение об API Spark

Я считаю, что путаницы можно было бы избежать, если бы полностью отказаться от этого термина foldв Spark. По крайней мере, у Spark есть примечание в их документации:

Это ведет себя несколько иначе, чем операции сворачивания, реализованные для нераспределенных коллекций в функциональных языках, таких как Scala.

Самтебест
источник
2
Вот почему в своем имени foldLeftсодержится Leftсимвол, и поэтому также существует метод fold.
kiritsuku 06
1
@Cloudtech Это совпадение однопоточной реализации, а не в рамках спецификации. На моей 4-ядерной машине, если я пытаюсь добавить .par, (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _)то каждый раз получаю разные результаты.
samthebest
2
@AlexDean в контексте информатики, нет, на самом деле ему не нужна личность, поскольку пустые коллекции, как правило, просто генерируют исключения. Но математически более элегантно (и было бы более элегантно, если бы это делали коллекции), если элемент идентичности возвращается, когда коллекция пуста. В математике не существует «выбросить исключение».
samthebest
3
@samthebest: Вы уверены в коммутативности? github.com/apache/spark/blob/… говорит: «Для некоммутативных функций результат может отличаться от результата свертывания, примененного к нераспределенной коллекции».
Make42
1
@ Make42 Правильно, можно написать собственного reallyFoldсутенера, например:, rdd.mapPartitions(it => Iterator(it.fold(zero)(f)))).collect().fold(zero)(f)для поездки на работу f не понадобится.
samthebest
10

Если я не ошибаюсь, даже если Spark API этого не требует, fold также требует, чтобы f был коммутативным. Потому что порядок, в котором будут агрегированы разделы, не гарантирован. Например, в следующем коде сортируется только первая распечатка:

import org.apache.spark.{SparkConf, SparkContext}

object FoldExample extends App{

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Simple Application")
  implicit val sc = new SparkContext(conf)

  val range = ('a' to 'z').map(_.toString)
  val rdd = sc.parallelize(range)

  println(range.reduce(_ + _))
  println(rdd.reduce(_ + _))
  println(rdd.fold("")(_ + _))
}  

Распечатка:

АБВГДЕЖЗИЙКЛМНОПРСТУФХЦЧШЩЫЭЮЯ

abcghituvjklmwxyzqrsdefnop

defghinopjklmqrstuvabcwxyz

Мишель Розенталь
источник
После некоторого разговора мы считаем, что вы правы. Порядок комбинирования - первым пришел - первым подал. Если вы запустите sc.makeRDD(0 to 9, 2).mapPartitions(it => { java.lang.Thread.sleep(new java.util.Random().nextInt(1000)); it } ).map(_.toString).fold("")(_ + _)несколько раз с 2+ ядрами, я думаю, вы увидите, что он производит случайный (по разделам) порядок. Я соответствующим образом обновил свой ответ.
samthebest
3

foldв Apache Spark отличается foldот нераспространяемых коллекций. Фактически это требует коммутативной функции для получения детерминированных результатов :

Это ведет себя несколько иначе, чем операции сворачивания, реализованные для нераспределенных коллекций на функциональных языках, таких как Scala. Эта операция сгиба может применяться к разделам по отдельности, а затем складывать эти результаты в окончательный результат, а не применять сгиб к каждому элементу последовательно в некотором определенном порядке. Для некоммутативных функций результат может отличаться от результата свертки, примененной к нераспределенной коллекции.

Это было показано на Mishael Rosenthal и предложил Make42 в своем комментарии .

Было высказано предположение, что наблюдаемое поведение связано с тем, HashPartitionerкогда на самом деле parallelizeне перемешивается и не используется HashPartitioner.

import org.apache.spark.sql.SparkSession

/* Note: standalone (non-local) mode */
val master = "spark://...:7077"  

val spark = SparkSession.builder.master(master).getOrCreate()

/* Note: deterministic order */
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])
require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })

/* Note: all posible permutations */
require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)

Разъяснил:

Структураfold для RDD

def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
  var jobResult: T
  val cleanOp: (T, T) => T
  val foldPartition = Iterator[T] => T
  val mergeResult: (Int, T) => Unit
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}

такая же, как структураreduce для RDD:

def reduce(f: (T, T) => T): T = withScope {
  val cleanF: (T, T) => T
  val reducePartition: Iterator[T] => Option[T]
  var jobResult: Option[T]
  val mergeResult =  (Int, Option[T]) => Unit
  sc.runJob(this, reducePartition, mergeResult)
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

где runJobвыполняется без учета порядка разбиения и приводит к необходимости коммутативной функции.

foldPartitionи reducePartitionэквивалентны с точки зрения порядка обработки и эффективно (посредством наследования и делегирования) реализованы самим reduceLeftи foldLeftдалее TraversableOnce.

Вывод: foldRDD не может зависеть от порядка блоков и требует коммутативности и ассоциативности .

оборота пользователь 6022341
источник
Я должен признать, что этимология сбивает с толку, а в литературе по программированию отсутствуют формальные определения. Я думаю, можно с уверенностью сказать, что foldon RDDs действительно действительно то же самое reduce, но это не учитывает коренные математические различия (я обновил свой ответ, чтобы он был еще более ясным). Хотя я не согласен с тем, что нам действительно нужна коммутативность, при условии, что каждый уверен, что бы ни делал его разделитель, он поддерживает порядок.
samthebest
Неопределенный порядок складывания не связан с разбиением. Это прямое следствие реализации runJob.
А! Извините, я не смог понять, в чем заключалась ваша точка зрения, но, прочитав runJobкод, я вижу, что на самом деле он выполняет объединение в соответствии с тем, когда задача завершена, а НЕ с порядком разделов. Именно эта ключевая деталь заставляет все встать на свои места. Я отредактировал мой ответ снова и , таким образом , исправил ошибку вы отмечаете. Пожалуйста, не могли бы вы удалить свою награду, раз уж мы пришли к соглашению?
samthebest 07
Редактировать или удалять не могу - такой возможности нет. Я могу наградить, но я думаю, что вы получаете немало очков за одно только внимание, я ошибаюсь? Если вы подтвердите, что хотите, чтобы я наградил вас, я сделаю это в течение следующих 24 часов. Спасибо за исправления и извините за метод, но похоже, что вы игнорируете все предупреждения, это большое дело, и ответ цитировался повсюду.
1
Как насчет того, чтобы вручить награду @Mishael Rosenthal, поскольку он первым четко выразил озабоченность. Меня не интересуют пункты, мне просто нравится использовать SO для SEO и организации.
samthebest 07
2

Еще одно отличие Scalding - это использование в Hadoop объединителей.

Представьте, что ваша операция является коммутативным моноидом, при этом сокращение будет применяться и на стороне карты вместо перетасовки / сортировки всех данных в редукторы. С foldLeft это не так.

pipe.groupBy('product) {
   _.reduce('price -> 'total){ (sum: Double, price: Double) => sum + price }
   // reduce is .mapReduceMap in disguise
}

pipe.groupBy('product) {
   _.foldLeft('price -> 'total)(0.0){ (sum: Double, price: Double) => sum + price }
}

Всегда полезно определять ваши операции как моноид в Scalding.

Morazow
источник