Как записи процесса Hadoop разделяются по границам блоков?

119

Согласно Hadoop - The Definitive Guide

Логические записи, которые определяет FileInputFormats, обычно не помещаются аккуратно в блоки HDFS. Например, логические записи TextInputFormat - это строки, которые чаще всего пересекают границы HDFS. Это не имеет никакого отношения к работе вашей программы - например, линии не пропущены и не разорваны - но об этом стоит знать, поскольку это означает, что локальные карты данных (то есть карты, работающие на том же хосте, что и их входные данные) выполнит некоторые удаленные чтения. Небольшие накладные расходы, которые это вызывает, обычно незначительны.

Предположим, что строка записи разделена на два блока (b1 и b2). Устройство отображения, обрабатывающее первый блок (b1), заметит, что последняя строка не имеет разделителя EOL, и извлечет оставшуюся часть строки из следующего блока данных (b2).

Как преобразователь, обрабатывающий второй блок (b2), определяет, что первая запись является неполной и должна обрабатывать, начиная со второй записи в блоке (b2)?

Правин Шрипати
источник

Ответы:

160

Интересный вопрос, я потратил некоторое время на изучение кода, чтобы узнать подробности, и вот мои мысли. InputFormat.getSplitsРазделения обрабатываются клиентом , поэтому просмотр FileInputFormat дает следующую информацию:

  • Для каждого входного файла получите длину файла, размер блока и вычислите размер разделения, как max(minSize, min(maxSize, blockSize))где maxSizeсоответствует mapred.max.split.sizeи minSizeесть mapred.min.split.size.
  • Разделите файл на разные FileSplits на основе рассчитанного выше размера разделения. Здесь важно то, что каждый из FileSplitних инициализируется startпараметром, соответствующим смещению во входном файле . В этот момент еще нет обработки строк. Соответствующая часть кода выглядит так:

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    

После этого, если вы посмотрите на, LineRecordReaderкоторый определяется TextInputFormat, там обрабатываются строки:

  • Когда вы инициализируете свой, LineRecordReaderон пытается создать экземпляр, LineReaderкоторый является абстракцией, чтобы иметь возможность читать строки FSDataInputStream. Есть 2 случая:
  • Если есть CompressionCodecопределенный, то этот кодек отвечает за обработку границ. Наверное, не имеет отношения к вашему вопросу.
  • Однако, если кодека нет, вот что интересно: если startваш InputSplitотличается от 0, то вы возвращаете 1 символ, а затем пропускаете первую встреченную строку, обозначенную \ n или \ r \ n (Windows) ! Обратный ход важен, потому что, если границы вашей строки совпадают с границами разделения, это гарантирует, что вы не пропустите действительную строку. Вот соответствующий код:

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    

Таким образом, поскольку расщепления рассчитываются на клиенте, сопоставители не должны запускаться последовательно, каждый сопоставитель уже знает, нужно ли ему отбросить первую строку или нет.

Итак, в основном, если у вас есть 2 строки по 100 МБ в одном файле, и для упрощения скажем, что размер разделения составляет 64 МБ. Затем, когда рассчитываются входные разбиения, у нас будет следующий сценарий:

  • Разделение 1, содержащее путь и хосты к этому блоку. Инициализируется при запуске 200-200 = 0Мб, длина 64Мб.
  • Split 2 инициализируется при запуске 200-200 + 64 = 64Мб, длина 64Мб.
  • Split 3 инициализируется при запуске 200-200 + 128 = 128Мб, длина 64Мб.
  • Split 4 инициализирован при запуске 200-200 + 192 = 192 Мб, длина 8 Мб.
  • Mapper A будет обрабатывать разделение 1, start равно 0, поэтому не пропускайте первую строку и не считывайте всю строку, которая превышает лимит в 64 МБ, поэтому требуется удаленное чтение.
  • Mapper B обработает разделение 2, начало равно! = 0, поэтому пропустите первую строку после 64 МБ-1 байт, которая соответствует концу строки 1 на 100 МБ, которая все еще находится в разделе 2, у нас есть 28 МБ строки в разделе 2, поэтому удаленно прочитал оставшиеся 72Мб.
  • Mapper C будет обрабатывать разделение 3, начало равно! = 0, поэтому пропустите первую строку после 128 МБ-1 байт, которая соответствует концу строки 2 на 200 МБ, что является концом файла, поэтому ничего не делайте.
  • Mapper D такой же, как mapper C, за исключением того, что он ищет новую строку после 192Mb-1byte.
Чарльз Менгуи
источник
Также @PraveenSripati стоит упомянуть, что крайние случаи, когда граница будет в \ r при возврате \ r \ n, обрабатываются в LineReader.readLineфункции, я не думаю, что это имеет отношение к вашему вопросу, но при необходимости могу добавить больше деталей.
Чарльз Менгуи,
Предположим, что есть две строки с точным размером 64 МБ на входе, и поэтому InputSplits происходит точно на границах строки. Итак, всегда ли картограф будет игнорировать строку во втором блоке, потому что start! = 0.
Правин Срипати
6
@PraveenSripati В этом случае второй преобразователь увидит start! = 0, поэтому вернитесь назад на 1 символ, который вернет вас непосредственно перед \ n первой строки, а затем пропустите до следующего \ n. Таким образом, он пропустит первую строку, но обработает вторую строку, как ожидалось.
Charles Menguy
@CharlesMenguy, возможно ли, что первая строка файла каким-то образом пропущена? Конкретно у меня есть первая строка с ключом = 1 и значением a, затем есть еще две строки с тем же ключом где-то в файле, key = 1, val = b и key = 1, val = c. Дело в том, что мой редуктор получает {1, [b, c]} и {1, [a]} вместо {1, [a, b, c]}. Этого не произойдет, если я добавлю новую строку в начало своего файла. В чем может быть причина, сэр?
Кобе-Ван Кеноби
@CharlesMenguy Что делать, если файл в HDFS является двоичным файлом (в отличие от текстового файла, в котором происходит \r\n, \nусечение записи)?
CᴴᴀZ
17

Алгоритм Map Reduce не работает с физическими блоками файла. Он работает с логическим разделением входа. Разделение ввода зависит от того, где была записана запись. Запись может охватывать двух картографов.

В соответствии с настройкой HDFS очень большие файлы разбиваются на большие блоки (например, размером 128 МБ) и хранятся три копии этих блоков на разных узлах кластера.

HDFS не знает о содержимом этих файлов. Запись могла быть запущена в Блоке-a, но конец этой записи может присутствовать в Блоке-b .

Чтобы решить эту проблему, Hadoop использует логическое представление данных, хранящихся в файловых блоках, известное как разделение входных данных. Когда клиент задания MapReduce вычисляет входные разбиения , он определяет, где начинается первая целая запись в блоке и где заканчивается последняя запись в блоке .

Ключевой момент:

В случаях, когда последняя запись в блоке является неполной, разделение входных данных включает информацию о местоположении для следующего блока и байтовое смещение данных, необходимых для завершения записи.

Взгляните на диаграмму ниже.

введите описание изображения здесь

Взгляните на эту статью и связанный с ней вопрос SE: О разделении файлов Hadoop / HDFS

Подробнее можно прочитать в документации

Платформа Map-Reduce использует InputFormat задания, чтобы:

  1. Подтвердите входную спецификацию задания.
  2. Разделите входной файл (ы) на логические InputSplits, каждый из которых затем назначается отдельному Mapper.
  3. Затем каждый InputSplit назначается отдельному Mapper для обработки. Разделение может быть кортежем . InputSplit[] getSplits(JobConf job,int numSplits) - это API, который позаботится об этих вещах.

FileInputFormat , который проходит InputFormatреализованный getSplitsметод (). Взгляните на внутреннее устройство этого метода на grepcode

Равиндра бабу
источник
7

Я вижу это следующим образом: InputFormat отвечает за разбиение данных на логические части с учетом природы данных.
Ничто не мешает ему сделать это, хотя это может добавить значительную задержку к работе - вся логика и считывание границ желаемого размера разделения будет происходить в системе отслеживания заданий.
Самый простой формат ввода с учетом записей - TextInputFormat. Он работает следующим образом (насколько я понял из кода) - формат ввода создает разбиения по размеру, независимо от строк, но LineRecordReader всегда:
а) Пропускает первую строку в разбиении (или ее части), если это не так. первый разбиение
б) Прочитать одну строку после границы разбиения в конце (если данные доступны, значит, это не последний разбиение).

Давид Грузман
источник
Skip first line in the split (or part of it), if it is not the first split- если первая запись в не первом блоке завершена, то не уверен, как эта логика будет работать.
Правин Шрипати,
Насколько я вижу код - каждый сплит читает то, что у него + следующая строка. Так что если разрыв строки не на границе блока - это нормально. Как именно обрабатывается случай, когда разрыв строки находится точно на границе блока - нужно понимать - я еще немного прочитаю код
Дэвид Грузман
3

Насколько я понял, когда FileSplitинициализируется для первого блока, вызывается конструктор по умолчанию. Следовательно, значения для start и length изначально равны нулю. К концу обработки первого блока, если последняя строка не завершена, значение length будет больше, чем длина разбиения, и будет прочитана первая строка следующего блока. Из-за этого значение start для первого блока будет больше нуля, и при этом условии LineRecordReaderбудет пропущена первая строка второго блока. (См. Источник )

Если последняя строка первого блока завершена, то значение length будет равно длине первого блока, а значение start для второго блока будет равно нулю. В этом случае программа LineRecordReaderне пропустит первую строку и не прочитает второй блок с начала.

Имеет смысл?

aa8y
источник
2
В этом сценарии сопоставители должны связываться друг с другом и последовательно обрабатывать блоки, когда последняя строка в конкретном блоке не завершена. Не уверен, работает ли это так.
Правин Шрипати
1

Из исходного кода hadoop для LineRecordReader.java конструктор: я нашел несколько комментариев:

// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
  start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;

исходя из этого, я считаю, что hadoop будет читать одну дополнительную строку для каждого разделения (в конце текущего разделения читать следующую строку в следующем разделении), и если не первый раздел, первая строка будет выброшена. так что ни одна строчная запись не будет потеряна и неполна

Shenghai.Geng
источник
0

Картографам не нужно общаться. Блоки файлов находятся в HDFS, и текущий модуль отображения (RecordReader) может прочитать блок, в котором есть оставшаяся часть строки. Это происходит за кадром.

user3507308
источник