Можете ли вы разделить поток на два потока?

146

У меня есть набор данных, представленный потоком Java 8:

Stream<T> stream = ...;

Я вижу, как отфильтровать его, чтобы получить случайное подмножество - например,

Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();   
Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0));

Я также вижу, как можно уменьшить этот поток, чтобы получить, например, два списка, представляющих две случайные половины набора данных, а затем превратить их обратно в потоки. Но есть ли прямой способ генерировать два потока из исходного? Что-то вроде

(heads, tails) = stream.[some kind of split based on filter]

Спасибо за понимание.

user1148758
источник
Ответ Марка гораздо полезнее, чем ответ Луи, но я должен сказать, что ответ Луи больше связан с первоначальным вопросом. Вопрос скорее сфокусирован на возможности конвертировать Streamв несколько Streams без промежуточного преобразования , хотя я думаю, что люди, которые достигли этого вопроса, на самом деле ищут способ достичь этого, независимо от такого ограничения, что является ответом Марка. Это может быть связано с тем, что вопрос в заголовке не такой, как в описании .
Devildelta

Ответы:

9

Не совсем. Вы не можете получить два Streamс одного; это не имеет смысла - как бы вы перебрали одно без необходимости генерировать другое одновременно? Поток может быть использован только один раз.

Однако, если вы хотите сбросить их в список или что-то, вы можете сделать

stream.forEach((x) -> ((x == 0) ? heads : tails).add(x));
Луи Вассерман
источник
65
Почему это не имеет смысла? Поскольку поток является конвейером, нет причин, по которым он не может создать двух производителей исходного потока, я мог видеть, что это обрабатывается сборщиком, который предоставляет два потока.
Бретт Райан
36
Не потокобезопасен. Плохой совет, пытаясь добавить непосредственно в коллекцию, поэтому у нас есть stream.collect(...)for с предопределенным потокобезопасным Collectors, который хорошо работает даже в не поточно-безопасных коллекциях (без конфликта синхронизированных блокировок). Лучший ответ @MarkJeronimus.
YoYo
1
@JoD Это потокобезопасный, если головы и хвосты являются потокобезопасными. Кроме того, при условии использования непараллельных потоков, только порядок не гарантируется, поэтому они являются потокобезопасными. Программист должен решить проблемы параллелизма, поэтому этот ответ идеально подходит, если коллекции являются поточно-ориентированными.
Николас
1
@ Никсон он не подходит при наличии лучшего решения, которое мы имеем здесь. Наличие такого кода может привести к плохому прецеденту, заставляя других использовать его неправильно. Даже если параллельные потоки не используются, это только один шаг. Хорошая практика кодирования требует от нас не поддерживать состояние во время потоковых операций. Следующее, что мы делаем, - это кодирование в такой среде, как Apache Spark, и такие же практики могут привести к неожиданным результатам. Я даю это творческое решение, которое я мог бы написать сам не так давно.
Йо-
1
@JoD Это не лучшее решение, на самом деле оно более неэффективно. Такое мышление в конечном итоге заканчивается выводом о том, что все коллекции должны быть поточно-безопасными по умолчанию для предотвращения непреднамеренных последствий, что просто неправильно.
Николас
301

Коллектор может быть использован для этого.

  • Для двух категорий используйте Collectors.partitioningBy()фабрику.

Это позволит создать Mapот Booleanдо List, и поставить элементы в один или другой список , основанный на Predicate.

Примечание. Поскольку поток должен потребляться целиком, он не может работать с бесконечными потоками. И поскольку поток все равно используется, этот метод просто помещает их в списки вместо создания нового потока с памятью. Вы всегда можете транслировать эти списки, если вам нужны потоки в качестве вывода.

Кроме того, нет необходимости в итераторе, даже в приведенном вами примере только для заголовков.

  • Двоичное разбиение выглядит так:
Random r = new Random();

Map<Boolean, List<String>> groups = stream
    .collect(Collectors.partitioningBy(x -> r.nextBoolean()));

System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());
  • Для большего количества категорий используйте Collectors.groupingBy()фабрику.
Map<Object, List<String>> groups = stream
    .collect(Collectors.groupingBy(x -> r.nextInt(3)));
System.out.println(groups.get(0).size());
System.out.println(groups.get(1).size());
System.out.println(groups.get(2).size());

В случае, если потоки не являются Stream, но один из примитивных потоков, как IntStream, то этот .collect(Collectors)метод недоступен. Тебе придется делать это вручную, без коллекторской фабрики. Его реализация выглядит так:

[Пример 2.0 с 2020-04-16]

    IntStream    intStream = IntStream.iterate(0, i -> i + 1).limit(100000).parallel();
    IntPredicate predicate = ignored -> r.nextBoolean();

    Map<Boolean, List<Integer>> groups = intStream.collect(
            () -> Map.of(false, new ArrayList<>(100000),
                         true , new ArrayList<>(100000)),
            (map, value) -> map.get(predicate.test(value)).add(value),
            (map1, map2) -> {
                map1.get(false).addAll(map2.get(false));
                map1.get(true ).addAll(map2.get(true ));
            });

В этом примере я инициализирую ArrayLists с полным размером исходной коллекции (если это вообще известно). Это предотвращает события изменения размера даже в худшем случае, но потенциально может поглотить пространство 2 * N * T (N = начальное количество элементов, T = количество потоков). Чтобы найти компромисс между скоростью и быстродействием, вы можете опустить его или использовать наиболее обоснованное предположение, например, ожидаемое наибольшее количество элементов в одном разделе (обычно чуть больше N / 2 для сбалансированного разделения).

Я надеюсь, что никого не оскорбляю, используя метод Java 9. Для версии Java 8 посмотрите историю изменений.

Марк Жеронимус
источник
2
Прекрасный. Однако последнее решение для IntStream не будет поточно-ориентированным в случае параллельного потока. Решение намного проще, чем вы думаете ... stream.boxed().collect(...);! Это будет сделано как рекламируется: конвертировать примитив IntStreamв коробочную Stream<Integer>версию.
YoYo
32
Это должен быть принятый ответ, поскольку он напрямую решает вопрос ОП.
ejel
27
Я бы хотел, чтобы переполнение стека позволило бы сообществу переопределить выбранный ответ, если будет найден лучший.
GuiSim
Я не уверен, что это отвечает на вопрос. Вопрос требует разделения потока на потоки, а не списков.
Алик Эльзин-килака
1
Функция накопителя излишне многословна. Вместо (map, x) -> { boolean partition = p.test(x); List<Integer> list = map.get(partition); list.add(x); }тебя можно просто использовать (map, x) -> map.get(p.test(x)).add(x). Кроме того, я не вижу причин, по которым collectоперация не должна быть поточно-ориентированной. Это работает точно так, как это должно работать, и очень близко к тому, как Collectors.partitioningBy(p)будет работать. Но я бы использовал IntPredicateвместо, Predicate<Integer>если не использовать boxed(), чтобы избежать бокса в два раза.
Хольгер
21

Я наткнулся на этот вопрос для себя и чувствую, что у разветвленного потока есть несколько вариантов использования, которые могут оказаться действительными. Я написал приведенный ниже код как потребитель, так что он ничего не делает, но вы можете применить его к функциям и ко всему, с чем вы можете столкнуться.

class PredicateSplitterConsumer<T> implements Consumer<T>
{
  private Predicate<T> predicate;
  private Consumer<T>  positiveConsumer;
  private Consumer<T>  negativeConsumer;

  public PredicateSplitterConsumer(Predicate<T> predicate, Consumer<T> positive, Consumer<T> negative)
  {
    this.predicate = predicate;
    this.positiveConsumer = positive;
    this.negativeConsumer = negative;
  }

  @Override
  public void accept(T t)
  {
    if (predicate.test(t))
    {
      positiveConsumer.accept(t);
    }
    else
    {
      negativeConsumer.accept(t);
    }
  }
}

Теперь ваша реализация кода может выглядеть примерно так:

personsArray.forEach(
        new PredicateSplitterConsumer<>(
            person -> person.getDateOfBirth().isPresent(),
            person -> System.out.println(person.getName()),
            person -> System.out.println(person.getName() + " does not have Date of birth")));
Ludger
источник
20

К сожалению, то, что вы просите, прямо не одобряется в JavaDoc Stream :

Поток должен использоваться (вызывая промежуточную или терминальную операцию потока) только один раз. Это исключает, например, «разветвленные» потоки, где один и тот же источник передает два или более конвейеров или несколько обходов одного и того же потока.

Вы можете обойти это, используя peekили другие методы, если вы действительно желаете такого поведения. В этом случае, вместо того, чтобы пытаться создать резервные копии двух потоков из одного и того же исходного источника потока с помощью разветвляющегося фильтра, вы дублируете свой поток и фильтруете каждый из дубликатов соответствующим образом.

Тем не менее, вы можете пересмотреть, Streamявляется ли структура подходящей для вашего варианта использования.

Тревор Фриман
источник
6
Формулировка Javadoc не исключает разделения на несколько потоков, если только один элемент потока входит в один из них
Thorbjørn Ravn Andersen
2
@ ThorbjørnRavnAndersen Я не уверен, что дублирование элемента потока является основным препятствием для разветвленного потока. Основная проблема заключается в том, что операция разветвления, по сути, является терминальной операцией, поэтому, когда вы решаете разветвляться, вы в основном создаете какую-то коллекцию. Например, я могу написать метод, List<Stream> forkStream(Stream s)но мои результирующие потоки будут по крайней мере частично поддерживаться коллекциями, а не непосредственно базовым потоком, в отличие от того, filterчто не является операцией терминального потока.
Тревор Фриман
7
Это одна из причин, по которой я считаю, что потоки Java немного недооценены по сравнению с github.com/ReactiveX/RxJava/wiki, потому что смысл потока заключается в применении операций к потенциально бесконечному набору элементов, а реальные операции часто требуют разделения , дублирование и объединение потоков.
Усман Исмаил
8

Это против общего механизма Stream. Скажем, вы можете разделить Stream S0 на Sa и Sb, как вы хотели. Выполнение любой терминальной операции, скажем count(), в Sa, обязательно «потребит» все элементы в S0. Поэтому Sb потерял свой источник данных.

Раньше у Stream был tee()метод, который, я думаю, дублирует поток до двух. Это удалено сейчас.

В Stream есть метод peek (), но вы можете использовать его для достижения своих требований.

Zhongyu
источник
1
peekэто именно то, что раньше tee.
Луи Вассерман
5

не совсем, но вы можете выполнить то, что вам нужно, вызвав Collectors.groupingBy(). вы создаете новую коллекцию, а затем можете создавать экземпляры потоков в этой новой коллекции.

aepurniet
источник
2

Это был наименее плохой ответ, который я мог придумать.

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

public class Test {

    public static <T, L, R> Pair<L, R> splitStream(Stream<T> inputStream, Predicate<T> predicate,
            Function<Stream<T>, L> trueStreamProcessor, Function<Stream<T>, R> falseStreamProcessor) {

        Map<Boolean, List<T>> partitioned = inputStream.collect(Collectors.partitioningBy(predicate));
        L trueResult = trueStreamProcessor.apply(partitioned.get(Boolean.TRUE).stream());
        R falseResult = falseStreamProcessor.apply(partitioned.get(Boolean.FALSE).stream());

        return new ImmutablePair<L, R>(trueResult, falseResult);
    }

    public static void main(String[] args) {

        Stream<Integer> stream = Stream.iterate(0, n -> n + 1).limit(10);

        Pair<List<Integer>, String> results = splitStream(stream,
                n -> n > 5,
                s -> s.filter(n -> n % 2 == 0).collect(Collectors.toList()),
                s -> s.map(n -> n.toString()).collect(Collectors.joining("|")));

        System.out.println(results);
    }

}

Это берет поток целых чисел и разбивает их на 5. Для тех, кто больше 5, он фильтрует только четные числа и помещает их в список. В остальном он присоединяется к ним с |.

выходы:

 ([6, 8],0|1|2|3|4|5)

Он не идеален, поскольку собирает все в промежуточные коллекции, разрушая поток (и имеет слишком много аргументов!)

Ян Джонс
источник
1

Я наткнулся на этот вопрос, ища способ отфильтровать определенные элементы из потока и зарегистрировать их как ошибки. Поэтому мне не нужно было так сильно разбивать поток, как прикреплять преждевременное завершающее действие к предикату с ненавязчивым синтаксисом. Вот что я придумал:

public class MyProcess {
    /* Return a Predicate that performs a bail-out action on non-matching items. */
    private static <T> Predicate<T> withAltAction(Predicate<T> pred, Consumer<T> altAction) {
    return x -> {
        if (pred.test(x)) {
            return true;
        }
        altAction.accept(x);
        return false;
    };

    /* Example usage in non-trivial pipeline */
    public void processItems(Stream<Item> stream) {
        stream.filter(Objects::nonNull)
              .peek(this::logItem)
              .map(Item::getSubItems)
              .filter(withAltAction(SubItem::isValid,
                                    i -> logError(i, "Invalid")))
              .peek(this::logSubItem)
              .filter(withAltAction(i -> i.size() > 10,
                                    i -> logError(i, "Too large")))
              .map(SubItem::toDisplayItem)
              .forEach(this::display);
    }
}
Себастьян Ганс
источник
0

Укороченная версия, которая использует Lombok

import java.util.function.Consumer;
import java.util.function.Predicate;

import lombok.RequiredArgsConstructor;

/**
 * Forks a Stream using a Predicate into postive and negative outcomes.
 */
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PROTECTED)
public class StreamForkerUtil<T> implements Consumer<T> {
    Predicate<T> predicate;
    Consumer<T> positiveConsumer;
    Consumer<T> negativeConsumer;

    @Override
    public void accept(T t) {
        (predicate.test(t) ? positiveConsumer : negativeConsumer).accept(t);
    }
}
OneCricketeer
источник
-3

Как насчет:

Supplier<Stream<Integer>> randomIntsStreamSupplier =
    () -> (new Random()).ints(0, 2).boxed();

Stream<Integer> tails =
    randomIntsStreamSupplier.get().filter(x->x.equals(0));
Stream<Integer> heads =
    randomIntsStreamSupplier.get().filter(x->x.equals(1));
Мэтью
источник
1
Так как поставщик вызывается дважды, вы получите две разные случайные коллекции. Я думаю, что OP
намерен