Я хотел бы продублировать поток Java 8, чтобы иметь возможность работать с ним дважды. Я могу collect
как список и получать новые потоки из этого;
// doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... // do stuff
thing.stream()... // do other stuff
Но я думаю, что должен быть более эффективный / элегантный способ.
Есть ли способ скопировать поток, не превращая его в коллекцию?
На самом деле я работаю с потоком Either
s, поэтому хочу обработать левую проекцию одним способом, прежде чем перейти к правой проекции и работать с ней по-другому. Что-то вроде этого (с которым я пока вынужден использовать этот toList
трюк).
List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());
Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );
Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );
java
lambda
java-8
java-stream
Тоби
источник
источник
Ответы:
Я думаю, что ваше предположение об эффективности отчасти неверное. Вы получите эту огромную отдачу от эффективности, если собираетесь использовать данные только один раз, потому что вам не нужно их хранить, а потоки дают вам мощную оптимизацию «слияния циклов», которая позволяет вам эффективно передавать все данные через конвейер.
Если вы хотите повторно использовать одни и те же данные, то по определению вы должны либо сгенерировать их дважды (детерминированно), либо сохранить. Если он уже есть в коллекции, отлично; то повторение его дважды обходится дешево.
Мы экспериментировали в дизайне с «разветвленными потоками». Мы обнаружили, что поддержка этого требует реальных затрат; это обременяет общий случай (использовать один раз) за счет необычного случая. Большая проблема заключалась в том, «что происходит, когда два конвейера не потребляют данные с одинаковой скоростью». Теперь вы все равно вернетесь к буферизации. Это была особенность, которая явно не имела веса.
Если вы хотите работать с одними и теми же данными несколько раз, либо сохраните их, либо структурируйте свои операции как Потребители и выполните следующие действия:
Вы также можете изучить библиотеку RxJava, поскольку ее модель обработки лучше подходит для такого рода «разветвления потока».
источник
toList
), чтобы иметь возможность их обработать (Either
случай являясь примером)?Вы можете использовать локальную переменную с,
Supplier
чтобы настроить общие части конвейера потока.С http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/ :
источник
Supplier
еслиStream
он построен "дорогостоящим" способом, вы платите эту стоимость за каждый вызовSupplier.get()
. то есть, если запрос к базе данных ... этот запрос выполняется каждый разSet<Integer>
использованиеcollect(Collectors.toSet())
... и проделать с этим пару операций. Я хотел,max()
и если конкретное значение было задано как две операции ...filter(d -> d == -1).count() == 1;
Используйте a
Supplier
для создания потока для каждой операции завершения.Когда вам понадобится поток этой коллекции, используйте,
streamSupplier.get()
чтобы получить новый поток.Примеры:
streamSupplier.get().anyMatch(predicate);
streamSupplier.get().allMatch(predicate2);
источник
Мы реализовали
duplicate()
метод для потоков в jOOλ , библиотеке с открытым исходным кодом, которую мы создали для улучшения интеграционного тестирования для jOOQ . По сути, вы можете просто написать:Внутри есть буфер, в котором хранятся все значения, полученные из одного потока, но не из другого. Вероятно, это так же эффективно, как если бы ваши два потока потреблялись примерно с одинаковой скоростью, и если вы можете жить с отсутствием безопасности потоков .
Вот как работает алгоритм:
Больше исходного кода здесь
Tuple2
вероятно, как вашPair
тип,Seq
ноStream
с некоторыми улучшениями.источник
Tuple2<Seq<A>>, Seq<A>> t = duplicate(stream); long count = t.collect(counting()); List<A> list = t.collect(toList());
, лучшеTuple2<Long, List<A>> t = stream.collect(Tuple.collectors(counting(), toList()));
. ИспользованиеCollectors.mapping/reducing
одного может выражать другие потоковые операции как сборщики и элементы обработки совершенно по-разному, создавая единый результирующий кортеж. В общем, вы можете делать много вещей, потребляя поток один раз без дублирования, и он будет удобен для параллелизма.offer()
/poll()
API, ноArrayDeque
мог бы сделать то же самое.Вы можете создать поток исполняемых файлов (например):
Где
failure
иsuccess
какие операции применять. Однако это создаст довольно много временных объектов и может быть не более эффективным, чем запуск из коллекции и потоковая / итерация дважды.источник
Другой способ обрабатывать элементы несколько раз - использовать Stream.peek (Consumer) :
peek(Consumer)
можно связывать столько раз, сколько необходимо.источник
cyclops-react , библиотека, в которую я работаю, имеет статический метод, который позволит вам дублировать поток (и возвращает кортеж потоков jOOλ).
См. Комментарии, при использовании дубликата в существующем потоке будет снижена производительность. Более производительной альтернативой было бы использование Streamable: -
Существует также (ленивый) класс Streamable, который можно создать из Stream, Iterable или Array и многократно воспроизводить.
AsStreamable.synchronizedFromStream (stream) - может использоваться для создания Streamable, который будет лениво заполнять его резервную коллекцию таким образом, чтобы можно было совместно использовать его между потоками. Streamable.fromStream (stream) не повлечет за собой никаких накладных расходов на синхронизацию.
источник
List<Integer> list = stream.collect(Collectors.toList()); streams = new Tuple2<>(list.stream(), list.stream())
(как предлагает OP). Также просьба прямо указать в ответе, что вы автор циклоп-стримов. Прочтите это .Для этой конкретной проблемы вы также можете использовать разделение. Что-то вроде
источник
Мы можем использовать Stream Builder во время чтения или итерации потока. Вот документ Stream Builder .
https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.Builder.html
Пример использования
Допустим, у нас есть поток сотрудников, и нам нужно использовать этот поток для записи данных сотрудников в файл Excel, а затем обновить коллекцию / таблицу сотрудников [Это просто пример использования, демонстрирующий использование Stream Builder]:
источник
У меня была аналогичная проблема, и я мог подумать о трех различных промежуточных структурах, из которых можно было бы создать копию потока: a
List
, массив и aStream.Builder
. Я написал небольшую тестовую программу, которая показала, что с точки зрения производительностиList
он примерно на 30% медленнее, чем два других, которые были довольно похожи.Единственный недостаток преобразования в массив состоит в том, что это сложно, если ваш тип элемента является универсальным типом (что в моем случае было); поэтому я предпочитаю использовать
Stream.Builder
.В итоге я написал небольшую функцию, которая создает
Collector
:Затем я могу сделать копию любого потока
str
, сделавstr.collect(copyCollector())
это, как мне кажется, в соответствии с идиоматическим использованием потоков.источник