AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
.map(record -> new Record(recordNumber.incrementAndGet(), record))
.parallel()
.filter(record -> doSomeOperation())
.findFirst()
Когда я писал это, я предполагал, что потоки будут порождаться только вызовом карты, так как параллель помещается после карты. Но некоторые строки в файле получали разные номера записей для каждого выполнения.
Я прочитал официальную документацию по Java- потокам и несколько веб-сайтов, чтобы понять, как потоки работают под капотом.
Несколько вопросов:
Параллельный поток Java работает на основе SplitIterator , который реализуется каждой коллекцией, такой как ArrayList, LinkedList и т. Д. Когда мы создаем параллельный поток из этих коллекций, соответствующий итератор разделения будет использоваться для разделения и итерации коллекции. Это объясняет, почему параллелизм произошел на уровне исходного входного источника (строк файла), а не в результате карты (т. Е. Record pojo). Правильно ли мое понимание?
В моем случае вход является потоком ввода-вывода файла. Какой разделенный итератор будет использоваться?
Неважно, где мы находимся
parallel()
в трубопроводе. Исходный источник ввода всегда будет разделен, а остальные промежуточные операции будут применены.В этом случае Java не должна позволять пользователям размещать параллельные операции в любом месте конвейера, за исключением исходного источника. Потому что это дает неправильное понимание тем, кто не знает, как работает поток Java внутри. Я знаю, что
parallel()
операция была бы определена для типа объекта Stream, поэтому она работает таким образом. Но лучше предложить альтернативное решение.В приведенном выше фрагменте кода я пытаюсь добавить номер строки для каждой записи во входном файле, и поэтому он должен быть упорядочен. Тем не менее, я хочу применять
doSomeOperation()
параллельно, поскольку это тяжелый вес логики. Единственный способ добиться этого - написать собственный настраиваемый итератор разбиения. Есть ли другой путь?
источник
parallel()
это не что иное, как общий запрос модификатора, который применяется к базовому объекту потока. Помните, что существует только один поток-источник, если вы не применяете конечные операции к каналу, т. Е. До тех пор, пока ничего не «выполнено». Сказав это, вы в основном просто ставите под сомнение выбор дизайна Java. Который основан на мнении, и мы не можем помочь с этим.Stream
непосредственно в интерфейсе, и из-за хорошего каскадирования каждая операция возвращаетсяStream
снова. Представьте, что кто-то хочет дать вам,Stream
но уже применил пару подобных операцийmap
. Вы, как пользователь, все еще хотите иметь возможность решить, будет ли он выполняться параллельно или нет. Таким образом, у вас должна быть возможность звонитьparallel()
еще, хотя поток уже существует.flatMap
или выполняете небезопасные методы или подобные.Path
находится в локальной файловой системе и вы используете недавний JDK, сплитератор будет иметь лучшую возможность параллельной обработки, чем пакетное умножение на 1024. Но в некоторыхfindFirst
сценариях сбалансированное разбиение может быть даже контрпродуктивным …Ответы:
Весь поток является либо параллельным, либо последовательным. Мы не выбираем подмножество операций для запуска последовательно или параллельно.
Как вы упоминаете, параллельные потоки используют разделенные итераторы. Понятно, что это разделение данных перед началом работы.
Глядя на источник, я вижу, что он использует
java.nio.file.FileChannelLinesSpliterator
Правильно. Можно даже позвонить
parallel()
иsequential()
несколько раз. Тот, который был вызван последним, победит. Когда мы звонимparallel()
, мы устанавливаем это для возвращаемого потока; и, как указано выше, все операции выполняются либо последовательно, либо параллельно.Это становится вопросом мнений. Я думаю, что Забуза дает веские основания поддержать выбор дизайнеров JDK.
Это зависит от вашей деятельности
findFirst()
это ваша реальная терминальная операция, то вам даже не нужно беспокоиться о параллельном выполнении, потому что вdoSomething()
любом случае не будет много вызовов (findFirst()
это короткое замыкание)..parallel()
на самом деле может привести к обработке более одного элемента, в то время какfindFirst()
в последовательном потоке это будет предотвращено.Если ваша терминальная операция не создает много данных, то, возможно, вы можете создать свои
Record
объекты, используя последовательный поток, а затем обработать результат параллельно:Если ваш конвейер будет загружать много данных в память (что может быть причиной того, что вы используете
Files.lines()
), то, возможно, вам понадобится пользовательский итератор разбиения. Однако прежде чем перейти туда, я рассмотрю другие варианты (такие как сохранение строк со столбцом id для начала - это только мое мнение).Я также попытался бы обрабатывать записи небольшими партиями, например так:
Это выполняется
doSomeOperation()
параллельно без загрузки всех данных в память. Но обратите внимание, чтоbatchSize
нужно будет подумать.источник
Spliterator
реализация не была бы более сложной, чем эта, и позволяла бы более эффективную параллельную обработку ...parallelStream
операций имеет фиксированные накладные расходы для запуска операции и ожидания конечного результата, но при этом ограничивается параллелизмомbatchSize
. Во-первых, вам нужно кратное количество доступных в настоящее время ядер ЦП, чтобы избежать простоя потоков. Тогда число должно быть достаточно большим, чтобы компенсировать фиксированные издержки, но чем больше число, тем выше пауза, налагаемая операцией последовательного чтения, происходящей еще до того, как начнется параллельная обработка.Stream.generate
создает неупорядоченный поток, который не работает с предполагаемыми вариантами использования OP, такими какfindFirst()
. Напротив, один параллельный поток с разделителем, который возвращает чаны,trySplit
работает прямо и позволяет рабочим потокам обрабатывать следующий чанк, не ожидая завершения предыдущего.findFirst()
операция будет обрабатывать только небольшое количество элементов. Первое совпадение может все же произойти после обработки 90% всех элементов. Кроме того, при наличии десяти миллионов строк, даже нахождение соответствия после 10% все еще требует обработки миллиона строк.Первоначальный проект Stream включал идею поддержки последующих этапов конвейера с различными настройками параллельного выполнения, но эта идея была оставлена. API может возникать с этого времени, но, с другой стороны, дизайн API, который заставляет вызывающую программу принимать однозначное решение для параллельного или последовательного выполнения, будет намного сложнее.
Фактическое
Spliterator
использование вFiles.lines(…)
зависимости от реализации. В Java 8 (Oracle или OpenJDK) вы всегда получаете то же самое, что и сBufferedReader.lines()
. В более поздних версиях JDK, еслиPath
принадлежит файловой системе по умолчанию и кодировка является одной из поддерживаемых для этой функции, вы получаете поток с выделеннойSpliterator
реализациейjava.nio.file.FileChannelLinesSpliterator
. Если предварительные условия не выполнены, вы получаете то же самое, что и сBufferedReader.lines()
, который по-прежнему основан наIterator
реализованном внутриBufferedReader
и обернутом черезSpliterators.spliteratorUnknownSize
.Ваша конкретная задача лучше всего обрабатывается с помощью пользовательского интерфейса,
Spliterator
который может выполнять нумерацию строк прямо в источнике перед параллельной обработкой, чтобы разрешить последующую параллельную обработку без ограничений.источник
И следующее - простая демонстрация того, когда применяется параллельное приложение. Вывод peek ясно показывает разницу между двумя примерами. Примечание:
map
вызов просто добавлен, чтобы добавить другой метод доparallel
.источник