У меня есть большой файл со списком предметов.
Я хотел бы создать пакет элементов, сделать HTTP-запрос с этим пакетом (все элементы необходимы в качестве параметров в HTTP-запросе). Я могу сделать это очень легко с помощью for
цикла, но, как любитель Java 8, я хочу попробовать написать это с помощью Java 8 Stream framework (и воспользоваться преимуществами ленивой обработки).
Пример:
List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
batch.add(data.get(i));
if (batch.size() == BATCH_SIZE) process(batch);
}
if (batch.size() > 0) process(batch);
Я хочу сделать что-то длинное
lazyFileStream.group(500).map(processBatch).collect(toList())
Как лучше всего это сделать?
java
java-8
batch-processing
java-stream
Энди Данг
источник
источник
flatMap
(+ дополнительная flatMap, чтобы снова свернуть потоки)? Не думаю, что что-то подобное существует в стандартной библиотеке как удобный метод. Либо вам придется найти стороннюю библиотеку, либо написать свою собственную на основе сплитераторов и / или сборщика, испускающего поток потоковStream.generate
сreader::readLine
иlimit
, но проблема в том, что потоки не подходят для Исключений. Кроме того, это, вероятно, плохо распараллеливается. Я думаю, чтоfor
петля по-прежнему лучший вариант.Ответы:
Заметка! Это решение считывает весь файл перед запуском forEach.
Вы можете сделать это с помощью jOOλ , библиотеки, которая расширяет потоки Java 8 для случаев использования однопоточного последовательного потока:
Seq.seq(lazyFileStream) // Seq<String> .zipWithIndex() // Seq<Tuple2<String, Long>> .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>> .forEach((index, batch) -> { process(batch); });
За кулисами
zipWithIndex()
просто:static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) { final Iterator<T> it = stream.iterator(); class ZipWithIndex implements Iterator<Tuple2<T, Long>> { long index; @Override public boolean hasNext() { return it.hasNext(); } @Override public Tuple2<T, Long> next() { return tuple(it.next(), index++); } } return seq(new ZipWithIndex()); }
... а
groupBy()
это удобство API для:default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) { return collect(Collectors.groupingBy(classifier)); }
(Отказ от ответственности: я работаю в компании, стоящей за jOOλ)
источник
Map
(в отличие, например, от решения Бена Манеса)Для полноты, вот решение Guava .
Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);
В вопросе коллекция доступна, поэтому поток не нужен, и его можно записать как
Iterables.partition(data, batchSize).forEach(this::process);
источник
Lists.partition
это еще один вариант, о котором я должен был упомянуть.Stream
в память перед обработкой соответствующего пакетаbatchSize
элементы за итерацию.Возможна и чистая реализация Java-8:
int BATCH = 500; IntStream.range(0, (data.size()+BATCH-1)/BATCH) .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH))) .forEach(batch -> process(batch));
Обратите внимание, что в отличие от JOOl, он может нормально работать параллельно (при условии, что у вас
data
список произвольного доступа).источник
List
(смdata.size()
,data.get()
в этом вопросе). Отвечаю на заданный вопрос. Если у вас есть другой вопрос, задайте его вместо этого (хотя я думаю, что вопрос о потоке также уже задавался).Чистое решение Java 8 :
Чтобы сделать это элегантно, мы можем создать собственный сборщик, который принимает a
batch size
и aConsumer
для обработки каждого пакета:import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.function.*; import java.util.stream.Collector; import static java.util.Objects.requireNonNull; /** * Collects elements in the stream and calls the supplied batch processor * after the configured batch size is reached. * * In case of a parallel stream, the batch processor may be called with * elements less than the batch size. * * The elements are not kept in memory, and the final result will be an * empty list. * * @param <T> Type of the elements being collected */ class BatchCollector<T> implements Collector<T, List<T>, List<T>> { private final int batchSize; private final Consumer<List<T>> batchProcessor; /** * Constructs the batch collector * * @param batchSize the batch size after which the batchProcessor should be called * @param batchProcessor the batch processor which accepts batches of records to process */ BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) { batchProcessor = requireNonNull(batchProcessor); this.batchSize = batchSize; this.batchProcessor = batchProcessor; } public Supplier<List<T>> supplier() { return ArrayList::new; } public BiConsumer<List<T>, T> accumulator() { return (ts, t) -> { ts.add(t); if (ts.size() >= batchSize) { batchProcessor.accept(ts); ts.clear(); } }; } public BinaryOperator<List<T>> combiner() { return (ts, ots) -> { // process each parallel list without checking for batch size // avoids adding all elements of one to another // can be modified if a strict batching mode is required batchProcessor.accept(ts); batchProcessor.accept(ots); return Collections.emptyList(); }; } public Function<List<T>, List<T>> finisher() { return ts -> { batchProcessor.accept(ts); return Collections.emptyList(); }; } public Set<Characteristics> characteristics() { return Collections.emptySet(); } }
При желании затем создайте вспомогательный служебный класс:
import java.util.List; import java.util.function.Consumer; import java.util.stream.Collector; public class StreamUtils { /** * Creates a new batch collector * @param batchSize the batch size after which the batchProcessor should be called * @param batchProcessor the batch processor which accepts batches of records to process * @param <T> the type of elements being processed * @return a batch collector instance */ public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) { return new BatchCollector<T>(batchSize, batchProcessor); } }
Пример использования:
List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); List<Integer> output = new ArrayList<>(); int batchSize = 3; Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs); input.stream() .collect(StreamUtils.batchCollector(batchSize, batchProcessor));
Я также разместил свой код на GitHub, если кто-то хочет взглянуть:
Ссылка на Github
источник
Я написал собственный Spliterator для подобных сценариев. Он заполнит списки заданного размера из входного потока. Преимущество этого подхода в том, что он будет выполнять ленивую обработку и работать с другими функциями потока.
public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) { return batchSize <= 0 ? Stream.of(stream.collect(Collectors.toList())) : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel()); } private static class BatchSpliterator<E> implements Spliterator<List<E>> { private final Spliterator<E> base; private final int batchSize; public BatchSpliterator(Spliterator<E> base, int batchSize) { this.base = base; this.batchSize = batchSize; } @Override public boolean tryAdvance(Consumer<? super List<E>> action) { final List<E> batch = new ArrayList<>(batchSize); for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++) ; if (batch.isEmpty()) return false; action.accept(batch); return true; } @Override public Spliterator<List<E>> trySplit() { if (base.estimateSize() <= batchSize) return null; final Spliterator<E> splitBase = this.base.trySplit(); return splitBase == null ? null : new BatchSpliterator<>(splitBase, batchSize); } @Override public long estimateSize() { final double baseSize = base.estimateSize(); return baseSize == 0 ? 0 : (long) Math.ceil(baseSize / (double) batchSize); } @Override public int characteristics() { return base.characteristics(); } }
источник
SUBSIZED
это возвращаемые разбиения,trySplit
может иметь больше элементов, чем до разделения (если разбиение происходит в середине пакета).Spliterators
правильно понимаю ,trySplit
всегда следует разделять данные на две примерно равные части, чтобы результат никогда не был больше оригинала?if this Spliterator is SUBSIZED, then estimateSize() for this spliterator before splitting must be equal to the sum of estimateSize() for this and the returned Spliterator after splitting.
Нам нужно было решить похожую проблему. Мы хотели взять поток, размер которого превышает системную память (итерация по всем объектам в базе данных), и максимально рандомизировать порядок - мы подумали, что можно буферизовать 10 000 элементов и рандомизировать их.
Целью была функция, принимающая поток.
Среди предлагаемых здесь решений есть несколько вариантов:
Изначально нашим инстинктом было использовать собственный сборщик, но это означало, что он будет отключен от потоковой передачи. Приведенное выше решение для кастомного коллектора очень хорошее, и мы его почти использовали.
Вот решение, которое обманывает, используя тот факт, что
Stream
s может дать вам,Iterator
который вы можете использовать в качестве аварийного люка, чтобы позволить вам делать что-то еще, что потоки не поддерживают.Iterator
Преобразуется обратно в поток с использованием другого немного Java 8StreamSupport
колдовства./** * An iterator which returns batches of items taken from another iterator */ public class BatchingIterator<T> implements Iterator<List<T>> { /** * Given a stream, convert it to a stream of batches no greater than the * batchSize. * @param originalStream to convert * @param batchSize maximum size of a batch * @param <T> type of items in the stream * @return a stream of batches taken sequentially from the original stream */ public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) { return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize)); } private static <T> Stream<T> asStream(Iterator<T> iterator) { return StreamSupport.stream( Spliterators.spliteratorUnknownSize(iterator,ORDERED), false); } private int batchSize; private List<T> currentBatch; private Iterator<T> sourceIterator; public BatchingIterator(Iterator<T> sourceIterator, int batchSize) { this.batchSize = batchSize; this.sourceIterator = sourceIterator; } @Override public boolean hasNext() { prepareNextBatch(); return currentBatch!=null && !currentBatch.isEmpty(); } @Override public List<T> next() { return currentBatch; } private void prepareNextBatch() { currentBatch = new ArrayList<>(batchSize); while (sourceIterator.hasNext() && currentBatch.size() < batchSize) { currentBatch.add(sourceIterator.next()); } } }
Простой пример использования этого мог бы выглядеть так:
@Test public void getsBatches() { BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3) .forEach(System.out::println); }
Приведенные выше отпечатки
В нашем случае мы хотели перетасовать пакеты, а затем сохранить их в виде потока - это выглядело так:
@Test public void howScramblingCouldBeDone() { BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3) // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one .map(list -> { Collections.shuffle(list); return list; }) .flatMap(List::stream) .forEach(System.out::println); }
Это выводит что-то вроде (оно рандомизировано, поэтому каждый раз разное)
Секрет в том, что всегда есть поток, поэтому вы можете либо работать с потоком пакетов, либо делать что-то с каждым пакетом, а затем
flatMap
обратно в поток. Более того, все вышеперечисленное выполняется только как заключительноеforEach
илиcollect
или другое завершающее выражение PULL данные через поток.Оказывается,
iterator
это особый тип завершающей операции над потоком, и он не заставляет весь поток запускаться и поступать в память! Спасибо ребятам из Java 8 за блестящий дизайн!источник
List
- вы не можете откладывать итерацию внутрипакетных элементов, потому что потребитель может захотеть пропустить весь пакет, и если вы не использовали элементы, то они не пропустят очень далеко. (Я реализовал один из них на C #, хотя это было значительно проще.)Вы также можете использовать RxJava :
или
Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();
или
Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();
источник
Вы также можете взглянуть на cyclops-react , я являюсь автором этой библиотеки. Он реализует интерфейс jOOλ (и, как расширение, потоки JDK 8), но, в отличие от параллельных потоков JDK 8, он ориентирован на асинхронные операции (например, потенциально блокирующие вызовы асинхронного ввода-вывода). Параллельные потоки JDK, напротив, сосредоточены на параллелизме данных для операций, связанных с процессором. Он работает, управляя совокупностями будущих задач под капотом, но предоставляет конечным пользователям стандартный расширенный Stream API.
Этот пример кода может помочь вам начать работу
LazyFutureStream.parallelCommonBuilder() .react(data) .grouped(BATCH_SIZE) .map(this::process) .run();
Здесь есть руководство по пакетной обработке
И более общий учебник здесь
Чтобы использовать собственный пул потоков (который, вероятно, более подходит для блокировки ввода-вывода), вы можете начать обработку с
LazyReact reactor = new LazyReact(40); reactor.react(data) .grouped(BATCH_SIZE) .map(this::process) .run();
источник
Чистый пример Java 8, который также работает с параллельными потоками.
Как использовать:
Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed(); CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));
Объявление и реализация метода:
public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor) { List<ElementType> newBatch = new ArrayList<>(batchSize); stream.forEach(element -> { List<ElementType> fullBatch; synchronized (newBatch) { if (newBatch.size() < batchSize) { newBatch.add(element); return; } else { fullBatch = new ArrayList<>(newBatch); newBatch.clear(); newBatch.add(element); } } batchProcessor.accept(fullBatch); }); if (newBatch.size() > 0) batchProcessor.accept(new ArrayList<>(newBatch)); }
источник
Честно говоря, взгляните на элегантное решение Vavr :
Stream.ofAll(data).grouped(BATCH_SIZE).forEach(this::process);
источник
Простой пример использования Spliterator
// read file into stream, try-with-resources try (Stream<String> stream = Files.lines(Paths.get(fileName))) { //skip header Spliterator<String> split = stream.skip(1).spliterator(); Chunker<String> chunker = new Chunker<String>(); while(true) { boolean more = split.tryAdvance(chunker::doSomething); if (!more) { break; } } } catch (IOException e) { e.printStackTrace(); } } static class Chunker<T> { int ct = 0; public void doSomething(T line) { System.out.println(ct++ + " " + line.toString()); if (ct % 100 == 0) { System.out.println("====================chunk====================="); } } }
Ответ Брюса более исчерпывающий, но я искал что-то быстрое и грязное для обработки кучи файлов.
источник
это чистое java-решение, которое лениво оценивается.
public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){ List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable currentBatch.add(new ArrayList<T>(batchSize)); return Stream.concat(stream .sequential() .map(new Function<T, List<T>>(){ public List<T> apply(T t){ currentBatch.get(0).add(t); return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null; } }), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0)) .limit(1) ).filter(Objects::nonNull); }
источник
Вы можете использовать apache.commons:
ListUtils.partition(ListOfLines, 500).stream() .map(partition -> processBatch(partition) .collect(Collectors.toList());
Разделение выполняется не лениво, но после разделения списка вы получаете преимущества работы с потоками (например, использование параллельных потоков, добавление фильтров и т. Д.). В других ответах предлагались более сложные решения, но иногда удобочитаемость и ремонтопригодность более важны (а иногда и нет :-))
источник
Это легко сделать с помощью Reactor :
источник
С помощью
Java 8
иcom.google.common.collect.Lists
вы можете сделать что-то вроде:public class BatchProcessingUtil { public static <T,U> List<U> process(List<T> data, int batchSize, Function<List<T>, List<U>> processFunction) { List<List<T>> batches = Lists.partition(data, batchSize); return batches.stream() .map(processFunction) // Send each batch to the process function .flatMap(Collection::stream) // flat results to gather them in 1 stream .collect(Collectors.toList()); } }
Здесь
T
тип элементов во входном списке иU
тип элементов в выходном списке.И вы можете использовать это так:
List<String> userKeys = [... list of user keys] List<Users> users = BatchProcessingUtil.process( userKeys, 10, // Batch Size partialKeys -> service.getUsers(partialKeys) );
источник