Зачем нужен комбайнер для метода reduce, преобразующего тип в java 8

150

Мне сложно полностью понять роль, которую combinerвыполняет reduceметод Streams .

Например, следующий код не компилируется:

int length = asList("str1", "str2").stream()
            .reduce(0, (accumulatedInt, str) -> accumulatedInt + str.length());

Ошибка компиляции говорит: (несоответствие аргументов; int не может быть преобразован в java.lang.String)

но этот код компилируется:

int length = asList("str1", "str2").stream()  
    .reduce(0, (accumulatedInt, str ) -> accumulatedInt + str.length(), 
                (accumulatedInt, accumulatedInt2) -> accumulatedInt + accumulatedInt2);

Я понимаю, что метод комбинатора используется в параллельных потоках - поэтому в моем примере он складывает два промежуточных накопленных целых числа.

Но я не понимаю, почему первый пример не компилируется без комбайнера или как комбайнер решает преобразование строки в int, поскольку он просто складывает вместе два int.

Кто-нибудь может пролить свет на это?

Луиза Миллер
источник
Связанный вопрос: stackoverflow.com/questions/24202473/…
nosid
2
ага, это для параллельных потоков ... Я называю дырявую абстракцию!
Энди

Ответы:

78

Версии с двумя и тремя аргументами, reduceкоторые вы пытались использовать, не принимают один и тот же тип для accumulator.

Два аргумента reduceбудет определен как :

T reduce(T identity,
         BinaryOperator<T> accumulator)

В вашем случае T - это String, поэтому он BinaryOperator<T>должен принимать два аргумента String и возвращать String. Но вы передаете ему int и String, что приводит к ошибке компиляции, которую вы получили - argument mismatch; int cannot be converted to java.lang.String. На самом деле, я думаю, что передача 0 в качестве значения идентификатора здесь также неверна, поскольку ожидается String (T).

Также обратите внимание, что эта версия reduce обрабатывает поток Ts и возвращает T, поэтому вы не можете использовать ее для уменьшения потока String до int.

Три аргумента reduceбудет определен как :

<U> U reduce(U identity,
             BiFunction<U,? super T,U> accumulator,
             BinaryOperator<U> combiner)

В вашем случае U - Integer, а T - String, поэтому этот метод уменьшит поток String до Integer.

Для BiFunction<U,? super T,U>аккумулятора вы можете передавать параметры двух разных типов (U и? Super T), которые в вашем случае являются Integer и String. Кроме того, значение идентификатора U в вашем случае принимает целое число, поэтому передача 0 - это нормально.

Еще один способ добиться желаемого:

int length = asList("str1", "str2").stream().mapToInt (s -> s.length())
            .reduce(0, (accumulatedInt, len) -> accumulatedInt + len);

Здесь тип потока соответствует типу возвращаемого значения reduce, поэтому вы можете использовать двухпараметрическую версию reduce.

Конечно, вам совсем не обязательно использовать reduce:

int length = asList("str1", "str2").stream().mapToInt (s -> s.length())
            .sum();
Эран
источник
9
В качестве второго варианта в вашем последнем коде вы также можете использовать mapToInt(String::length)over mapToInt(s -> s.length()), не уверен, что один из них лучше другого, но я предпочитаю первый для удобочитаемости.
skiwi
24
Многие найдут этот ответ, так как не поймут, зачем combinerэто нужно, почему не accumulatorдостаточно. В этом случае: комбайнер нужен только для параллельных потоков, чтобы объединить «накопленные» результаты потоков.
ddekany
2
Я не считаю ваш ответ особенно полезным - потому что вы вообще не объясняете, что должен делать комбайнер и как я могу работать без него! В моем случае я хочу уменьшить тип T до U, но это вообще невозможно сделать параллельно. Это просто невозможно. Как сообщить системе, что я не хочу / не нуждаюсь в параллелизме и, таким образом, исключить комбайнер?
Zordid
@Zordid API Streams не включает возможность уменьшить тип T до U без передачи комбайнера.
Эран
Этот ответ вообще не объясняет комбайнер, а только то, почему OP нужны варианты без комбайнера.
Бенни Боттема,
226

В ответе Эрана описаны различия между версиями с двумя и тремя аргументами reduceв том, что первая сводится Stream<T>к, Tа вторая сводится Stream<T>к U. Однако на самом деле это не объясняло необходимость использования дополнительной функции объединения при сокращении Stream<T>до U.

Один из принципов разработки Streams API заключается в том, что API не должен различаться между последовательными и параллельными потоками, или, другими словами, конкретный API не должен препятствовать правильной работе потока ни последовательно, ни параллельно. Если ваши лямбда-выражения имеют правильные свойства (ассоциативность, отсутствие помех и т. Д.), Поток, выполняемый последовательно или параллельно, должен давать те же результаты.

Давайте сначала рассмотрим вариант сокращения с двумя аргументами:

T reduce(I, (T, T) -> T)

Последовательная реализация проста. Идентификационное значение I«накапливается» с нулевым элементом потока для получения результата. Этот результат накапливается с первым элементом потока, чтобы дать другой результат, который, в свою очередь, накапливается со вторым элементом потока и так далее. После накопления последнего элемента возвращается окончательный результат.

Параллельная реализация начинается с разделения потока на сегменты. Каждый сегмент обрабатывается своим собственным потоком последовательным способом, который я описал выше. Теперь, если у нас есть N потоков, у нас есть N промежуточных результатов. Их нужно свести к одному результату. Поскольку каждый промежуточный результат имеет тип T, а у нас их несколько, мы можем использовать одну и ту же функцию накопителя, чтобы уменьшить эти N промежуточных результатов до одного результата.

Теперь давайте рассмотрим гипотетическую операцию сокращения двух аргументов, которая сводится Stream<T>к U. На других языках это называется операцией «сворачивание» или «сворачивание влево», поэтому здесь я назову это именно так. Обратите внимание, этого нет в Java.

U foldLeft(I, (U, T) -> U)

(Обратите внимание, что значение идентификатора Iотносится к типу U.)

Последовательная версия foldLeftаналогична последовательной версии, за reduceисключением того, что промежуточные значения имеют тип U, а не тип T. Но в остальном все то же самое. (Гипотетическая foldRightоперация будет аналогичной, за исключением того, что операции будут выполняться справа налево, а не слева направо.)

Теперь рассмотрим параллельную версию foldLeft. Начнем с разделения потока на сегменты. Затем мы можем заставить каждый из N потоков уменьшить значения T в своем сегменте до N промежуточных значений типа U. Что теперь? Как нам перейти от N значений типа U к одному результату типа U?

Чего не хватает, так это еще одной функции, которая объединяет несколько промежуточных результатов типа U в один результат типа U. Если у нас есть функция, объединяющая два значения U в одно, этого достаточно, чтобы уменьшить любое количество значений до одного - точно так же, как исходное сокращение выше. Таким образом, операция приведения, дающая результат другого типа, требует двух функций:

U reduce(I, (U, T) -> U, (U, U) -> U)

Или, используя синтаксис Java:

<U> U reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator<U> combiner)

Таким образом, чтобы выполнить параллельное сокращение до другого типа результата, нам нужны две функции: одна, которая накапливает элементы T до промежуточных значений U, а вторая объединяет промежуточные значения U в один результат U. Если мы не переключаем типы, оказывается, что функция аккумулятора такая же, как и функция объединителя. Вот почему приведение к одному и тому же типу выполняет только функцию накопителя, а приведение к другому типу требует отдельных функций накопителя и сумматора.

Наконец, Java не предусматривает foldLeftи foldRightоперации , потому что они подразумевают определенный порядок операций , которые по своей сути является последовательным. Это противоречит принципу проектирования, изложенному выше, о предоставлении API, которые в равной степени поддерживают последовательную и параллельную работу.

Стюарт Маркс
источник
9
Итак, что вы можете сделать, если вам нужен, foldLeftпотому что вычисление зависит от предыдущего результата и не может быть распараллелено?
амеб
5
@amoebe Вы можете реализовать свой собственный foldLeft, используя forEachOrdered. Однако промежуточное состояние должно храниться в захваченной переменной.
Стюарт Маркс
@StuartMarks спасибо, в итоге я использовал jOOλ. У них аккуратная реализацияfoldLeft .
амеб
1
Люблю этот ответ! Поправьте меня, если я ошибаюсь: это объясняет, почему рабочий пример OP (второй) никогда не будет вызывать объединитель при запуске, будучи последовательным потоком.
Луиджи Кортезе
2
Это объясняет почти все ... кроме: почему это должно исключать последовательное сокращение. В моем случае НЕВОЗМОЖНО делать это параллельно, поскольку мое сокращение сокращает список функций до U, вызывая каждую функцию для промежуточного результата результата ее предшественника. Это вообще невозможно сделать параллельно, и нет способа описать объединитель. Какой метод я могу использовать для этого?
Zordid
124

Поскольку мне нравятся каракули и стрелки, чтобы прояснить концепцию ... приступим!

От строки к строке (последовательный поток)

Предположим, у вас есть 4 строки: ваша цель - объединить такие строки в одну. Вы в основном начинаете с типа и заканчиваете тем же типом.

Вы можете добиться этого с помощью

String res = Arrays.asList("one", "two","three","four")
        .stream()
        .reduce("",
                (accumulatedStr, str) -> accumulatedStr + str);  //accumulator

и это поможет вам визуализировать происходящее:

введите описание изображения здесь

Функция накопителя шаг за шагом преобразует элементы в вашем (красном) потоке в окончательное уменьшенное (зеленое) значение. Функция аккумулятора просто преобразует один Stringобъект в другой String.

От String до int (параллельный поток)

Предположим, у вас есть те же 4 строки: ваша новая цель - суммировать их длины, и вы хотите распараллелить поток.

Вам нужно что-то вроде этого:

int length = Arrays.asList("one", "two","three","four")
        .parallelStream()
        .reduce(0,
                (accumulatedInt, str) -> accumulatedInt + str.length(),                 //accumulator
                (accumulatedInt, accumulatedInt2) -> accumulatedInt + accumulatedInt2); //combiner

а это схема происходящего

введите описание изображения здесь

Здесь функция аккумулятора (а BiFunction) позволяет преобразовать ваши Stringданные в intданные. Поскольку поток параллелен, он разделен на две (красные) части, каждая из которых разрабатывается независимо друг от друга и дает столько же частичных (оранжевых) результатов. Определение объединителя необходимо для обеспечения правила объединения частичных intрезультатов в окончательный (зеленый) int.

От String до int (последовательный поток)

Что делать, если вы не хотите распараллеливать свой поток? Что ж, комбайнер должен быть предоставлен в любом случае, но он никогда не будет вызван, учитывая, что не будут получены частичные результаты.

Луиджи Кортезе
источник
8
Спасибо за это. Мне даже не нужно было читать. Я бы хотел, чтобы они просто добавили чертову функцию складывания.
Lodewijk Bogaards, 04
1
@LodewijkBogaards рад, что это помогло! JavaDoc здесь действительно довольно загадочный
Луиджи Кортезе
@LuigiCortese Всегда ли в параллельном потоке элементы делятся на пары?
TheLogicGuy
2
Я ценю ваш ясный и полезный ответ. Я хочу повторить кое-что из того, что вы сказали: «Ну, комбайнер в любом случае должен быть предоставлен, но он никогда не будет вызван». Это часть Дивного нового мира функционального программирования Java, который, как меня бесчисленное количество раз уверяли, «делает ваш код более кратким и легким для чтения». Будем надеяться, что примеров такой лаконичной ясности (цитаты из пальца) пока немного.
dnuttle
Будет НАМНОГО лучше проиллюстрировать
редукцию
0

Нет версии с сокращением, которая принимает два разных типа без объединителя, поскольку она не может выполняться параллельно (не уверен, почему это является требованием). Тот факт, что аккумулятор должен быть ассоциативным, делает этот интерфейс практически бесполезным, поскольку:

list.stream().reduce(identity,
                     accumulator,
                     combiner);

Дает такие же результаты, как:

list.stream().map(i -> accumulator(identity, i))
             .reduce(identity,
                     combiner);
викторина123
источник
Такой mapтрюк в зависимости от конкретного accumulatorи combinerможет сильно замедлить работу.
Тагир Валеев
Или значительно ускорите его, так как теперь вы можете упростить accumulator, отбросив первый параметр.
quiz123
Возможна параллельная редукция, это зависит от ваших вычислений. В вашем случае вы должны осознавать сложность комбайнера, но также и аккумулятора для идентификации по сравнению с другими экземплярами.
LoganMzz