Я хочу использовать Stream
для распараллеливания обработки разнородного набора JSON-файлов с удаленным хранением неизвестного числа (количество файлов не известно заранее). Размер файлов может варьироваться от 1 записи JSON на файл до 100 000 записей в некоторых других файлах. Запись JSON в этом случае означает автономный объект JSON, представленный одной строкой в файле.
Я действительно хочу использовать Streams для этого, и поэтому я реализовал это Spliterator
:
public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {
abstract protected JsonStreamSupport<METADATA> openInputStream(String path);
abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);
private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
private static final int MAX_BUFFER = 100;
private final Iterator<String> paths;
private JsonStreamSupport<METADATA> reader = null;
public JsonStreamSpliterator(Iterator<String> paths) {
this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
super(est, additionalCharacteristics);
this.paths = paths;
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
this(est, additionalCharacteristics, paths);
open(nextPath);
}
@Override
public boolean tryAdvance(Consumer<? super RECORD> action) {
if(reader == null) {
String path = takeNextPath();
if(path != null) {
open(path);
}
else {
return false;
}
}
Map<String, Object> json = reader.readJsonLine();
if(json != null) {
RECORD item = parse(reader.getMetadata(), json);
action.accept(item);
return true;
}
else {
reader.close();
reader = null;
return tryAdvance(action);
}
}
private void open(String path) {
reader = openInputStream(path);
}
private String takeNextPath() {
synchronized(paths) {
if(paths.hasNext()) {
return paths.next();
}
}
return null;
}
@Override
public Spliterator<RECORD> trySplit() {
String nextPath = takeNextPath();
if(nextPath != null) {
return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
@Override
protected JsonStreamSupport<METADATA> openInputStream(String path) {
return JsonStreamSpliterator.this.openInputStream(path);
}
@Override
protected RECORD parse(METADATA metaData, Map<String,Object> json) {
return JsonStreamSpliterator.this.parse(metaData, json);
}
};
}
else {
List<RECORD> records = new ArrayList<RECORD>();
while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
// loop
}
if(records.size() != 0) {
return records.spliterator();
}
else {
return null;
}
}
}
}
Проблема, с которой я столкнулся, заключается в том, что, хотя Stream сначала прекрасно распараллеливается, в конечном итоге самый большой файл остается обработанным в одном потоке. Я полагаю, что проксимальная причина хорошо задокументирована: сплитератор "несбалансирован".
Более конкретно, кажется, что trySplit
метод не вызывается после определенной точки Stream.forEach
жизненного цикла, поэтому дополнительная логика для распределения небольших пакетов в конце trySplit
редко выполняется.
Обратите внимание, что все сплитераторы, возвращаемые из trySplit, используют один и тот же paths
итератор. Я думал, что это действительно умный способ сбалансировать работу всех сплитераторов, но этого было недостаточно для достижения полного параллелизма.
Я хотел бы, чтобы параллельная обработка сначала выполнялась для файлов, а затем, когда несколько больших файлов все еще остаются сплитерирующими, я хочу распараллелить фрагменты оставшихся файлов. Это было намерение else
блока в конце trySplit
.
Есть ли простой / простой / канонический способ обойти эту проблему?
источник
Long.MAX_VALUE
вызывает чрезмерное и ненужное расщепление, в то время как любая другая оценка, кроме того,Long.MAX_VALUE
что останавливает дальнейшее расщепление, убивает параллелизм. Возвращение набора точных оценок, по-видимому, не приводит к какой-либо разумной оптимизации.AbstractSpliterator
но переопределяете,trySplit()
что является плохой комбинацией для чего-либо кромеLong.MAX_VALUE
, поскольку вы не адаптируете оценку размера вtrySplit()
. ПослеtrySplit()
этого оценка размера должна быть уменьшена на количество разделенных элементов.Ответы:
Вы
trySplit
должны вывести сплиты одинакового размера, независимо от размера базовых файлов. Вы должны рассматривать все файлы как один блок иArrayList
каждый раз заполнять сплитератор с обратной связью одинаковым количеством объектов JSON. Число объектов должно быть таким, чтобы обработка одного разбиения занимала от 1 до 10 миллисекунд: менее 1 мс, и вы начинаете приближаться к затратам на передачу пакета в рабочий поток, выше этого, и вы начинаете рисковать неравномерной загрузкой ЦП из-за задачи, которые слишком грубы.Сплитератор не обязан сообщать оценку размера, и вы уже делаете это правильно: ваша оценка
Long.MAX_VALUE
- это специальное значение, означающее «неограниченный». Однако, если у вас много файлов с одним объектом JSON, что приводит к пакетам размером 1, это ухудшит вашу производительность двумя способами: издержки открытия-чтения-закрытия файла могут стать узким местом и, если вам удастся сбежать что стоимость передачи потока может быть значительной по сравнению со стоимостью обработки одного элемента, что снова вызывает узкое место.Пять лет назад я решал похожую проблему, вы можете взглянуть на мое решение .
источник
Long.MAX_VALUE
правильно описывает неизвестный размер, но это не помогает, когда фактическая реализация Stream работает плохо. Даже использование результата вThreadLocalRandom.current().nextInt(100, 100_000)
качестве оценочного размера дает лучшие результаты.ArraySpliterator
который имеет приблизительный размер (даже точный размер). Таким образом, реализация Stream увидит размер массива vsLong.MAX_VALUE
, посчитает его несбалансированным и разделит «больший» сплитератор (игнорируя этоLong.MAX_VALUE
означает «неизвестный»), пока он не сможет разделиться дальше. Затем, если фрагментов недостаточно, он разделит разделители на основе массива, используя их известные размеры. Да, это работает очень хорошо, но не противоречит моему утверждению, что вам нужна оценка размера, независимо от того, насколько он беден.Long.MAX_VALUE
будет.После долгих экспериментов я все еще не смог получить никакого дополнительного параллелизма, играя с оценками размера. По сути, любое значение, отличное от,
Long.MAX_VALUE
может привести к преждевременному завершению работы сплитератора (и без какого-либо разделения), тогда как, с другой стороны,Long.MAX_VALUE
оценка будет вызыватьсяtrySplit
безостановочно, пока не вернетсяnull
.Решение, которое я нашел, состоит в том, чтобы внутренне распределить ресурсы между сплитераторами и позволить им восстановить баланс между собой.
Рабочий код:
источник