Интересный вопрос, я потратил некоторое время на изучение кода, чтобы узнать подробности, и вот мои мысли. InputFormat.getSplits
Разделения обрабатываются клиентом , поэтому просмотр FileInputFormat дает следующую информацию:
- Для каждого входного файла получите длину файла, размер блока и вычислите размер разделения, как
max(minSize, min(maxSize, blockSize))
где maxSize
соответствует mapred.max.split.size
и minSize
есть mapred.min.split.size
.
Разделите файл на разные FileSplit
s на основе рассчитанного выше размера разделения. Здесь важно то, что каждый из FileSplit
них инициализируется start
параметром, соответствующим смещению во входном файле . В этот момент еще нет обработки строк. Соответствующая часть кода выглядит так:
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
После этого, если вы посмотрите на, LineRecordReader
который определяется TextInputFormat
, там обрабатываются строки:
- Когда вы инициализируете свой,
LineRecordReader
он пытается создать экземпляр, LineReader
который является абстракцией, чтобы иметь возможность читать строки FSDataInputStream
. Есть 2 случая:
- Если есть
CompressionCodec
определенный, то этот кодек отвечает за обработку границ. Наверное, не имеет отношения к вашему вопросу.
Однако, если кодека нет, вот что интересно: если start
ваш InputSplit
отличается от 0, то вы возвращаете 1 символ, а затем пропускаете первую встреченную строку, обозначенную \ n или \ r \ n (Windows) ! Обратный ход важен, потому что, если границы вашей строки совпадают с границами разделения, это гарантирует, что вы не пропустите действительную строку. Вот соответствующий код:
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
start += in.readLine(new Text(), 0,
(int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
Таким образом, поскольку расщепления рассчитываются на клиенте, сопоставители не должны запускаться последовательно, каждый сопоставитель уже знает, нужно ли ему отбросить первую строку или нет.
Итак, в основном, если у вас есть 2 строки по 100 МБ в одном файле, и для упрощения скажем, что размер разделения составляет 64 МБ. Затем, когда рассчитываются входные разбиения, у нас будет следующий сценарий:
- Разделение 1, содержащее путь и хосты к этому блоку. Инициализируется при запуске 200-200 = 0Мб, длина 64Мб.
- Split 2 инициализируется при запуске 200-200 + 64 = 64Мб, длина 64Мб.
- Split 3 инициализируется при запуске 200-200 + 128 = 128Мб, длина 64Мб.
- Split 4 инициализирован при запуске 200-200 + 192 = 192 Мб, длина 8 Мб.
- Mapper A будет обрабатывать разделение 1, start равно 0, поэтому не пропускайте первую строку и не считывайте всю строку, которая превышает лимит в 64 МБ, поэтому требуется удаленное чтение.
- Mapper B обработает разделение 2, начало равно! = 0, поэтому пропустите первую строку после 64 МБ-1 байт, которая соответствует концу строки 1 на 100 МБ, которая все еще находится в разделе 2, у нас есть 28 МБ строки в разделе 2, поэтому удаленно прочитал оставшиеся 72Мб.
- Mapper C будет обрабатывать разделение 3, начало равно! = 0, поэтому пропустите первую строку после 128 МБ-1 байт, которая соответствует концу строки 2 на 200 МБ, что является концом файла, поэтому ничего не делайте.
- Mapper D такой же, как mapper C, за исключением того, что он ищет новую строку после 192Mb-1byte.
LineReader.readLine
функции, я не думаю, что это имеет отношение к вашему вопросу, но при необходимости могу добавить больше деталей.\r\n, \n
усечение записи)?Алгоритм Map Reduce не работает с физическими блоками файла. Он работает с логическим разделением входа. Разделение ввода зависит от того, где была записана запись. Запись может охватывать двух картографов.
В соответствии с настройкой HDFS очень большие файлы разбиваются на большие блоки (например, размером 128 МБ) и хранятся три копии этих блоков на разных узлах кластера.
HDFS не знает о содержимом этих файлов. Запись могла быть запущена в Блоке-a, но конец этой записи может присутствовать в Блоке-b .
Чтобы решить эту проблему, Hadoop использует логическое представление данных, хранящихся в файловых блоках, известное как разделение входных данных. Когда клиент задания MapReduce вычисляет входные разбиения , он определяет, где начинается первая целая запись в блоке и где заканчивается последняя запись в блоке .
Ключевой момент:
В случаях, когда последняя запись в блоке является неполной, разделение входных данных включает информацию о местоположении для следующего блока и байтовое смещение данных, необходимых для завершения записи.
Взгляните на диаграмму ниже.
Взгляните на эту статью и связанный с ней вопрос SE: О разделении файлов Hadoop / HDFS
Подробнее можно прочитать в документации
Платформа Map-Reduce использует InputFormat задания, чтобы:
InputSplit[] getSplits(JobConf job,int numSplits
) - это API, который позаботится об этих вещах.FileInputFormat , который проходит
InputFormat
реализованныйgetSplits
метод (). Взгляните на внутреннее устройство этого метода на grepcodeисточник
Я вижу это следующим образом: InputFormat отвечает за разбиение данных на логические части с учетом природы данных.
Ничто не мешает ему сделать это, хотя это может добавить значительную задержку к работе - вся логика и считывание границ желаемого размера разделения будет происходить в системе отслеживания заданий.
Самый простой формат ввода с учетом записей - TextInputFormat. Он работает следующим образом (насколько я понял из кода) - формат ввода создает разбиения по размеру, независимо от строк, но LineRecordReader всегда:
а) Пропускает первую строку в разбиении (или ее части), если это не так. первый разбиение
б) Прочитать одну строку после границы разбиения в конце (если данные доступны, значит, это не последний разбиение).
источник
Skip first line in the split (or part of it), if it is not the first split
- если первая запись в не первом блоке завершена, то не уверен, как эта логика будет работать.Насколько я понял, когда
FileSplit
инициализируется для первого блока, вызывается конструктор по умолчанию. Следовательно, значения для start и length изначально равны нулю. К концу обработки первого блока, если последняя строка не завершена, значение length будет больше, чем длина разбиения, и будет прочитана первая строка следующего блока. Из-за этого значение start для первого блока будет больше нуля, и при этом условииLineRecordReader
будет пропущена первая строка второго блока. (См. Источник )Если последняя строка первого блока завершена, то значение length будет равно длине первого блока, а значение start для второго блока будет равно нулю. В этом случае программа
LineRecordReader
не пропустит первую строку и не прочитает второй блок с начала.Имеет смысл?
источник
Из исходного кода hadoop для LineRecordReader.java конструктор: я нашел несколько комментариев:
исходя из этого, я считаю, что hadoop будет читать одну дополнительную строку для каждого разделения (в конце текущего разделения читать следующую строку в следующем разделении), и если не первый раздел, первая строка будет выброшена. так что ни одна строчная запись не будет потеряна и неполна
источник
Картографам не нужно общаться. Блоки файлов находятся в HDFS, и текущий модуль отображения (RecordReader) может прочитать блок, в котором есть оставшаяся часть строки. Это происходит за кадром.
источник