Java 8 Streams - собрать vs уменьшить

144

Когда бы вы использовали collect()vs reduce()? Есть ли у кого-нибудь хорошие конкретные примеры, когда однозначно лучше пойти тем или другим путем?

Javadoc упоминает, что collect () является изменяемая редукция .

Учитывая, что это изменяемое сокращение, я предполагаю, что для этого требуется синхронизация (внутренняя), которая, в свою очередь, может отрицательно сказаться на производительности. Предположительно, reduce()его легче распараллеливать за счет создания новой структуры данных для возврата после каждого шага сокращения.

Однако приведенные выше утверждения являются предположениями, и я хотел бы, чтобы здесь вмешался эксперт.

jimhooker2002
источник
1
Остальная часть страницы, на которую вы ссылаетесь, объясняет это: Как и в случае с reduce (), преимущество выражения сбора таким абстрактным способом заключается в том, что он напрямую поддается распараллеливанию: мы можем накапливать частичные результаты параллельно, а затем комбинировать их, если функции накопления и комбинирования удовлетворяют соответствующим требованиям.
JB Nizet,
1
также см. "Streams in Java 8: Reduce vs. Collect" Анжелики Лангер - youtube.com/watch?v=oWlWEKNM5Aw
MasterJoe

Ответы:

115

reduceявляется операцией « сворачивания », она применяет бинарный оператор к каждому элементу в потоке, где первый аргумент оператора - это возвращаемое значение предыдущего приложения, а второй аргумент - текущий элемент потока.

collect- это операция агрегирования, при которой создается «коллекция», и каждый элемент «добавляется» к этой коллекции. Затем коллекции в разных частях потока складываются.

Документ , который вы связаны дает причину , имеющий два различных подхода:

Если бы мы хотели взять поток строк и объединить их в одну длинную строку, мы могли бы добиться этого с помощью обычного сокращения:

 String concatenated = strings.reduce("", String::concat)  

Мы получили бы желаемый результат, и даже параллельно работало бы. Однако спектакль может нас не порадовать! Такая реализация будет делать много копирования строк, а время выполнения будет O (n ^ 2) по количеству символов. Более производительный подход заключался бы в накоплении результатов в StringBuilder, который является изменяемым контейнером для накопления строк. Мы можем использовать ту же технику, чтобы распараллелить изменяемую редукцию, что и обычную редукцию.

reduceДело в том, что распараллеливание одинаково в обоих случаях, но в этом случае мы применяем функцию к самим элементам потока. В этом collectслучае мы применяем функцию к изменяемому контейнеру.

Борис Паук
источник
1
Если это так для collect: «Более производительным подходом было бы накопление результатов в StringBuilder», тогда зачем нам вообще использовать сокращение?
jimhooker2002,
2
@ Jimhooker2002 перечитай. Если вы, скажем, вычисляете продукт, то функцию сокращения можно просто применить к разделенным потокам параллельно, а затем объединить вместе в конце. Процесс сокращения всегда приводит к тому, что тип является потоком. Сбор используется, когда вы хотите собрать результаты в изменяемый контейнер, т. Е. Когда тип результата отличается от типа потока. Это дает преимущество, состоящее в том, что для каждого разделенного потока можно использовать один экземпляр контейнера, но недостатком является необходимость объединения контейнеров в конце.
Boris the Spider
1
@ jimhooker2002 в примере продукта intявляется неизменным, поэтому вы не можете легко использовать операцию сбора. Вы могли бы сделать грязный хак, например использовать AtomicIntegerили какой-нибудь обычай, IntWrapperно зачем вам это? Операция складывания просто отличается от операции сбора.
Boris the Spider
18
Существует также другой reduceметод, с помощью которого вы можете возвращать объекты типа, отличного от элементов потока.
Константин Милютин
1
Еще один случай, когда вы использовали бы сбор вместо сокращения, - это когда операция сокращения включает добавление элементов в коллекцию, а затем каждый раз, когда ваша функция-накопитель обрабатывает элемент, она создает новую коллекцию, которая включает этот элемент, что неэффективно.
raghu
41

Причина проста в том, что:

  • collect() может работать только с изменяемыми объектами результата.
  • reduce()будет предназначен для работы с неизменяемыми объектами результата.

" reduce()с неизменным" примером

public class Employee {
  private Integer salary;
  public Employee(String aSalary){
    this.salary = new Integer(aSalary);
  }
  public Integer getSalary(){
    return this.salary;
  }
}

@Test
public void testReduceWithImmutable(){
  List<Employee> list = new LinkedList<>();
  list.add(new Employee("1"));
  list.add(new Employee("2"));
  list.add(new Employee("3"));

  Integer sum = list
  .stream()
  .map(Employee::getSalary)
  .reduce(0, (Integer a, Integer b) -> Integer.sum(a, b));

  assertEquals(Integer.valueOf(6), sum);
}

collect()пример " с изменяемым"

Например , если вы хотите вручную вычислить сумму , используя collect()это не может работать с , BigDecimalно только MutableIntиз org.apache.commons.lang.mutable, например. Увидеть:

public class Employee {
  private MutableInt salary;
  public Employee(String aSalary){
    this.salary = new MutableInt(aSalary);
  }
  public MutableInt getSalary(){
    return this.salary;
  }
}

@Test
public void testCollectWithMutable(){
  List<Employee> list = new LinkedList<>();
  list.add(new Employee("1"));
  list.add(new Employee("2"));

  MutableInt sum = list.stream().collect(
    MutableInt::new, 
    (MutableInt container, Employee employee) -> 
      container.add(employee.getSalary().intValue())
    , 
    MutableInt::add);
  assertEquals(new MutableInt(3), sum);
}

Это работает, потому что аккумулятор container.add(employee.getSalary().intValue()); не должен возвращать новый объект с результатом, а должен изменять состояние изменяемого containerтипа MutableInt.

Если вы хотите использовать BigDecimalвместо этого, containerвы не можете использовать этот collect()метод, поскольку container.add(employee.getSalary());не изменили бы, containerпотому что BigDecimalон неизменяемый. (Кроме этого BigDecimal::newне будет работать, так как BigDecimalнет пустого конструктора)

Сандро
источник
2
Обратите внимание, что вы используете Integerконструктор ( new Integer(6)), который не рекомендуется в более поздних версиях Java.
MC Emperor
1
Хороший улов @MCEmperor! Я изменил его наInteger.valueOf(6)
Сандро
@Sandro - я запутался. Почему вы говорите, что collect () работает только с изменяемыми объектами? Я использовал его для объединения строк. String allNames = employee.stream () .map (Employee :: getNameString) .collect (Collectors.joining (",")) .toString ();
MasterJoe
1
@ MasterJoe2 Все просто. Короче говоря, реализация по-прежнему использует StringBuilderизменяемый. См .: hg.openjdk.java.net/jdk8/jdk8/jdk/file/687fd7c7986d/src/share/…
Сандро
32

Нормальное сокращение предназначено для объединения двух неизменяемых значений, таких как int, double и т. Д., И создания нового; это неизменное сокращение. Напротив, метод collect предназначен для изменения контейнера для накопления результата, который он должен произвести.

Чтобы проиллюстрировать проблему, предположим, что вы хотите добиться Collectors.toList()простого сокращения, например

List<Integer> numbers = stream.reduce(
        new ArrayList<Integer>(),
        (List<Integer> l, Integer e) -> {
            l.add(e);
            return l;
        },
        (List<Integer> l1, List<Integer> l2) -> {
            l1.addAll(l2);
            return l1;
        });

Это эквивалент Collectors.toList(). Однако в этом случае вы изменяете файл List<Integer>. Как мы знаем, ArrayListон не является потокобезопасным и небезопасно добавлять / удалять из него значения во время итерации, поэтому вы получите либо одновременное исключение, ArrayIndexOutOfBoundsExceptionлибо какое-либо исключение (особенно при параллельном запуске) при обновлении списка или объединителя. пытается объединить списки, потому что вы изменяете список, накапливая (добавляя) к нему целые числа. Если вы хотите сделать этот потокобезопасным, вам нужно каждый раз передавать новый список, что снизит производительность.

Напротив, Collectors.toList()работает аналогичным образом. Однако он гарантирует безопасность потоков, когда вы накапливаете значения в списке. Из документации к collectметоду :

Выполняет изменяемую операцию сокращения для элементов этого потока с помощью Collector. Если поток параллельный, а коллектор является параллельным, и либо поток неупорядочен, либо коллектор неупорядочен, то одновременное сокращение будет выполнено. При параллельном выполнении несколько промежуточных результатов могут быть созданы, заполнены и объединены, чтобы поддерживать изоляцию изменяемых структур данных. Следовательно, даже при параллельном выполнении со структурами данных, не защищенными от потоков (такими как ArrayList), для параллельного сокращения не требуется дополнительной синхронизации.

Итак, чтобы ответить на ваш вопрос:

Когда бы вы использовали collect()vs reduce()?

если у вас есть незыблемые ценности , такие как ints, doubles, Stringsто нормальное снижение работает просто отлично. Однако, если вам нужно reduceпреобразовать свои значения в, скажем, List(изменяемую структуру данных), вам необходимо использовать изменяемое сокращение с помощью этого collectметода.

Джордж
источник
Во фрагменте кода, я думаю, проблема в том, что он берет идентификатор (в данном случае один экземпляр ArrayList) и предполагает, что он «неизменяемый», поэтому они могут запускать xпотоки, каждый «добавляя к идентификатору», а затем объединяя их вместе. Хороший пример.
rogerdpack 02
почему мы получим исключение одновременной модификации, вызывающие потоки просто будут повторно запускать последовательный поток, а это означает, что он будет обрабатываться одним потоком, а функция комбайнера вообще не вызывается?
амарнатх хариш
public static void main(String[] args) { List<Integer> l = new ArrayList<>(); l.add(1); l.add(10); l.add(3); l.add(-3); l.add(-4); List<Integer> numbers = l.stream().reduce( new ArrayList<Integer>(), (List<Integer> l2, Integer e) -> { l2.add(e); return l2; }, (List<Integer> l1, List<Integer> l2) -> { l1.addAll(l2); return l1; });for(Integer i:numbers)System.out.println(i); } }Я попробовал и не получил исключение
CCm
@amarnathharish проблема возникает при попытке запустить его параллельно и несколько потоков пытаются получить доступ к такой же список
ДЖОРДЖ
11

Пусть поток будет a <- b <- c <- d

В сокращении,

у вас будет ((a # b) # c) # d

где # - это интересная операция, которую вы хотели бы проделать.

В коллекции,

у вашего коллектора будет какая-то собирающая структура К.

K потребляет. Затем K потребляет b. Затем K потребляет c. Затем K потребляет d.

В конце вы спрашиваете K, каков окончательный результат.

Затем К. дает его вам.

Ян Нг
источник
2

Они сильно различаются по потенциальному объему памяти во время выполнения. Пока collect()собирает и помещает все данные в коллекцию, reduce()явно просит вас указать, как уменьшить данные, прошедшие через поток.

Например, если вы хотите прочитать некоторые данные из файла, обработать их и поместить в некоторую базу данных, вы можете получить код потока Java, подобный этому:

streamDataFromFile(file)
            .map(data -> processData(data))
            .map(result -> database.save(result))
            .collect(Collectors.toList());

В этом случае мы используем, collect()чтобы заставить Java передавать данные и сохранять результат в базе данных. Без collect()данных никогда не читаются и никогда не сохраняются.

Этот код успешно генерирует java.lang.OutOfMemoryError: Java heap spaceошибку времени выполнения, если размер файла достаточно велик или размер кучи достаточно мал. Очевидная причина в том, что он пытается сложить все данные, которые прошли через поток (и, по сути, уже были сохранены в базе данных), в результирующую коллекцию, и это взорвёт кучу.

Однако, если вы замените его collect()на reduce()- это больше не будет проблемой, поскольку последний уменьшит и отбросит все данные, которые прошли.

В представленном примере просто замените collect()что-нибудь на reduce:

.reduce(0L, (aLong, result) -> aLong, (aLong1, aLong2) -> aLong1);

Вам даже не нужно заботиться о том, чтобы вычисления зависели от того, resultчто Java не является чистым языком FP ​​(функционального программирования) и не может оптимизировать данные, которые не используются в нижней части потока из-за возможных побочных эффектов. .

Авераско
источник
3
Если вас не волнуют результаты сохранения вашей базы данных, вы должны использовать forEach ... вам не нужно использовать сокращение. Если это не было для иллюстративных целей.
Дэйв Эдельштейн
2

Вот пример кода

List<Integer> list = Arrays.asList(1,2,3,4,5,6,7);
int sum = list.stream().reduce((x,y) -> {
        System.out.println(String.format("x=%d,y=%d",x,y));
        return (x + y);
    }).get();

System.out.println (сумма);

Вот результат выполнения:

x=1,y=2
x=3,y=3
x=6,y=4
x=10,y=5
x=15,y=6
x=21,y=7
28

Уменьшите дескриптор функции двух параметров, первый параметр - это предыдущее возвращаемое значение в потоке, второй параметр - это текущее вычисляемое значение в потоке, он суммирует первое значение и текущее значение как первое значение в следующем вычислении.

JetQin
источник
0

Согласно документам

Сборщики reduce () наиболее полезны при использовании в многоуровневой редукции, после groupingBy или partitioningBy. Чтобы выполнить простое сокращение потока, используйте вместо этого Stream.reduce (BinaryOperator).

Так что в основном вы будете использовать reducing()только в случае принудительного сбора. Вот еще один пример :

 For example, given a stream of Person, to calculate the longest last name 
 of residents in each city:

    Comparator<String> byLength = Comparator.comparing(String::length);
    Map<String, String> longestLastNameByCity
        = personList.stream().collect(groupingBy(Person::getCity,
            reducing("", Person::getLastName, BinaryOperator.maxBy(byLength))));

Согласно этому руководству, сокращение иногда менее эффективно

Операция сокращения всегда возвращает новое значение. Однако функция аккумулятора также возвращает новое значение каждый раз, когда обрабатывает элемент потока. Предположим, вы хотите сократить элементы потока до более сложного объекта, например коллекции. Это может снизить производительность вашего приложения. Если ваша операция сокращения включает добавление элементов в коллекцию, то каждый раз, когда ваша функция-аккумулятор обрабатывает элемент, она создает новую коллекцию, включающую этот элемент, что неэффективно. Было бы более эффективно вместо этого обновить существующую коллекцию. Вы можете сделать это с помощью метода Stream.collect, который описывается в следующем разделе ...

Таким образом, удостоверение «повторно используется» в сценарии сокращения, поэтому использовать его будет немного эффективнее, .reduceесли это возможно.

Роджердпак
источник