Параллельные бесконечные потоки Java исчерпывают память

16

Я пытаюсь понять, почему следующая Java-программа дает OutOfMemoryError, а соответствующая .parallel()- нет.

System.out.println(Stream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .mapToInt(Integer::intValue)
    .limit(100_000_000)
    .sum()
);

У меня есть два вопроса:

  1. Каков предполагаемый результат этой программы?

    Без .parallel()этого кажется, что это просто вывод, sum(1+2+3+...)что означает, что он просто «застревает» в первом потоке в flatMap, что имеет смысл.

    С параллельным я не знаю, есть ли ожидаемое поведение, но я предполагаю, что оно каким-то образом чередовало первые nили около того потоков, где nчисло параллельных рабочих. Это также может немного отличаться в зависимости от поведения фрагментирования / буферизации.

  2. Что заставляет его исчерпать память? Я специально пытаюсь понять, как эти потоки реализованы под капотом.

    Я предполагаю, что что-то блокирует поток, поэтому он никогда не завершается и не может избавиться от сгенерированных значений, но я не совсем знаю, в каком порядке оцениваются вещи и где происходит буферизация.

Изменить: В случае, если это актуально, я использую Java 11.

Editt 2: Очевидно, то же самое происходит даже для простой программы IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum(), так что это может быть связано с ленивостью, limitа не flatMap.

Томас Ахле
источник
Параллельно () внутренне использует ForkJoinPool. Я думаю, что ForkJoin Framework находится в Java от Java 7
aravind

Ответы:

9

Вы говорите « но я не совсем знаю, в каком порядке оцениваются вещи и где происходит буферизация». », и это как раз то, чем являются параллельные потоки. Порядок оценки не уточняется.

Важным аспектом вашего примера является .limit(100_000_000). Это означает, что реализация не может просто суммировать произвольные значения, но должна суммировать первые 100 000 000 чисел. Обратите внимание, что в ссылочной реализации,.unordered().limit(100_000_000) не изменяется, что указывает на то, что для неупорядоченного случая нет специальной реализации, но это детали реализации.

Теперь, когда рабочие потоки обрабатывают элементы, они не могут просто суммировать их, поскольку они должны знать, какие элементы им разрешено использовать, что зависит от того, сколько элементов предшествует их конкретной рабочей нагрузке. Поскольку этот поток не знает размеров, это может быть известно только тогда, когда префиксные элементы были обработаны, что никогда не происходит для бесконечных потоков. Таким образом, рабочие потоки на данный момент продолжают буферизироваться, эта информация становится доступной.

В принципе, когда рабочий поток знает, что он обрабатывает самый левый рабочий блок, он может сразу суммировать элементы, подсчитывать их и сигнализировать об окончании при достижении предела. Таким образом, поток может завершиться, но это зависит от многих факторов.

В вашем случае правдоподобный сценарий состоит в том, что другие рабочие потоки распределяют буферы быстрее, чем считает самое левое задание. В этом сценарии незначительные изменения времени могут заставить поток иногда возвращаться со значением.

Когда мы замедляем все рабочие потоки, кроме одного, обрабатывающего самый левый блок, мы можем заставить поток завершиться (по крайней мере, в большинстве запусков):

System.out.println(IntStream
    .iterate(1, i -> i+1)
    .parallel()
    .peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
    .flatMap(n -> IntStream.iterate(n, i -> i+n))
    .limit(100_000_000)
    .sum()
);

Following Я следую предложению Стюарта Маркса использовать порядок слева направо, когда речь идет о порядке встречи, а не о порядке обработки.

Holger
источник
Очень хороший ответ! Интересно, есть ли риск, что все потоки начнут выполнять операции flatMap, и ни один из них не будет выделен для фактического опустошения буферов (суммирования)? В моем реальном случае использования бесконечные потоки - это файлы, которые слишком велики для хранения в памяти. Интересно, как я могу переписать поток, чтобы уменьшить использование памяти?
Томас Але
1
Вы используете Files.lines(…)? Это было значительно улучшено в Java 9.
Хольгер
1
Это то, что он делает в Java 8. В более новых JRE он все равно будет использоваться BufferedReader.lines()при определенных обстоятельствах (кроме файловой системы по умолчанию, специальной кодировки или размера больше, чем Integer.MAX_FILES). Если применимо одно из них, может помочь индивидуальное решение. Это будет стоить нового Q & A ...
Хольгер
1
Integer.MAX_VALUE, конечно ...
Хольгер
1
Что такое внешний поток, поток файлов? Имеет ли он предсказуемый размер?
Хольгер
5

Мое лучшее предположение состоит в том, что добавление parallel()изменяет внутреннее поведение, проблемыflatMap() которого уже лениво оценивались ранее .

Полученная OutOfMemoryErrorошибка была сообщена в [JDK-8202307] Получение java.lang.OutOfMemoryError: пространства кучи Java при вызове Stream.iterator (). Next () в потоке, который использует бесконечный / очень большой поток в flatMap . Если вы посмотрите на тикет, это более или менее тот же самый след стека, который вы получаете. Билет был закрыт как не исправленный по следующей причине:

iterator()И spliterator()методы «аварийные люки» , которые будут использоваться , когда это не представляется возможным использовать другие операции. У них есть некоторые ограничения, потому что они превращают то, что является push-моделью реализации потока, в модель pull. Такой переход требует буферизации в определенных случаях, например, когда элемент (плоский) отображается на два или более элементов . Это значительно усложнило бы реализацию потока, вероятно за счет общих случаев, для поддержки понятия противодавления, чтобы сообщить, сколько элементов проходит через вложенные слои производства элементов.

Кароль Доубеки
источник
Это очень интересно! Имеет смысл, что переход push / pull требует буферизации, которая может использовать память. Однако в моем случае кажется, что использование только push должно работать нормально и просто отбрасывать оставшиеся элементы по мере их появления? Или, может быть, вы говорите, что flapmap вызывает создание итератора?
Томас Але
3

OOME вызвано не тем, что поток бесконечен, а тем, что это не так .

То есть, если вы закомментируете, у .limit(...)него никогда не будет недостатка памяти - но, конечно же, он никогда не закончится.

После разделения поток может отслеживать количество элементов, только если они накапливаются в каждом потоке (похоже, фактический аккумулятор Spliterators$ArraySpliterator#array).

Похоже, вы можете воспроизвести его без flatMap, просто запустите следующее с -Xmx128m:

    System.out.println(Stream
            .iterate(1, i -> i + 1)
            .parallel()
      //    .flatMap(n -> Stream.iterate(n, i -> i+n))
            .mapToInt(Integer::intValue)
            .limit(100_000_000)
            .sum()
    );

Тем не менее, после комментирования limit(), он должен работать нормально, пока вы не решите сэкономить свой ноутбук.

Помимо фактических деталей реализации, вот что, я думаю, происходит:

С limitпомощью sumредуктора требуется, чтобы первые X элементов суммировались, поэтому ни один поток не может выдавать частичные суммы. Каждый «срез» (нить) должен будет накапливать элементы и проходить через них. Без ограничения такого ограничения не существует, поэтому каждый «срез» будет просто вычислять частичную сумму из элементов, которые он получает (навсегда), предполагая, что он в конечном итоге выдаст результат.

Кости Сьюдату
источник
Что вы имеете в виду "как только он разделен"? Разве лимит разделяет это как-то?
Томас Але
@ThomasAhle parallel()будет использовать ForkJoinPoolвнутренне для достижения параллелизма. SpliteratorБудет использоваться для назначения работы для каждой ForkJoinзадачи, я предполагаю , что мы можем назвать единицу работы здесь , как «раскол».
Кароль Доубеки
Но почему это происходит только с лимитом?
Томас Але
@ThomasAhle Я отредактировал ответ двумя центами.
Кости Сьюдату
1
@ThomasAhle устанавливает точку останова Integer.sum(), используемую IntStream.sumредуктором. Вы увидите, что безлимитная версия вызывает эту функцию все время, в то время как ограниченная версия никогда не вызывает ее до OOM.
Кости Сьюдату