Параллельный поток Java - порядок вызова метода parallel () [closed]

11
AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
     .map(record -> new Record(recordNumber.incrementAndGet(), record)) 
     .parallel()           
     .filter(record -> doSomeOperation())
     .findFirst()

Когда я писал это, я предполагал, что потоки будут порождаться только вызовом карты, так как параллель помещается после карты. Но некоторые строки в файле получали разные номера записей для каждого выполнения.

Я прочитал официальную документацию по Java- потокам и несколько веб-сайтов, чтобы понять, как потоки работают под капотом.

Несколько вопросов:

  • Параллельный поток Java работает на основе SplitIterator , который реализуется каждой коллекцией, такой как ArrayList, LinkedList и т. Д. Когда мы создаем параллельный поток из этих коллекций, соответствующий итератор разделения будет использоваться для разделения и итерации коллекции. Это объясняет, почему параллелизм произошел на уровне исходного входного источника (строк файла), а не в результате карты (т. Е. Record pojo). Правильно ли мое понимание?

  • В моем случае вход является потоком ввода-вывода файла. Какой разделенный итератор будет использоваться?

  • Неважно, где мы находимся parallel()в трубопроводе. Исходный источник ввода всегда будет разделен, а остальные промежуточные операции будут применены.

    В этом случае Java не должна позволять пользователям размещать параллельные операции в любом месте конвейера, за исключением исходного источника. Потому что это дает неправильное понимание тем, кто не знает, как работает поток Java внутри. Я знаю, что parallel()операция была бы определена для типа объекта Stream, поэтому она работает таким образом. Но лучше предложить альтернативное решение.

  • В приведенном выше фрагменте кода я пытаюсь добавить номер строки для каждой записи во входном файле, и поэтому он должен быть упорядочен. Тем не менее, я хочу применять doSomeOperation()параллельно, поскольку это тяжелый вес логики. Единственный способ добиться этого - написать собственный настраиваемый итератор разбиения. Есть ли другой путь?

исследователь
источник
2
Это больше связано с тем, как создатели Java решили разработать интерфейс. Вы размещаете свои запросы в конвейере, и все, что не является окончательной операцией, будет собрано первым. parallel()это не что иное, как общий запрос модификатора, который применяется к базовому объекту потока. Помните, что существует только один поток-источник, если вы не применяете конечные операции к каналу, т. Е. До тех пор, пока ничего не «выполнено». Сказав это, вы в основном просто ставите под сомнение выбор дизайна Java. Который основан на мнении, и мы не можем помочь с этим.
Забузард
1
Я полностью понимаю вашу точку зрения и путаницу, но я не думаю, что есть намного лучшие решения. Этот метод предлагается Streamнепосредственно в интерфейсе, и из-за хорошего каскадирования каждая операция возвращается Streamснова. Представьте, что кто-то хочет дать вам, Streamно уже применил пару подобных операций map. Вы, как пользователь, все еще хотите иметь возможность решить, будет ли он выполняться параллельно или нет. Таким образом, у вас должна быть возможность звонить parallel()еще, хотя поток уже существует.
Забузард
1
Кроме того, я бы предпочел спросить, почему вы захотите выполнить часть потока последовательно, а затем переключиться на параллельный. Если поток уже достаточно велик, чтобы претендовать на параллельное выполнение, то это, вероятно, также относится ко всему, что было до этого в конвейере. Так почему бы не использовать параллельное выполнение и для этой части? Я понимаю, что есть крайние случаи, например, если вы резко увеличиваете размер с помощью flatMapили выполняете небезопасные методы или подобные.
Забузард
1
@ Zabuza Я не подвергаю сомнению выбор дизайна Java, но я просто выражаю свое беспокойство. Любой основной пользователь потока Java может получить ту же путаницу, если он не понимает работу потока. Я полностью согласен с вашим вторым комментарием. Я только что выделил одно возможное решение, которое может иметь свои недостатки, как вы упомянули. Но мы можем видеть, может ли это быть решено любым другим способом. Что касается вашего третьего комментария, я уже упоминал свой вариант использования в последнем пункте моего описания
исследователь
1
@ Евгений, когда он Pathнаходится в локальной файловой системе и вы используете недавний JDK, сплитератор будет иметь лучшую возможность параллельной обработки, чем пакетное умножение на 1024. Но в некоторых findFirstсценариях сбалансированное разбиение может быть даже контрпродуктивным …
Хольгер,

Ответы:

8

Это объясняет, почему параллелизм произошел на уровне исходного входного источника (строк файла), а не в результате карты (т. Е. Record pojo).

Весь поток является либо параллельным, либо последовательным. Мы не выбираем подмножество операций для запуска последовательно или параллельно.

Когда операция терминала инициируется, конвейер потока выполняется последовательно или параллельно в зависимости от ориентации потока, в котором он вызывается. [...] Когда операция терминала инициируется, конвейер потока выполняется последовательно или параллельно в зависимости от режима потока, в котором он вызывается. тот же источник

Как вы упоминаете, параллельные потоки используют разделенные итераторы. Понятно, что это разделение данных перед началом работы.


В моем случае вход является потоком ввода-вывода файла. Какой разделенный итератор будет использоваться?

Глядя на источник, я вижу, что он использует java.nio.file.FileChannelLinesSpliterator


Неважно, где мы размещаем параллель () в конвейере. Исходный источник ввода всегда будет разделен, а остальные промежуточные операции будут применены.

Правильно. Можно даже позвонить parallel()и sequential()несколько раз. Тот, который был вызван последним, победит. Когда мы звонимparallel() , мы устанавливаем это для возвращаемого потока; и, как указано выше, все операции выполняются либо последовательно, либо параллельно.


В этом случае Java не должна позволять пользователям размещать параллельные операции в любом месте конвейера, за исключением исходного источника ...

Это становится вопросом мнений. Я думаю, что Забуза дает веские основания поддержать выбор дизайнеров JDK.


Единственный способ добиться этого - написать собственный настраиваемый итератор разбиения. Есть ли другой путь?

Это зависит от вашей деятельности

  • Если findFirst()это ваша реальная терминальная операция, то вам даже не нужно беспокоиться о параллельном выполнении, потому что в doSomething()любом случае не будет много вызовов ( findFirst()это короткое замыкание). .parallel()на самом деле может привести к обработке более одного элемента, в то время какfindFirst() в последовательном потоке это будет предотвращено.
  • Если ваша терминальная операция не создает много данных, то, возможно, вы можете создать свои Recordобъекты, используя последовательный поток, а затем обработать результат параллельно:

    List<Record> smallData = Files.lines(inputFile.toPath(), 
                                         StandardCharsets.UTF_8)
      .map(record -> new Record(recordNumber.incrementAndGet(), record)) 
      .collect(Collectors.toList())
      .parallelStream()     
      .filter(record -> doSomeOperation())
      .collect(Collectors.toList());
  • Если ваш конвейер будет загружать много данных в память (что может быть причиной того, что вы используете Files.lines()), то, возможно, вам понадобится пользовательский итератор разбиения. Однако прежде чем перейти туда, я рассмотрю другие варианты (такие как сохранение строк со столбцом id для начала - это только мое мнение).
    Я также попытался бы обрабатывать записи небольшими партиями, например так:

    AtomicInteger recordNumber = new AtomicInteger();
    final int batchSize = 10;
    
    try(BufferedReader reader = Files.newBufferedReader(inputFile.toPath(), 
            StandardCharsets.UTF_8);) {
        Supplier<List<Record>> batchSupplier = () -> {
            List<Record> batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                String nextLine;
                try {
                    nextLine = reader.readLine();
                } catch (IOException e) {
                    //hanlde exception
                    throw new RuntimeException(e);
                }
    
                if(null == nextLine) 
                    return batch;
                batch.add(new Record(recordNumber.getAndIncrement(), nextLine));
            }
            System.out.println("next batch");
    
            return batch;
        };
    
        Stream.generate(batchSupplier)
            .takeWhile(list -> list.size() >= batchSize)
            .map(list -> list.parallelStream()
                             .filter(record -> doSomeOperation())
                             .collect(Collectors.toList()))
            .flatMap(List::stream)
            .forEach(System.out::println);
    }

    Это выполняется doSomeOperation()параллельно без загрузки всех данных в память. Но обратите внимание, что batchSizeнужно будет подумать.

ernest_k
источник
1
Благодарю за разъяснение. Полезно знать о третьем предложенном вами решении. Я посмотрю, так как я не использовал takeWhile и Supplier.
исследователь
2
Пользовательская Spliteratorреализация не была бы более сложной, чем эта, и позволяла бы более эффективную параллельную обработку ...
Хольгер,
1
Каждая из ваших внутренних parallelStreamопераций имеет фиксированные накладные расходы для запуска операции и ожидания конечного результата, но при этом ограничивается параллелизмом batchSize. Во-первых, вам нужно кратное количество доступных в настоящее время ядер ЦП, чтобы избежать простоя потоков. Тогда число должно быть достаточно большим, чтобы компенсировать фиксированные издержки, но чем больше число, тем выше пауза, налагаемая операцией последовательного чтения, происходящей еще до того, как начнется параллельная обработка.
Хольгер
1
Параллельное вращение внешнего потока может вызвать плохие помехи внутреннему в текущей реализации, кроме точки, которая Stream.generateсоздает неупорядоченный поток, который не работает с предполагаемыми вариантами использования OP, такими как findFirst(). Напротив, один параллельный поток с разделителем, который возвращает чаны, trySplitработает прямо и позволяет рабочим потокам обрабатывать следующий чанк, не ожидая завершения предыдущего.
Хольгер
2
Нет оснований предполагать, что findFirst()операция будет обрабатывать только небольшое количество элементов. Первое совпадение может все же произойти после обработки 90% всех элементов. Кроме того, при наличии десяти миллионов строк, даже нахождение соответствия после 10% все еще требует обработки миллиона строк.
Хольгер
7

Первоначальный проект Stream включал идею поддержки последующих этапов конвейера с различными настройками параллельного выполнения, но эта идея была оставлена. API может возникать с этого времени, но, с другой стороны, дизайн API, который заставляет вызывающую программу принимать однозначное решение для параллельного или последовательного выполнения, будет намного сложнее.

Фактическое Spliteratorиспользование в Files.lines(…)зависимости от реализации. В Java 8 (Oracle или OpenJDK) вы всегда получаете то же самое, что и с BufferedReader.lines(). В более поздних версиях JDK, если Pathпринадлежит файловой системе по умолчанию и кодировка является одной из поддерживаемых для этой функции, вы получаете поток с выделенной Spliteratorреализацией java.nio.file.FileChannelLinesSpliterator. Если предварительные условия не выполнены, вы получаете то же самое, что и с BufferedReader.lines(), который по-прежнему основан на Iteratorреализованном внутри BufferedReaderи обернутом через Spliterators.spliteratorUnknownSize.

Ваша конкретная задача лучше всего обрабатывается с помощью пользовательского интерфейса, Spliteratorкоторый может выполнять нумерацию строк прямо в источнике перед параллельной обработкой, чтобы разрешить последующую параллельную обработку без ограничений.

public static Stream<Record> records(Path p) throws IOException {
    LineNoSpliterator sp = new LineNoSpliterator(p);
    return StreamSupport.stream(sp, false).onClose(sp);
}

private static class LineNoSpliterator implements Spliterator<Record>, Runnable {
    int chunkSize = 100;
    SeekableByteChannel channel;
    LineNumberReader reader;

    LineNoSpliterator(Path path) throws IOException {
        channel = Files.newByteChannel(path, StandardOpenOption.READ);
        reader=new LineNumberReader(Channels.newReader(channel,StandardCharsets.UTF_8));
    }

    @Override
    public void run() {
        try(Closeable c1 = reader; Closeable c2 = channel) {}
        catch(IOException ex) { throw new UncheckedIOException(ex); }
        finally { reader = null; channel = null; }
    }

    @Override
    public boolean tryAdvance(Consumer<? super Record> action) {
        try {
            String line = reader.readLine();
            if(line == null) return false;
            action.accept(new Record(reader.getLineNumber(), line));
            return true;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    @Override
    public Spliterator<Record> trySplit() {
        Record[] chunks = new Record[chunkSize];
        int read;
        for(read = 0; read < chunks.length; read++) {
            int pos = read;
            if(!tryAdvance(r -> chunks[pos] = r)) break;
        }
        return Spliterators.spliterator(chunks, 0, read, characteristics());
    }

    @Override
    public long estimateSize() {
        try {
            return (channel.size() - channel.position()) / 60;
        } catch (IOException ex) {
            return 0;
        }
    }

    @Override
    public int characteristics() {
        return ORDERED | NONNULL | DISTINCT;
    }
}
Holger
источник
0

И следующее - простая демонстрация того, когда применяется параллельное приложение. Вывод peek ясно показывает разницу между двумя примерами. Примечание: mapвызов просто добавлен, чтобы добавить другой метод до parallel.

IntStream.rangeClosed (1,20).peek(a->System.out.print(a+" "))
        .map(a->a + 200).sum();
System.out.println();
IntStream.rangeClosed(1,20).peek(a->System.out.print(a+" "))
        .map(a->a + 200).parallel().sum();
WJS
источник