Я пытаюсь понять, почему следующая Java-программа дает OutOfMemoryError
, а соответствующая .parallel()
- нет.
System.out.println(Stream
.iterate(1, i -> i+1)
.parallel()
.flatMap(n -> Stream.iterate(n, i -> i+n))
.mapToInt(Integer::intValue)
.limit(100_000_000)
.sum()
);
У меня есть два вопроса:
Каков предполагаемый результат этой программы?
Без
.parallel()
этого кажется, что это просто вывод,sum(1+2+3+...)
что означает, что он просто «застревает» в первом потоке в flatMap, что имеет смысл.С параллельным я не знаю, есть ли ожидаемое поведение, но я предполагаю, что оно каким-то образом чередовало первые
n
или около того потоков, гдеn
число параллельных рабочих. Это также может немного отличаться в зависимости от поведения фрагментирования / буферизации.Что заставляет его исчерпать память? Я специально пытаюсь понять, как эти потоки реализованы под капотом.
Я предполагаю, что что-то блокирует поток, поэтому он никогда не завершается и не может избавиться от сгенерированных значений, но я не совсем знаю, в каком порядке оцениваются вещи и где происходит буферизация.
Изменить: В случае, если это актуально, я использую Java 11.
Editt 2: Очевидно, то же самое происходит даже для простой программы IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum()
, так что это может быть связано с ленивостью, limit
а не flatMap
.
источник
Ответы:
Вы говорите « но я не совсем знаю, в каком порядке оцениваются вещи и где происходит буферизация». », и это как раз то, чем являются параллельные потоки. Порядок оценки не уточняется.
Важным аспектом вашего примера является
.limit(100_000_000)
. Это означает, что реализация не может просто суммировать произвольные значения, но должна суммировать первые 100 000 000 чисел. Обратите внимание, что в ссылочной реализации,.unordered().limit(100_000_000)
не изменяется, что указывает на то, что для неупорядоченного случая нет специальной реализации, но это детали реализации.Теперь, когда рабочие потоки обрабатывают элементы, они не могут просто суммировать их, поскольку они должны знать, какие элементы им разрешено использовать, что зависит от того, сколько элементов предшествует их конкретной рабочей нагрузке. Поскольку этот поток не знает размеров, это может быть известно только тогда, когда префиксные элементы были обработаны, что никогда не происходит для бесконечных потоков. Таким образом, рабочие потоки на данный момент продолжают буферизироваться, эта информация становится доступной.
В принципе, когда рабочий поток знает, что он обрабатывает самый левый рабочий блок, он может сразу суммировать элементы, подсчитывать их и сигнализировать об окончании при достижении предела. Таким образом, поток может завершиться, но это зависит от многих факторов.
В вашем случае правдоподобный сценарий состоит в том, что другие рабочие потоки распределяют буферы быстрее, чем считает самое левое задание. В этом сценарии незначительные изменения времени могут заставить поток иногда возвращаться со значением.
Когда мы замедляем все рабочие потоки, кроме одного, обрабатывающего самый левый блок, мы можем заставить поток завершиться (по крайней мере, в большинстве запусков):
Following Я следую предложению Стюарта Маркса использовать порядок слева направо, когда речь идет о порядке встречи, а не о порядке обработки.
источник
Files.lines(…)
? Это было значительно улучшено в Java 9.BufferedReader.lines()
при определенных обстоятельствах (кроме файловой системы по умолчанию, специальной кодировки или размера больше, чемInteger.MAX_FILES
). Если применимо одно из них, может помочь индивидуальное решение. Это будет стоить нового Q & A ...Integer.MAX_VALUE
, конечно ...Мое лучшее предположение состоит в том, что добавление
parallel()
изменяет внутреннее поведение, проблемыflatMap()
которого уже лениво оценивались ранее .Полученная
OutOfMemoryError
ошибка была сообщена в [JDK-8202307] Получение java.lang.OutOfMemoryError: пространства кучи Java при вызове Stream.iterator (). Next () в потоке, который использует бесконечный / очень большой поток в flatMap . Если вы посмотрите на тикет, это более или менее тот же самый след стека, который вы получаете. Билет был закрыт как не исправленный по следующей причине:источник
OOME вызвано не тем, что поток бесконечен, а тем, что это не так .
То есть, если вы закомментируете, у
.limit(...)
него никогда не будет недостатка памяти - но, конечно же, он никогда не закончится.После разделения поток может отслеживать количество элементов, только если они накапливаются в каждом потоке (похоже, фактический аккумулятор
Spliterators$ArraySpliterator#array
).Похоже, вы можете воспроизвести его без
flatMap
, просто запустите следующее с-Xmx128m
:Тем не менее, после комментирования
limit()
, он должен работать нормально, пока вы не решите сэкономить свой ноутбук.Помимо фактических деталей реализации, вот что, я думаю, происходит:
С
limit
помощьюsum
редуктора требуется, чтобы первые X элементов суммировались, поэтому ни один поток не может выдавать частичные суммы. Каждый «срез» (нить) должен будет накапливать элементы и проходить через них. Без ограничения такого ограничения не существует, поэтому каждый «срез» будет просто вычислять частичную сумму из элементов, которые он получает (навсегда), предполагая, что он в конечном итоге выдаст результат.источник
parallel()
будет использоватьForkJoinPool
внутренне для достижения параллелизма.Spliterator
Будет использоваться для назначения работы для каждойForkJoin
задачи, я предполагаю , что мы можем назвать единицу работы здесь , как «раскол».Integer.sum()
, используемуюIntStream.sum
редуктором. Вы увидите, что безлимитная версия вызывает эту функцию все время, в то время как ограниченная версия никогда не вызывает ее до OOM.