Большое большое различие, не упомянутое ни в одном другом ответе на переполнение стека, относящемся к этой теме, заключается в том, что reduceдолжен быть дан коммутативный моноид , то есть операция, которая является как коммутативной, так и ассоциативной. Это означает, что операцию можно распараллелить.
Это различие очень важно для больших данных / MPP / распределенных вычислений, и является единственной причиной, по которой оно reduceвообще существует. Коллекция может быть разделена на части, и они reduceмогут работать с каждым фрагментом, а затем reduceмогут работать с результатами каждого фрагмента - на самом деле уровень фрагментации не должен останавливаться на один уровень. Мы тоже можем нарезать каждый кусок. Вот почему суммирование целых чисел в списке составляет O (log N), если дано бесконечное количество процессоров.
Если вы просто посмотрите на подписи, у них нет причин для reduceсуществования, потому что вы можете достичь всего, что можете, reduceс помощью foldLeft. Функциональность foldLeftбольше, чем функциональность reduce.
Но вы не можете распараллелить a foldLeft, поэтому его время выполнения всегда O (N) (даже если вы используете коммутативный моноид). Это связано с тем, что предполагается, что операция не является коммутативным моноидом, и поэтому накопленное значение будет вычислено посредством серии последовательных агрегатов.
foldLeftне предполагает коммутативности или ассоциативности. Это ассоциативность, которая дает возможность разбить коллекцию, и ее коммутативность, которая упрощает кумуляцию, потому что порядок не важен (поэтому не имеет значения, в каком порядке агрегировать каждый результат из каждого из фрагментов). Строго говоря, коммутативность не требуется для распараллеливания, например для алгоритмов распределенной сортировки, она просто упрощает логику, потому что вам не нужно упорядочивать свои фрагменты.
Если вы посмотрите документацию Spark, в reduceней конкретно написано «... коммутативный и ассоциативный бинарный оператор»
Вот доказательство того, что reduceэто НЕ частный случайfoldLeft
scala>val intParList:ParSeq[Int]=(1 to 100000).map(_ => scala.util.Random.nextInt()).par
scala> timeMany(1000, intParList.reduce(_ + _))Took462.395867 milli seconds
scala> timeMany(1000, intParList.foldLeft(0)(_ + _))Took2589.363031 milli seconds
уменьшить vs свернуть
Теперь это то, где он становится немного ближе к математическим корням FP /, и его немного сложнее объяснить. Reduce формально определяется как часть парадигмы MapReduce, которая имеет дело с беспорядочными коллекциями (мультимножествами), Fold формально определяется в терминах рекурсии (см. Катаморфизм) и, таким образом, предполагает структуру / последовательность коллекций.
В foldScalding нет метода, потому что в рамках (строгой) модели программирования Map Reduce мы не можем определить, foldпотому что чанки не имеют упорядочения и foldтребуют только ассоциативности, а не коммутативности.
Проще говоря, reduceработает без порядка кумуляции, foldтребует порядка кумуляции, и именно такой порядок кумуляции требует нулевого значения, а НЕ существования нулевого значения, которое их отличает. Строго говоря, он reduceдолжен работать с пустой коллекцией, потому что его нулевое значение можно вывести, взяв произвольное значение xи затем решив x op y = x, но это не работает с некоммутативной операцией, поскольку могут существовать левое и правое нулевые значения, которые отличаются (т.е. x op y != y op x). Конечно, Scala не пытается выяснить, что это за нулевое значение, поскольку для этого потребуется выполнить некоторую математику (которая, вероятно, не поддается вычислению), поэтому просто выдает исключение.
Кажется (как это часто бывает в этимологии), этот первоначальный математический смысл был утерян, поскольку единственное очевидное различие в программировании - это подпись. В результате reduceон стал синонимом foldMapReduce, а не сохранил его первоначальное значение. Теперь эти термины часто используются как взаимозаменяемые и в большинстве реализаций ведут себя одинаково (игнорируя пустые коллекции). Странность усугубляется особенностями, как в Spark, которые мы сейчас рассмотрим.
Так Спарк действительно есть fold, но порядок , в котором результаты суб ( по одному для каждого раздела) объединяются (на момент написания) тот же порядок , в котором задачи выполняются - и , таким образом , не детерминированным. Спасибо @CafeFeed за указание на foldиспользование runJob, которое после прочтения кода я понял, что он недетерминирован. Дальнейшая путаница возникает из-за того, что у Spark есть treeReduceно нет treeFold.
Вывод
Существует разница между reduceи foldдаже при применении к непустым последовательностям. Первый определяется как часть парадигмы программирования MapReduce для коллекций с произвольным порядком ( http://theory.stanford.edu/~sergei/papers/soda10-mrc.pdf ), и следует предполагать, что операторы не только коммутативны, но и являются коммутативными. ассоциативно для получения детерминированных результатов. Последний определяется в терминах катоморфизмов и требует, чтобы коллекции имели понятие последовательности (или определялись рекурсивно, как связанные списки), поэтому не требуют коммутативных операторов.
На практике из-за нематематической природы программирования reduceи, foldкак правило, ведут себя одинаково, либо правильно (как в Scala), либо неправильно (как в Spark).
Дополнительно: мое мнение об API Spark
Я считаю, что путаницы можно было бы избежать, если бы полностью отказаться от этого термина foldв Spark. По крайней мере, у Spark есть примечание в их документации:
Это ведет себя несколько иначе, чем операции сворачивания, реализованные для нераспределенных коллекций в функциональных языках, таких как Scala.
Вот почему в своем имени foldLeftсодержится Leftсимвол, и поэтому также существует метод fold.
kiritsuku 06
1
@Cloudtech Это совпадение однопоточной реализации, а не в рамках спецификации. На моей 4-ядерной машине, если я пытаюсь добавить .par, (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _)то каждый раз получаю разные результаты.
samthebest
2
@AlexDean в контексте информатики, нет, на самом деле ему не нужна личность, поскольку пустые коллекции, как правило, просто генерируют исключения. Но математически более элегантно (и было бы более элегантно, если бы это делали коллекции), если элемент идентичности возвращается, когда коллекция пуста. В математике не существует «выбросить исключение».
samthebest
3
@samthebest: Вы уверены в коммутативности? github.com/apache/spark/blob/… говорит: «Для некоммутативных функций результат может отличаться от результата свертывания, примененного к нераспределенной коллекции».
Make42
1
@ Make42 Правильно, можно написать собственного reallyFoldсутенера, например:, rdd.mapPartitions(it => Iterator(it.fold(zero)(f)))).collect().fold(zero)(f)для поездки на работу f не понадобится.
samthebest
10
Если я не ошибаюсь, даже если Spark API этого не требует, fold также требует, чтобы f был коммутативным. Потому что порядок, в котором будут агрегированы разделы, не гарантирован. Например, в следующем коде сортируется только первая распечатка:
После некоторого разговора мы считаем, что вы правы. Порядок комбинирования - первым пришел - первым подал. Если вы запустите sc.makeRDD(0 to 9, 2).mapPartitions(it => { java.lang.Thread.sleep(new java.util.Random().nextInt(1000)); it } ).map(_.toString).fold("")(_ + _)несколько раз с 2+ ядрами, я думаю, вы увидите, что он производит случайный (по разделам) порядок. Я соответствующим образом обновил свой ответ.
samthebest
3
foldв Apache Spark отличается foldот нераспространяемых коллекций. Фактически это требует коммутативной функции для получения детерминированных результатов :
Это ведет себя несколько иначе, чем операции сворачивания, реализованные для нераспределенных коллекций на функциональных языках, таких как Scala. Эта операция сгиба может применяться к разделам по отдельности, а затем складывать эти результаты в окончательный результат, а не применять сгиб к каждому элементу последовательно в некотором определенном порядке. Для некоммутативных функций результат может отличаться от результата свертки, примененной к нераспределенной коллекции.
Было высказано предположение, что наблюдаемое поведение связано с тем, HashPartitionerкогда на самом деле parallelizeне перемешивается и не используется HashPartitioner.
import org.apache.spark.sql.SparkSession/* Note: standalone (non-local) mode */val master ="spark://...:7077"val spark =SparkSession.builder.master(master).getOrCreate()/* Note: deterministic order */val rdd = sc.parallelize(Seq("a","b","c","d"),4).sortBy(identity[String])
require(rdd.collect.sliding(2).forall {caseArray(x, y)=> x < y })/* Note: all posible permutations */
require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size ==24)
def fold(zeroValue: T)(op:(T, T)=> T): T = withScope {var jobResult: T
val cleanOp:(T, T)=> T
val foldPartition =Iterator[T]=> T
val mergeResult:(Int, T)=>Unit
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
def reduce(f:(T, T)=> T): T = withScope {val cleanF:(T, T)=> T
val reducePartition:Iterator[T]=>Option[T]var jobResult:Option[T]val mergeResult =(Int,Option[T])=>Unit
sc.runJob(this, reducePartition, mergeResult)
jobResult.getOrElse(thrownewUnsupportedOperationException("empty collection"))}
где runJobвыполняется без учета порядка разбиения и приводит к необходимости коммутативной функции.
foldPartitionи reducePartitionэквивалентны с точки зрения порядка обработки и эффективно (посредством наследования и делегирования) реализованы самим reduceLeftи foldLeftдалее TraversableOnce.
Вывод: foldRDD не может зависеть от порядка блоков и требует коммутативности и ассоциативности .
Я должен признать, что этимология сбивает с толку, а в литературе по программированию отсутствуют формальные определения. Я думаю, можно с уверенностью сказать, что foldon RDDs действительно действительно то же самое reduce, но это не учитывает коренные математические различия (я обновил свой ответ, чтобы он был еще более ясным). Хотя я не согласен с тем, что нам действительно нужна коммутативность, при условии, что каждый уверен, что бы ни делал его разделитель, он поддерживает порядок.
samthebest
Неопределенный порядок складывания не связан с разбиением. Это прямое следствие реализации runJob.
А! Извините, я не смог понять, в чем заключалась ваша точка зрения, но, прочитав runJobкод, я вижу, что на самом деле он выполняет объединение в соответствии с тем, когда задача завершена, а НЕ с порядком разделов. Именно эта ключевая деталь заставляет все встать на свои места. Я отредактировал мой ответ снова и , таким образом , исправил ошибку вы отмечаете. Пожалуйста, не могли бы вы удалить свою награду, раз уж мы пришли к соглашению?
samthebest 07
Редактировать или удалять не могу - такой возможности нет. Я могу наградить, но я думаю, что вы получаете немало очков за одно только внимание, я ошибаюсь? Если вы подтвердите, что хотите, чтобы я наградил вас, я сделаю это в течение следующих 24 часов. Спасибо за исправления и извините за метод, но похоже, что вы игнорируете все предупреждения, это большое дело, и ответ цитировался повсюду.
1
Как насчет того, чтобы вручить награду @Mishael Rosenthal, поскольку он первым четко выразил озабоченность. Меня не интересуют пункты, мне просто нравится использовать SO для SEO и организации.
samthebest 07
2
Еще одно отличие Scalding - это использование в Hadoop объединителей.
Представьте, что ваша операция является коммутативным моноидом, при этом сокращение будет применяться и на стороне карты вместо перетасовки / сортировки всех данных в редукторы. С foldLeft это не так.
pipe.groupBy('product){
_.reduce('price->'total){(sum:Double, price:Double)=> sum + price }// reduce is .mapReduceMap in disguise}
pipe.groupBy('product){
_.foldLeft('price->'total)(0.0){(sum:Double, price:Double)=> sum + price }}
Всегда полезно определять ваши операции как моноид в Scalding.
Ответы:
уменьшить vs foldLeft
Большое большое различие, не упомянутое ни в одном другом ответе на переполнение стека, относящемся к этой теме, заключается в том, что
reduce
должен быть дан коммутативный моноид , то есть операция, которая является как коммутативной, так и ассоциативной. Это означает, что операцию можно распараллелить.Это различие очень важно для больших данных / MPP / распределенных вычислений, и является единственной причиной, по которой оно
reduce
вообще существует. Коллекция может быть разделена на части, и ониreduce
могут работать с каждым фрагментом, а затемreduce
могут работать с результатами каждого фрагмента - на самом деле уровень фрагментации не должен останавливаться на один уровень. Мы тоже можем нарезать каждый кусок. Вот почему суммирование целых чисел в списке составляет O (log N), если дано бесконечное количество процессоров.Если вы просто посмотрите на подписи, у них нет причин для
reduce
существования, потому что вы можете достичь всего, что можете,reduce
с помощьюfoldLeft
. ФункциональностьfoldLeft
больше, чем функциональностьreduce
.Но вы не можете распараллелить a
foldLeft
, поэтому его время выполнения всегда O (N) (даже если вы используете коммутативный моноид). Это связано с тем, что предполагается, что операция не является коммутативным моноидом, и поэтому накопленное значение будет вычислено посредством серии последовательных агрегатов.foldLeft
не предполагает коммутативности или ассоциативности. Это ассоциативность, которая дает возможность разбить коллекцию, и ее коммутативность, которая упрощает кумуляцию, потому что порядок не важен (поэтому не имеет значения, в каком порядке агрегировать каждый результат из каждого из фрагментов). Строго говоря, коммутативность не требуется для распараллеливания, например для алгоритмов распределенной сортировки, она просто упрощает логику, потому что вам не нужно упорядочивать свои фрагменты.Если вы посмотрите документацию Spark, в
reduce
ней конкретно написано «... коммутативный и ассоциативный бинарный оператор»http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD
Вот доказательство того, что
reduce
это НЕ частный случайfoldLeft
уменьшить vs свернуть
Теперь это то, где он становится немного ближе к математическим корням FP /, и его немного сложнее объяснить. Reduce формально определяется как часть парадигмы MapReduce, которая имеет дело с беспорядочными коллекциями (мультимножествами), Fold формально определяется в терминах рекурсии (см. Катаморфизм) и, таким образом, предполагает структуру / последовательность коллекций.
В
fold
Scalding нет метода, потому что в рамках (строгой) модели программирования Map Reduce мы не можем определить,fold
потому что чанки не имеют упорядочения иfold
требуют только ассоциативности, а не коммутативности.Проще говоря,
reduce
работает без порядка кумуляции,fold
требует порядка кумуляции, и именно такой порядок кумуляции требует нулевого значения, а НЕ существования нулевого значения, которое их отличает. Строго говоря, онreduce
должен работать с пустой коллекцией, потому что его нулевое значение можно вывести, взяв произвольное значениеx
и затем решивx op y = x
, но это не работает с некоммутативной операцией, поскольку могут существовать левое и правое нулевые значения, которые отличаются (т.е.x op y != y op x
). Конечно, Scala не пытается выяснить, что это за нулевое значение, поскольку для этого потребуется выполнить некоторую математику (которая, вероятно, не поддается вычислению), поэтому просто выдает исключение.Кажется (как это часто бывает в этимологии), этот первоначальный математический смысл был утерян, поскольку единственное очевидное различие в программировании - это подпись. В результате
reduce
он стал синонимомfold
MapReduce, а не сохранил его первоначальное значение. Теперь эти термины часто используются как взаимозаменяемые и в большинстве реализаций ведут себя одинаково (игнорируя пустые коллекции). Странность усугубляется особенностями, как в Spark, которые мы сейчас рассмотрим.Так Спарк действительно есть
fold
, но порядок , в котором результаты суб ( по одному для каждого раздела) объединяются (на момент написания) тот же порядок , в котором задачи выполняются - и , таким образом , не детерминированным. Спасибо @CafeFeed за указание наfold
использованиеrunJob
, которое после прочтения кода я понял, что он недетерминирован. Дальнейшая путаница возникает из-за того, что у Spark естьtreeReduce
но нетtreeFold
.Вывод
Существует разница между
reduce
иfold
даже при применении к непустым последовательностям. Первый определяется как часть парадигмы программирования MapReduce для коллекций с произвольным порядком ( http://theory.stanford.edu/~sergei/papers/soda10-mrc.pdf ), и следует предполагать, что операторы не только коммутативны, но и являются коммутативными. ассоциативно для получения детерминированных результатов. Последний определяется в терминах катоморфизмов и требует, чтобы коллекции имели понятие последовательности (или определялись рекурсивно, как связанные списки), поэтому не требуют коммутативных операторов.На практике из-за нематематической природы программирования
reduce
и,fold
как правило, ведут себя одинаково, либо правильно (как в Scala), либо неправильно (как в Spark).Дополнительно: мое мнение об API Spark
Я считаю, что путаницы можно было бы избежать, если бы полностью отказаться от этого термина
fold
в Spark. По крайней мере, у Spark есть примечание в их документации:источник
foldLeft
содержитсяLeft
символ, и поэтому также существует методfold
..par
,(List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _)
то каждый раз получаю разные результаты.reallyFold
сутенера, например:,rdd.mapPartitions(it => Iterator(it.fold(zero)(f)))).collect().fold(zero)(f)
для поездки на работу f не понадобится.Если я не ошибаюсь, даже если Spark API этого не требует, fold также требует, чтобы f был коммутативным. Потому что порядок, в котором будут агрегированы разделы, не гарантирован. Например, в следующем коде сортируется только первая распечатка:
Распечатка:
АБВГДЕЖЗИЙКЛМНОПРСТУФХЦЧШЩЫЭЮЯ
abcghituvjklmwxyzqrsdefnop
defghinopjklmqrstuvabcwxyz
источник
sc.makeRDD(0 to 9, 2).mapPartitions(it => { java.lang.Thread.sleep(new java.util.Random().nextInt(1000)); it } ).map(_.toString).fold("")(_ + _)
несколько раз с 2+ ядрами, я думаю, вы увидите, что он производит случайный (по разделам) порядок. Я соответствующим образом обновил свой ответ.fold
в Apache Spark отличаетсяfold
от нераспространяемых коллекций. Фактически это требует коммутативной функции для получения детерминированных результатов :Это было показано на Mishael Rosenthal и предложил Make42 в своем комментарии .
Было высказано предположение, что наблюдаемое поведение связано с тем,
HashPartitioner
когда на самом делеparallelize
не перемешивается и не используетсяHashPartitioner
.Разъяснил:
Структура
fold
для RDDтакая же, как структура
reduce
для RDD:где
runJob
выполняется без учета порядка разбиения и приводит к необходимости коммутативной функции.foldPartition
иreducePartition
эквивалентны с точки зрения порядка обработки и эффективно (посредством наследования и делегирования) реализованы самимreduceLeft
иfoldLeft
далееTraversableOnce
.Вывод:
fold
RDD не может зависеть от порядка блоков и требует коммутативности и ассоциативности .источник
fold
onRDD
s действительно действительно то же самоеreduce
, но это не учитывает коренные математические различия (я обновил свой ответ, чтобы он был еще более ясным). Хотя я не согласен с тем, что нам действительно нужна коммутативность, при условии, что каждый уверен, что бы ни делал его разделитель, он поддерживает порядок.runJob
код, я вижу, что на самом деле он выполняет объединение в соответствии с тем, когда задача завершена, а НЕ с порядком разделов. Именно эта ключевая деталь заставляет все встать на свои места. Я отредактировал мой ответ снова и , таким образом , исправил ошибку вы отмечаете. Пожалуйста, не могли бы вы удалить свою награду, раз уж мы пришли к соглашению?Еще одно отличие Scalding - это использование в Hadoop объединителей.
Представьте, что ваша операция является коммутативным моноидом, при этом сокращение будет применяться и на стороне карты вместо перетасовки / сортировки всех данных в редукторы. С foldLeft это не так.
Всегда полезно определять ваши операции как моноид в Scalding.
источник