Java 8 Stream с пакетной обработкой

101

У меня есть большой файл со списком предметов.

Я хотел бы создать пакет элементов, сделать HTTP-запрос с этим пакетом (все элементы необходимы в качестве параметров в HTTP-запросе). Я могу сделать это очень легко с помощью forцикла, но, как любитель Java 8, я хочу попробовать написать это с помощью Java 8 Stream framework (и воспользоваться преимуществами ленивой обработки).

Пример:

List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
  batch.add(data.get(i));
  if (batch.size() == BATCH_SIZE) process(batch);
}

if (batch.size() > 0) process(batch);

Я хочу сделать что-то длинное lazyFileStream.group(500).map(processBatch).collect(toList())

Как лучше всего это сделать?

Энди Данг
источник
Я не могу понять, как выполнить группировку, извините, но строки Files # будут лениво читать содержимое файла.
Тоби,
1
так что вам в основном нужна инверсия flatMap(+ дополнительная flatMap, чтобы снова свернуть потоки)? Не думаю, что что-то подобное существует в стандартной библиотеке как удобный метод. Либо вам придется найти стороннюю библиотеку, либо написать свою собственную на основе сплитераторов и / или сборщика, испускающего поток потоков
the8472 04
3
Возможно, вы можете комбинировать Stream.generateс reader::readLineи limit, но проблема в том, что потоки не подходят для Исключений. Кроме того, это, вероятно, плохо распараллеливается. Я думаю, что forпетля по-прежнему лучший вариант.
tobias_k 04
Я только что добавил пример кода. Я не думаю, что flatMap - лучший вариант. Подозреваю, что мне, возможно, придется написать собственный Spliterator
Энди Данг
1
Для подобных вопросов я использую термин "злоупотребление потоком".
Кервин 07

Ответы:

13

Заметка! Это решение считывает весь файл перед запуском forEach.

Вы можете сделать это с помощью jOOλ , библиотеки, которая расширяет потоки Java 8 для случаев использования однопоточного последовательного потока:

Seq.seq(lazyFileStream)              // Seq<String>
   .zipWithIndex()                   // Seq<Tuple2<String, Long>>
   .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
   .forEach((index, batch) -> {
       process(batch);
   });

За кулисами zipWithIndex()просто:

static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
    final Iterator<T> it = stream.iterator();

    class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
        long index;

        @Override
        public boolean hasNext() {
            return it.hasNext();
        }

        @Override
        public Tuple2<T, Long> next() {
            return tuple(it.next(), index++);
        }
    }

    return seq(new ZipWithIndex());
}

... а groupBy()это удобство API для:

default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
    return collect(Collectors.groupingBy(classifier));
}

(Отказ от ответственности: я работаю в компании, стоящей за jOOλ)

Лукас Эдер
источник
Вау. Это ИМЕННО то, что я ищу. Наша система обычно обрабатывает потоки данных последовательно, так что это было бы хорошо для перехода на Java 8.
Энди Данг
16
Обратите внимание, что это решение без надобности сохраняет весь входной поток в промежуточный Map(в отличие, например, от решения Бена Манеса)
Тагир Валеев
129

Для полноты, вот решение Guava .

Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);

В вопросе коллекция доступна, поэтому поток не нужен, и его можно записать как

Iterables.partition(data, batchSize).forEach(this::process);
Бен Манес
источник
11
Lists.partitionэто еще один вариант, о котором я должен был упомянуть.
Бен Манес
2
это лениво, правда? он не будет вызывать все Streamв память перед обработкой соответствующего пакета
orirab
1
@orirab да. Между партиями он ленив, так как в нем будут потребляться batchSizeэлементы за итерацию.
Бен Манес
Не могли бы вы взглянуть на stackoverflow.com/questions/58666190/…
gstackoverflow
62

Возможна и чистая реализация Java-8:

int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
         .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
         .forEach(batch -> process(batch));

Обратите внимание, что в отличие от JOOl, он может нормально работать параллельно (при условии, что у вас dataсписок произвольного доступа).

Тагир Валеев
источник
1
что, если ваши данные на самом деле являются потоком? (скажем, строки в файле или даже из сети).
Омри Ядан 01
7
@OmryYadan, вопрос был о том , вход от List(см data.size(), data.get()в этом вопросе). Отвечаю на заданный вопрос. Если у вас есть другой вопрос, задайте его вместо этого (хотя я думаю, что вопрос о потоке также уже задавался).
Тагир Валеев
1
Как обрабатывать партии параллельно?
soup_boy
38

Чистое решение Java 8 :

Чтобы сделать это элегантно, мы можем создать собственный сборщик, который принимает a batch sizeи a Consumerдля обработки каждого пакета:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;

import static java.util.Objects.requireNonNull;


/**
 * Collects elements in the stream and calls the supplied batch processor
 * after the configured batch size is reached.
 *
 * In case of a parallel stream, the batch processor may be called with
 * elements less than the batch size.
 *
 * The elements are not kept in memory, and the final result will be an
 * empty list.
 *
 * @param <T> Type of the elements being collected
 */
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {

    private final int batchSize;
    private final Consumer<List<T>> batchProcessor;


    /**
     * Constructs the batch collector
     *
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     */
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        batchProcessor = requireNonNull(batchProcessor);

        this.batchSize = batchSize;
        this.batchProcessor = batchProcessor;
    }

    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    public BiConsumer<List<T>, T> accumulator() {
        return (ts, t) -> {
            ts.add(t);
            if (ts.size() >= batchSize) {
                batchProcessor.accept(ts);
                ts.clear();
            }
        };
    }

    public BinaryOperator<List<T>> combiner() {
        return (ts, ots) -> {
            // process each parallel list without checking for batch size
            // avoids adding all elements of one to another
            // can be modified if a strict batching mode is required
            batchProcessor.accept(ts);
            batchProcessor.accept(ots);
            return Collections.emptyList();
        };
    }

    public Function<List<T>, List<T>> finisher() {
        return ts -> {
            batchProcessor.accept(ts);
            return Collections.emptyList();
        };
    }

    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}

При желании затем создайте вспомогательный служебный класс:

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;

public class StreamUtils {

    /**
     * Creates a new batch collector
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     * @param <T> the type of elements being processed
     * @return a batch collector instance
     */
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        return new BatchCollector<T>(batchSize, batchProcessor);
    }
}

Пример использования:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();

int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);

input.stream()
     .collect(StreamUtils.batchCollector(batchSize, batchProcessor));

Я также разместил свой код на GitHub, если кто-то хочет взглянуть:

Ссылка на Github

рохитватс
источник
1
Это хорошее решение, если вы не можете поместить все элементы из вашего потока в память. Кроме того, он не будет работать с бесконечными потоками - метод сбора является терминальным, что означает, что вместо создания потока пакетов он будет ждать завершения потока, а затем обрабатывать результат пакетами.
Alex Ackerman
2
@AlexAckerman бесконечный поток будет означать, что финишер никогда не будет вызван, но аккумулятор все равно будет вызываться, поэтому элементы все равно будут обрабатываться. Кроме того, в любой момент времени в памяти должен находиться только размер пакета элементов.
Solubris
@Solubris, ты прав! Плохо, спасибо, что указали на это - я не буду удалять комментарий к ссылке, если кто-то имеет такое же представление о том, как работает метод сбора.
Alex Ackerman
Список, отправленный потребителю, должен быть скопирован, чтобы сделать его безопасным для модификации, например: batchProcessor.accept (copyOf (ts))
Solubris
19

Я написал собственный Spliterator для подобных сценариев. Он заполнит списки заданного размера из входного потока. Преимущество этого подхода в том, что он будет выполнять ленивую обработку и работать с другими функциями потока.

public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
    return batchSize <= 0
        ? Stream.of(stream.collect(Collectors.toList()))
        : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}

private static class BatchSpliterator<E> implements Spliterator<List<E>> {

    private final Spliterator<E> base;
    private final int batchSize;

    public BatchSpliterator(Spliterator<E> base, int batchSize) {
        this.base = base;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final List<E> batch = new ArrayList<>(batchSize);
        for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
            ;
        if (batch.isEmpty())
            return false;
        action.accept(batch);
        return true;
    }

    @Override
    public Spliterator<List<E>> trySplit() {
        if (base.estimateSize() <= batchSize)
            return null;
        final Spliterator<E> splitBase = this.base.trySplit();
        return splitBase == null ? null
                : new BatchSpliterator<>(splitBase, batchSize);
    }

    @Override
    public long estimateSize() {
        final double baseSize = base.estimateSize();
        return baseSize == 0 ? 0
                : (long) Math.ceil(baseSize / (double) batchSize);
    }

    @Override
    public int characteristics() {
        return base.characteristics();
    }

}
Брюс Гамильтон
источник
действительно полезно. Если кто-то хочет выполнить пакетную обработку по некоторым настраиваемым критериям (например, размер коллекции в байтах), вы можете делегировать свой настраиваемый предикат и использовать его в цикле for в качестве условия (тогда цикл imho while будет более читабельным)
пожалуйста,
Я не уверен, что реализация верна. Например, если базовый поток - SUBSIZEDэто возвращаемые разбиения, trySplitможет иметь больше элементов, чем до разделения (если разбиение происходит в середине пакета).
Солод
@Malt, если я Spliteratorsправильно понимаю , trySplitвсегда следует разделять данные на две примерно равные части, чтобы результат никогда не был больше оригинала?
Брюс Гамильтон
@BruceHamilton К сожалению, согласно документам, части не могут быть примерно равными. Они должны быть равны:if this Spliterator is SUBSIZED, then estimateSize() for this spliterator before splitting must be equal to the sum of estimateSize() for this and the returned Spliterator after splitting.
Солод
Да, это соответствует моему пониманию разделения Spliterator. Однако мне трудно понять, как «разбиения, возвращенные из trySplit, могут иметь больше элементов, чем до разделения», не могли бы вы пояснить, что вы здесь имеете в виду?
Брюс Гамильтон,
14

Нам нужно было решить похожую проблему. Мы хотели взять поток, размер которого превышает системную память (итерация по всем объектам в базе данных), и максимально рандомизировать порядок - мы подумали, что можно буферизовать 10 000 элементов и рандомизировать их.

Целью была функция, принимающая поток.

Среди предлагаемых здесь решений есть несколько вариантов:

  • Используйте различные дополнительные библиотеки, отличные от java 8
  • Начните с чего-то, что не является потоком - например, со списка произвольного доступа.
  • Иметь поток, который можно легко разделить в сплитераторе

Изначально нашим инстинктом было использовать собственный сборщик, но это означало, что он будет отключен от потоковой передачи. Приведенное выше решение для кастомного коллектора очень хорошее, и мы его почти использовали.

Вот решение, которое обманывает, используя тот факт, что Streams может дать вам, Iteratorкоторый вы можете использовать в качестве аварийного люка, чтобы позволить вам делать что-то еще, что потоки не поддерживают. IteratorПреобразуется обратно в поток с использованием другого немного Java 8 StreamSupportколдовства.

/**
 * An iterator which returns batches of items taken from another iterator
 */
public class BatchingIterator<T> implements Iterator<List<T>> {
    /**
     * Given a stream, convert it to a stream of batches no greater than the
     * batchSize.
     * @param originalStream to convert
     * @param batchSize maximum size of a batch
     * @param <T> type of items in the stream
     * @return a stream of batches taken sequentially from the original stream
     */
    public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
        return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
    }

    private static <T> Stream<T> asStream(Iterator<T> iterator) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator,ORDERED),
            false);
    }

    private int batchSize;
    private List<T> currentBatch;
    private Iterator<T> sourceIterator;

    public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.sourceIterator = sourceIterator;
    }

    @Override
    public boolean hasNext() {
        prepareNextBatch();
        return currentBatch!=null && !currentBatch.isEmpty();
    }

    @Override
    public List<T> next() {
        return currentBatch;
    }

    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(sourceIterator.next());
        }
    }
}

Простой пример использования этого мог бы выглядеть так:

@Test
public void getsBatches() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        .forEach(System.out::println);
}

Приведенные выше отпечатки

[A, B, C]
[D, E, F]

В нашем случае мы хотели перетасовать пакеты, а затем сохранить их в виде потока - это выглядело так:

@Test
public void howScramblingCouldBeDone() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
        .map(list -> {
            Collections.shuffle(list); return list; })
        .flatMap(List::stream)
        .forEach(System.out::println);
}

Это выводит что-то вроде (оно рандомизировано, поэтому каждый раз разное)

A
C
B
E
D
F

Секрет в том, что всегда есть поток, поэтому вы можете либо работать с потоком пакетов, либо делать что-то с каждым пакетом, а затем flatMapобратно в поток. Более того, все вышеперечисленное выполняется только как заключительное forEachили collectили другое завершающее выражение PULL данные через поток.

Оказывается, iteratorэто особый тип завершающей операции над потоком, и он не заставляет весь поток запускаться и поступать в память! Спасибо ребятам из Java 8 за блестящий дизайн!

Эшли Фриз
источник
И очень хорошо, что вы полностью перебираете каждый пакет, когда он собирается, и сохраняете его List- вы не можете откладывать итерацию внутрипакетных элементов, потому что потребитель может захотеть пропустить весь пакет, и если вы не использовали элементы, то они не пропустят очень далеко. (Я реализовал один из них на C #, хотя это было значительно проще.)
ErikE
9

Вы также можете использовать RxJava :

Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));

или

Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();

или

Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();
ебать
источник
8

Вы также можете взглянуть на cyclops-react , я являюсь автором этой библиотеки. Он реализует интерфейс jOOλ (и, как расширение, потоки JDK 8), но, в отличие от параллельных потоков JDK 8, он ориентирован на асинхронные операции (например, потенциально блокирующие вызовы асинхронного ввода-вывода). Параллельные потоки JDK, напротив, сосредоточены на параллелизме данных для операций, связанных с процессором. Он работает, управляя совокупностями будущих задач под капотом, но предоставляет конечным пользователям стандартный расширенный Stream API.

Этот пример кода может помочь вам начать работу

LazyFutureStream.parallelCommonBuilder()
                .react(data)
                .grouped(BATCH_SIZE)                  
                .map(this::process)
                .run();

Здесь есть руководство по пакетной обработке

И более общий учебник здесь

Чтобы использовать собственный пул потоков (который, вероятно, более подходит для блокировки ввода-вывода), вы можете начать обработку с

     LazyReact reactor = new LazyReact(40);

     reactor.react(data)
            .grouped(BATCH_SIZE)                  
            .map(this::process)
            .run();
Джон МакКлин
источник
3

Чистый пример Java 8, который также работает с параллельными потоками.

Как использовать:

Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed();
CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));

Объявление и реализация метода:

public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor)
{
    List<ElementType> newBatch = new ArrayList<>(batchSize);

    stream.forEach(element -> {
        List<ElementType> fullBatch;

        synchronized (newBatch)
        {
            if (newBatch.size() < batchSize)
            {
                newBatch.add(element);
                return;
            }
            else
            {
                fullBatch = new ArrayList<>(newBatch);
                newBatch.clear();
                newBatch.add(element);
            }
        }

        batchProcessor.accept(fullBatch);
    });

    if (newBatch.size() > 0)
        batchProcessor.accept(new ArrayList<>(newBatch));
}
Николя Лакомб
источник
2

Честно говоря, взгляните на элегантное решение Vavr :

Stream.ofAll(data).grouped(BATCH_SIZE).forEach(this::process);
Nolequen
источник
1

Простой пример использования Spliterator

    // read file into stream, try-with-resources
    try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
        //skip header
        Spliterator<String> split = stream.skip(1).spliterator();
        Chunker<String> chunker = new Chunker<String>();
        while(true) {              
            boolean more = split.tryAdvance(chunker::doSomething);
            if (!more) {
                break;
            }
        }           
    } catch (IOException e) {
        e.printStackTrace();
    }
}

static class Chunker<T> {
    int ct = 0;
    public void doSomething(T line) {
        System.out.println(ct++ + " " + line.toString());
        if (ct % 100 == 0) {
            System.out.println("====================chunk=====================");               
        }           
    }       
}

Ответ Брюса более исчерпывающий, но я искал что-то быстрое и грязное для обработки кучи файлов.

ринмас
источник
1

это чистое java-решение, которое лениво оценивается.

public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
    List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable 
    currentBatch.add(new ArrayList<T>(batchSize));
    return Stream.concat(stream
      .sequential()                   
      .map(new Function<T, List<T>>(){
          public List<T> apply(T t){
              currentBatch.get(0).add(t);
              return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
            }
      }), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
                .limit(1)
    ).filter(Objects::nonNull);
}
Привет
источник
1

Вы можете использовать apache.commons:

ListUtils.partition(ListOfLines, 500).stream()
                .map(partition -> processBatch(partition)
                .collect(Collectors.toList());

Разделение выполняется не лениво, но после разделения списка вы получаете преимущества работы с потоками (например, использование параллельных потоков, добавление фильтров и т. Д.). В других ответах предлагались более сложные решения, но иногда удобочитаемость и ремонтопригодность более важны (а иногда и нет :-))

Таль Иоффе
источник
Не уверен, кто проголосовал против, но было бы неплохо понять, почему ... Я дал ответ, который дополняет другие ответы для людей, не умеющих использовать Guava
Tal Joffe
Здесь вы обрабатываете список, а не поток.
Drakemor
@Drakemor Я обрабатываю поток подсписок. обратите внимание на вызов функции stream ()
Tal Joffe
Но сначала вы превращаете его в список подсписок, который не будет работать правильно для настоящих потоковых данных. Вот ссылка на раздел: commons.apache.org/proper/commons-collections/apidocs/org/...
Drakemor
1
ТБХ Я не полностью понимаю ваш аргумент, но я думаю, мы можем согласиться и не согласиться. Я отредактировал свой ответ, чтобы отразить наш разговор здесь. Спасибо за обсуждение
Тал Иоффе
1

Это легко сделать с помощью Reactor :

Flux.fromStream(fileReader.lines().onClose(() -> safeClose(fileReader)))
            .map(line -> someProcessingOfSingleLine(line))
            .buffer(BUFFER_SIZE)
            .subscribe(apiService::makeHttpRequest);
Alex
источник
0

С помощью Java 8и com.google.common.collect.Listsвы можете сделать что-то вроде:

public class BatchProcessingUtil {
    public static <T,U> List<U> process(List<T> data, int batchSize, Function<List<T>, List<U>> processFunction) {
        List<List<T>> batches = Lists.partition(data, batchSize);
        return batches.stream()
                .map(processFunction) // Send each batch to the process function
                .flatMap(Collection::stream) // flat results to gather them in 1 stream
                .collect(Collectors.toList());
    }
}

Здесь Tтип элементов во входном списке и Uтип элементов в выходном списке.

И вы можете использовать это так:

List<String> userKeys = [... list of user keys]
List<Users> users = BatchProcessingUtil.process(
    userKeys,
    10, // Batch Size
    partialKeys -> service.getUsers(partialKeys)
);
Josebui
источник