Как * на самом деле * читать данные CSV в TensorFlow?

84

Я относительно новичок в мире TensorFlow и довольно озадачен тем, как вы на самом деле считываете данные CSV в пригодные для использования тензоры примеров / меток в TensorFlow. Пример из учебника TensorFlow по чтению данных CSV довольно фрагментирован и дает вам лишь часть пути к обучению на данных CSV.

Вот мой код, который я собрал по кусочкам на основе этого руководства CSV:

from __future__ import print_function
import tensorflow as tf

def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1

filename = "csv_test_data.csv"

# setup text reader
file_length = file_len(filename)
filename_queue = tf.train.string_input_producer([filename])
reader = tf.TextLineReader(skip_header_lines=1)
_, csv_row = reader.read(filename_queue)

# setup CSV decoding
record_defaults = [[0],[0],[0],[0],[0]]
col1,col2,col3,col4,col5 = tf.decode_csv(csv_row, record_defaults=record_defaults)

# turn features back into a tensor
features = tf.stack([col1,col2,col3,col4])

print("loading, " + str(file_length) + " line(s)\n")
with tf.Session() as sess:
  tf.initialize_all_variables().run()

  # start populating filename queue
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  for i in range(file_length):
    # retrieve a single instance
    example, label = sess.run([features, col5])
    print(example, label)

  coord.request_stop()
  coord.join(threads)
  print("\ndone loading")

А вот краткий пример из загружаемого мной CSV-файла - довольно простые данные - 4 столбца функций и 1 столбец меток:

0,0,0,0,0
0,15,0,0,0
0,30,0,0,0
0,45,0,0,0

Весь приведенный выше код распечатывает каждый пример из файла CSV, один за другим , что, хотя и приятно, чертовски бесполезно для обучения.

Здесь я борюсь с тем, как на самом деле превратить эти отдельные примеры, загружаемые один за другим, в набор обучающих данных. Например, вот блокнот, над которым я работал в курсе Udacity Deep Learning. Я в основном хочу взять загружаемые данные CSV и поместить их во что-то вроде train_dataset и train_labels :

def reformat(dataset, labels):
  dataset = dataset.reshape((-1, image_size * image_size)).astype(np.float32)
  # Map 2 to [0.0, 1.0, 0.0 ...], 3 to [0.0, 0.0, 1.0 ...]
  labels = (np.arange(num_labels) == labels[:,None]).astype(np.float32)
  return dataset, labels
train_dataset, train_labels = reformat(train_dataset, train_labels)
valid_dataset, valid_labels = reformat(valid_dataset, valid_labels)
test_dataset, test_labels = reformat(test_dataset, test_labels)
print('Training set', train_dataset.shape, train_labels.shape)
print('Validation set', valid_dataset.shape, valid_labels.shape)
print('Test set', test_dataset.shape, test_labels.shape)

Я пробовал использовать tf.train.shuffle_batchвот так, но он по необъяснимым причинам зависает:

  for i in range(file_length):
    # retrieve a single instance
    example, label = sess.run([features, colRelevant])
    example_batch, label_batch = tf.train.shuffle_batch([example, label], batch_size=file_length, capacity=file_length, min_after_dequeue=10000)
    print(example, label)

Подводя итог, вот мои вопросы:

  • Что мне не хватает в этом процессе?
    • Похоже, мне не хватает некоторой ключевой интуиции о том, как правильно построить конвейер ввода.
  • Есть ли способ избежать необходимости знать длину файла CSV?
    • Кажется довольно неэлегантным знать количество строк, которые вы хотите обработать ( for i in range(file_length)строка кода выше)

Edit: Как только Ярослав указал, что я, вероятно, смешиваю здесь императивную и графическую части, это начало проясняться. Мне удалось собрать следующий код, который, как мне кажется, ближе к тому, что обычно делается при обучении модели из CSV (исключая любой код обучения модели):

from __future__ import print_function
import numpy as np
import tensorflow as tf
import math as math
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('dataset')
args = parser.parse_args()

def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1

def read_from_csv(filename_queue):
  reader = tf.TextLineReader(skip_header_lines=1)
  _, csv_row = reader.read(filename_queue)
  record_defaults = [[0],[0],[0],[0],[0]]
  colHour,colQuarter,colAction,colUser,colLabel = tf.decode_csv(csv_row, record_defaults=record_defaults)
  features = tf.stack([colHour,colQuarter,colAction,colUser])  
  label = tf.stack([colLabel])  
  return features, label

def input_pipeline(batch_size, num_epochs=None):
  filename_queue = tf.train.string_input_producer([args.dataset], num_epochs=num_epochs, shuffle=True)  
  example, label = read_from_csv(filename_queue)
  min_after_dequeue = 10000
  capacity = min_after_dequeue + 3 * batch_size
  example_batch, label_batch = tf.train.shuffle_batch(
      [example, label], batch_size=batch_size, capacity=capacity,
      min_after_dequeue=min_after_dequeue)
  return example_batch, label_batch

file_length = file_len(args.dataset) - 1
examples, labels = input_pipeline(file_length, 1)

with tf.Session() as sess:
  tf.initialize_all_variables().run()

  # start populating filename queue
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  try:
    while not coord.should_stop():
      example_batch, label_batch = sess.run([examples, labels])
      print(example_batch)
  except tf.errors.OutOfRangeError:
    print('Done training, epoch reached')
  finally:
    coord.request_stop()

  coord.join(threads) 
Роб
источник
Я пробовал ваш код, но не могу заставить его работать. Я что-то упустил, что вы определили? Благодарю. Я разместил здесь ветку, чтобы вы могли получить более подробную информацию: stackoverflow.com/questions/40143019/…
Ссылка

Ответы:

24

Я думаю, вы здесь смешиваете императивную и графовую части. Операция tf.train.shuffle_batchсоздает новый узел очереди, и один узел может использоваться для обработки всего набора данных. Поэтому я думаю, что вы зависаете, потому что вы создали кучу shuffle_batchочередей в цикле for и не запустили для них обработчики очереди.

Обычное использование входного конвейера выглядит так:

  1. Добавить узлы как shuffle_batchво входной конвейер
  2. (необязательно, чтобы предотвратить непреднамеренное изменение графика) завершить график

--- конец построения графа, начало императивного программирования -

  1. tf.start_queue_runners
  2. while(True): session.run()

Чтобы быть более масштабируемым (чтобы избежать Python GIL), вы можете генерировать все свои данные с помощью конвейера TensorFlow. Однако, если производительность не критична, вы можете подключить массив numpy к входному конвейеру, используя slice_input_producer.Вот пример с некоторыми Printузлами, чтобы увидеть, что происходит (сообщения Printпереходят на стандартный вывод при запуске узла)

tf.reset_default_graph()

num_examples = 5
num_features = 2
data = np.reshape(np.arange(num_examples*num_features), (num_examples, num_features))
print data

(data_node,) = tf.slice_input_producer([tf.constant(data)], num_epochs=1, shuffle=False)
data_node_debug = tf.Print(data_node, [data_node], "Dequeueing from data_node ")
data_batch = tf.batch([data_node_debug], batch_size=2)
data_batch_debug = tf.Print(data_batch, [data_batch], "Dequeueing from data_batch ")

sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
tf.get_default_graph().finalize()
tf.start_queue_runners()

try:
  while True:
    print sess.run(data_batch_debug)
except tf.errors.OutOfRangeError as e:
  print "No more inputs."

Вы должны увидеть что-то вроде этого

[[0 1]
 [2 3]
 [4 5]
 [6 7]
 [8 9]]
[[0 1]
 [2 3]]
[[4 5]
 [6 7]]
No more inputs.

Цифры «8, 9» не пополнили всю партию, поэтому они не были произведены. Также tf.Printпечатаются в sys.stdout, поэтому они отображаются отдельно в Терминале для меня.

PS: минимум подключения batchк инициализированной вручную очереди находится в выпуске github 2193

Кроме того, для целей отладки вы можете настроить timeoutсеанс, чтобы ваш блокнот IPython не зависал при удалении из очереди пустой очереди. Я использую эту вспомогательную функцию для своих сеансов

def create_session():
  config = tf.ConfigProto(log_device_placement=True)
  config.gpu_options.per_process_gpu_memory_fraction=0.3 # don't hog all vRAM
  config.operation_timeout_in_ms=60000   # terminate on long hangs
  # create interactive session to register a default session
  sess = tf.InteractiveSession("", config=config)
  return sess

Примечания по масштабируемости:

  1. tf.constantвстроить копию ваших данных в график. Существует фундаментальный предел размера определения графика в 2 ГБ, так что это верхний предел размера данных.
  2. Вы можете обойти этот предел, используя v=tf.Variableи сохраняя данные там, запустив v.assign_opс tf.placeholderправой стороны и передав массив numpy в placeholder ( feed_dict)
  3. Это по-прежнему создает две копии данных, поэтому для экономии памяти вы можете создать свою собственную версию, slice_input_producerкоторая работает с массивами numpy и загружает строки по одной, используяfeed_dict
Ярослав Булатов
источник
2
Ах да! Вы совершенно правы - как только вы сказали: «Я думаю, вы смешиваете здесь императивную и графовую части», я начал понимать, в чем я ошибся. Я опубликовал редактирование своего вопроса, которое включает последний собранный мной код, который на самом деле приближает меня к тому, что я хочу - я могу успешно читать данные CSV и группировать их таким образом, чтобы я мог обучать модель.
Роб
2
Я предлагаю обновить этот ответ, чтобы он работал с последними версиями TensorFlow: заменить tf.slice_input_producer()на tf.train.slice_input_producer()(и аналогично для некоторых других функций). А также добавить sess.run(tf.initialize_local_variables())после sess.run(tf.initialize_all_variables()).
MiniQuark
Еще несколько изменений, которые необходимо внести: pack()есть сейчас stack(), и его initialize_all_variables()следует заменить на global_variables_initializer()и local_variables_initializer().
MiniQuark 01
С tenorflow 1.0.1 вам нужно инициализировать локальные и глобальные переменные как tf.group(tf.global_variables_initializer(), tf.local_variables_initializer()).run(). Вам нужно будет инициализировать локальные переменные, так как вы используете num_epochs и в соответствии с документацией «Примечание: если num_epochsнет None, эта функция создает локальный счетчик epochs».
Бруно Р. Кардосо
13

Или вы можете попробовать это, код загружает набор данных Iris в тензорный поток, используя pandas и numpy, и в сеансе печатается простой вывод одного нейрона. Надеюсь, это поможет для базового понимания .... [я не добавил способ горячих меток декодирования].

import tensorflow as tf 
import numpy
import pandas as pd
df=pd.read_csv('/home/nagarjun/Desktop/Iris.csv',usecols = [0,1,2,3,4],skiprows = [0],header=None)
d = df.values
l = pd.read_csv('/home/nagarjun/Desktop/Iris.csv',usecols = [5] ,header=None)
labels = l.values
data = numpy.float32(d)
labels = numpy.array(l,'str')
#print data, labels

#tensorflow
x = tf.placeholder(tf.float32,shape=(150,5))
x = data
w = tf.random_normal([100,150],mean=0.0, stddev=1.0, dtype=tf.float32)
y = tf.nn.softmax(tf.matmul(w,x))

with tf.Session() as sess:
    print sess.run(y)
Нагарджун Гурурадж
источник
Это было очень поучительно, но, если я правильно понимаю, это не показывает, как использовать данные для обучения ...
diverbyzero
да, я скоро добавлю их ... Это должно быть тривиально, не так ли ... подсчитайте потери, запустите оптимизатор, я скоро добавлю их
Нагарджун Гурурадж
2
Привет diverbyzero, извини, я опоздал! Я нашел еще одну интересную ссылку, которая действительно облегчает проблему tensorflow.org/tutorials/tflearn .... Здесь вы можете загрузить файлы csv, обучить их, выполнить классификацию ...
Нагарджун Гурурадж
@NagarjunGururaj Могу ли я использовать набор данных, созданный contrib_learn, в обычной подпрограмме тензорного потока?
Джей Вонг
какой набор данных? Вы имеете в виду Ирис или кого-нибудь еще?
Нагарджун Гурурадж
2

Вы можете использовать последнюю версию tf.data API:

dataset = tf.contrib.data.make_csv_dataset(filepath)
iterator = dataset.make_initializable_iterator()
columns = iterator.get_next()
with tf.Session() as sess:
   sess.run([iteator.initializer])
Адарш Кумар
источник
2

Если кто-то пришел сюда в поисках простого способа чтения абсолютно больших и сегментированных файлов CSV в tf.estimator API, пожалуйста, посмотрите мой код ниже

CSV_COLUMNS = ['ID','text','class']
LABEL_COLUMN = 'class'
DEFAULTS = [['x'],['no'],[0]]  #Default values

def read_dataset(filename, mode, batch_size = 512):
    def _input_fn(v_test=False):
#         def decode_csv(value_column):
#             columns = tf.decode_csv(value_column, record_defaults = DEFAULTS)
#             features = dict(zip(CSV_COLUMNS, columns))
#             label = features.pop(LABEL_COLUMN)
#             return add_engineered(features), label

        # Create list of files that match pattern
        file_list = tf.gfile.Glob(filename)

        # Create dataset from file list
        #dataset = tf.data.TextLineDataset(file_list).map(decode_csv)
        dataset = tf.contrib.data.make_csv_dataset(file_list,
                                                   batch_size=batch_size,
                                                   column_names=CSV_COLUMNS,
                                                   column_defaults=DEFAULTS,
                                                   label_name=LABEL_COLUMN)

        if mode == tf.estimator.ModeKeys.TRAIN:
            num_epochs = None # indefinitely
            dataset = dataset.shuffle(buffer_size = 10 * batch_size)
        else:
            num_epochs = 1 # end-of-input after this

        batch_features, batch_labels = dataset.make_one_shot_iterator().get_next()

        #Begins - Uncomment for testing only -----------------------------------------------------<
        if v_test == True:
            with tf.Session() as sess:
                print(sess.run(batch_features))
        #End - Uncomment for testing only -----------------------------------------------------<
        return add_engineered(batch_features), batch_labels
    return _input_fn

Пример использования в TF.estimator:

train_spec = tf.estimator.TrainSpec(input_fn = read_dataset(
                                                filename = train_file,
                                                mode = tf.estimator.ModeKeys.TRAIN,
                                                batch_size = 128), 
                                      max_steps = num_train_steps)
Хасан Рафик
источник
0

2.0 Совместимое решение : этот ответ может быть предоставлен другими участниками вышеупомянутой ветки, но я предоставлю дополнительные ссылки, которые помогут сообществу.

dataset = tf.data.experimental.make_csv_dataset(
      file_path,
      batch_size=5, # Artificially small to make examples easier to show.
      label_name=LABEL_COLUMN,
      na_value="?",
      num_epochs=1,
      ignore_errors=True, 
      **kwargs)

Для получения дополнительной информации обратитесь к этому руководству по Tensorflow .

Поддержка Tensorflow
источник
1
Я нахожу этот ответ (а также руководство и документацию) крайне разочаровывающим. По словам OP, он по-прежнему является «частью пути к обучению на данных CSV». Он создает «набор данных» (но какой тип - это даже tf.data.Dataset? В документации неясно), и кажется, что набор данных ориентирован на столбцы, а не на строки. Большинству моделей нужны пакеты строк, передаваемые им для обучения - как достичь этого шага? Я задал этот вопрос в поисках полного примера.
omatai
Пожалуйста, предоставьте полный пример make_csv_dataset вместо того, чтобы просто помещать документацию абстрактного уровня!
DevLoverUmar