Индикатор прогресса во время операций с пандами

159

Я регулярно выполняю операции pandas над фреймами данных, длина которых превышает 15 миллионов строк, и я хотел бы получить доступ к индикатору прогресса для определенных операций.

Существует ли текстовый индикатор прогресса для операций pandas split-apply-Объединить?

Например, что-то вроде:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

где feature_rollup- несколько задействованная функция, которая принимает множество столбцов DF и создает новые пользовательские столбцы с помощью различных методов. Эти операции могут занять некоторое время для больших фреймов данных, поэтому я хотел бы знать, возможно ли иметь текстовый вывод в записной книжке iPython, который информирует меня о прогрессе.

До сих пор я пробовал индикаторы прогресса канонического цикла для Python, но они не взаимодействуют с пандами каким-либо значимым образом.

Я надеюсь, что есть кое-что, что я упустил из библиотеки / документации панд, что позволяет узнать о прогрессе разделения-применения-объединения. Простая реализация, возможно, будет смотреть на общее количество подмножеств фреймов данных, над которыми applyработает функция, и сообщать о прогрессе как завершенную часть этих подмножеств.

Возможно, это то, что нужно добавить в библиотеку?

cwharland
источник
Вы сделали% prun (профиль) в коде? иногда вы можете выполнять операции над всем кадром, прежде чем подавать заявку на устранение узких мест
Джефф
@Jeff: Вы держите пари, я сделал это раньше, чтобы выжать из нее все до последней части. Проблема действительно сводится к границе псевдо-сокращения карт, над которой я работаю, поскольку количество строк составляет десятки миллионов, поэтому я не ожидаю увеличения суперскорости, просто хочу получить некоторую обратную связь о прогрессе.
cwharland
Рассмотрим цитонизацию: pandas.pydata.org/pandas-docs/dev/…
Энди Хейден,
@AndyHayden - Как я прокомментировал ваш ответ, ваша реализация довольно хороша и добавляет небольшое количество времени к общей работе. Я также трионизировал три операции в накопительном пакете функций, который восстанавливал все время, которое теперь посвящено отчетам о прогрессе. Так что в конце я держу пари, что у меня будут индикаторы выполнения с сокращением общего времени обработки, если я выполню Cython для всей функции.
cwharland

Ответы:

279

В связи с популярным спросом, tqdmдобавил поддержку pandas. В отличие от других ответов, это не будет заметно замедлять панд - вот пример для DataFrameGroupBy.progress_apply:

import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))

# Create and register a new `tqdm` instance with `pandas`
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

Если вам интересно, как это работает (и как изменить его для ваших собственных обратных вызовов), посмотрите примеры на github , полную документацию по pypi или импортируйте модуль и запустите help(tqdm).

РЕДАКТИРОВАТЬ


Чтобы напрямую ответить на исходный вопрос, замените:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

с участием:

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)

Примечание: tqdm <= v4.8 : для версий tqdm ниже 4.8 вместо tqdm.pandas()вас нужно было сделать:

from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())
casper.dcl
источник
5
tqdmизначально был создан для простых итераций: from tqdm import tqdm; for i in tqdm( range(int(1e8)) ): passподдержка панд была моим недавним взломом :)
casper.dcl
6
Кстати, если вы используете ноутбуки Jupyter, вы также можете использовать tqdm_notebooks, чтобы получить более красивую полосу. Вместе с пандами вам сейчас нужно создать его экземпляр, как from tqdm import tqdm_notebook; tqdm_notebook().pandas(*args, **kwargs) показано здесь
grinsbaeckchen
2
Начиная с версии 4.8.1 - используйте tqdm.pandas (). github.com/tqdm/tqdm/commit/...
Mork
1
Спасибо, @mork правильный. Мы работаем (медленно) над tqdmверсией 5, которая делает вещи более модульными.
casper.dcl
1
Последние рекомендации по синтаксису см. В документации по tqdm Pandas здесь: pypi.python.org/pypi/tqdm#pandas-integration
Manu CJ,
18

Подправить ответ Джеффа (и использовать его как функцию многократного использования).

def logged_apply(g, func, *args, **kwargs):
    step_percentage = 100. / len(g)
    import sys
    sys.stdout.write('apply progress:   0%')
    sys.stdout.flush()

    def logging_decorator(func):
        def wrapper(*args, **kwargs):
            progress = wrapper.count * step_percentage
            sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
            sys.stdout.flush()
            wrapper.count += 1
            return func(*args, **kwargs)
        wrapper.count = 0
        return wrapper

    logged_func = logging_decorator(func)
    res = g.apply(logged_func, *args, **kwargs)
    sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
    sys.stdout.flush()
    return res

Примечание. Применяйте обновления в процентах . Если ваша функция stdouts, то это не будет работать.

In [11]: g = df_users.groupby(['userID', 'requestDate'])

In [12]: f = feature_rollup

In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]: 
...

Как обычно, вы можете добавить это к вашим групповым объектам как метод:

from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply

In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]: 
...

Как упоминалось в комментариях, это не та особенность, которую основные панды были бы заинтересованы в реализации. Но python позволяет вам создавать их для многих объектов / методов панд (это было бы довольно трудоемким делом ... хотя вы должны быть в состоянии обобщить этот подход).

Энди Хейден
источник
Я говорю «довольно много работы», но вы, вероятно, могли бы переписать всю эту функцию как (более общий) декоратор.
Энди Хейден,
Спасибо за расширение поста Джеффа. Я реализовал оба варианта, и замедление для каждого из них совершенно минимально (добавлено в общей сложности 1,1 минуты к операции, для завершения которой потребовалось 27 минут). Таким образом, я могу видеть прогресс и, учитывая временную природу этих операций, я думаю, что это приемлемое замедление.
cwharland
Отлично, рад, что это помогло. Я был действительно удивлен замедлением (когда я попробовал пример), я ожидал, что это будет намного хуже.
Энди Хейден,
1
Чтобы еще больше повысить эффективность опубликованных методов, я ленился по поводу импорта данных (pandas слишком хорошо справляется с грязным CSV-кодом !!), и некоторые из моих записей (~ 1%) полностью уничтожили вставки (думаю, что записи вставляются в отдельные поля). Их устранение приводит к значительному ускорению в объединении функций, поскольку не было никакой двусмысленности относительно того, что делать во время операций разделения-объединения-объединения.
cwharland
1
У меня до 8 минут ... но я добавил кое-что в набор функций (больше возможностей -> лучше AUC!). Эти 8 минут относятся к чанку (всего два чанка прямо сейчас) с каждым чанком в районе 12 миллионов строк. Так что да ... 16 минут, чтобы выполнить тяжелые операции с 24 миллионами строк с помощью HDFStore (и в пакете функций есть nltk). Достаточно хорошо. Будем надеяться, что интернет не судит меня по первоначальному невежеству или амбивалентности в отношении испорченных вставок =)
cwharland
11

Если вам нужна поддержка, как использовать это в записной книжке Jupyter / ipython, как я сделал, вот полезное руководство и источник для соответствующей статьи :

from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)

Обратите внимание на подчеркивание в операторе импорта для _tqdm_notebook. Как упоминается в упомянутой статье, разработка находится на поздней стадии бета-тестирования.

Виктор Вулович
источник
8

Для тех, кто хочет применить tqdm к своему собственному параллельному коду pandas-apply.

(В течение многих лет я пробовал некоторые библиотеки для распараллеливания, но я никогда не находил решение для распараллеливания на 100%, в основном для функции apply, и мне всегда приходилось возвращаться для своего «ручного» кода.)

df_multi_core - это тот, кого вы называете. Он принимает:

  1. Ваш объект DF
  2. Имя функции, которую вы хотите вызвать
  3. Подмножество столбцов, над которыми может выполняться функция (помогает сократить время / память)
  4. Количество параллельных заданий (-1 или не указано для всех ядер)
  5. Любые другие kwargs, которые принимает функция df (например, «axis»)

_df_split - это внутренняя вспомогательная функция, которая должна располагаться глобально к работающему модулю (Pool.map зависит от места размещения), в противном случае я бы обнаружил ее внутри.

Вот код из моей сущности (я добавлю туда еще функциональные тесты панд):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Ниже приведен тестовый код для параллельного применения с tqdm "progress_apply".

from time import time
from tqdm import tqdm
tqdm.pandas()

if __name__ == '__main__': 
    sep = '-' * 50

    # tqdm progress_apply test      
    def apply_f(row):
        return row['c1'] + 0.1
    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})

    print('testing pandas apply on {}\n{}'.format(df.shape, sep))
    t1 = time()
    res = df.progress_apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

В выходных данных вы видите 1 индикатор выполнения для работы без распараллеливания, а также индикатор выполнения для каждого ядра при работе с распараллеливанием. Есть небольшой скачок, и иногда остальные ядра появляются сразу, но даже тогда я думаю, что это полезно, так как вы получаете статистику прогресса на ядро ​​(it / sec и общее количество записей, например)

введите описание изображения здесь

Спасибо @abcdaa за эту замечательную библиотеку!

Mork
источник
1
Спасибо @mork - не стесняйтесь добавлять на github.com/tqdm/tqdm/wiki/How-to-make-a-great-Progress-Bar или создать новую страницу на github.com/tqdm/tqdm/wiki
casper. DCL
Спасибо, но пришлось изменить эту часть: try: splits = np.array_split(df[subset], njobs) except ValueError: splits = np.array_split(df, njobs)из-за исключения KeyError вместо ValueError, измените на Exception для обработки всех случаев.
Мариус
Спасибо @mork - этот ответ должен быть выше.
Энди
5

Вы можете легко сделать это с декоратором

from functools import wraps 

def logging_decorator(func):

    @wraps
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print "The function I modify has been called {0} times(s).".format(
              wrapper.count)
        func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

modified_function = logging_decorator(feature_rollup)

затем просто используйте модифицированную_функцию (и изменяйте, когда хотите, чтобы она печаталась)

Джефф
источник
1
Очевидное предупреждение, что это замедлит вашу работу! Вы могли бы даже обновить его с помощью stackoverflow.com/questions/5426546/… например, count / len в процентах.
Энди Хейден
да - у вас будет порядок (количество групп), поэтому в зависимости от того, какое у вас узкое место, это может иметь значение
Джефф
возможно, интуитивно понятное, что нужно сделать, это обернуть это в logged_apply(g, func)функцию, где у вас будет доступ к порядку, и вы сможете вести лог с самого начала.
Энди Хейден,
Я сделал вышеупомянутое в своем ответе, также дерзкое обновление процента. На самом деле я не мог заставить тебя работать ... Я думаю, с обертками. Если вы используете его для подачи заявки, это не так важно в любом случае.
Энди Хейден,
1

Я изменил ответ Джеффа , чтобы включить итоги, чтобы вы могли отслеживать прогресс и переменную, чтобы просто печатать каждые X итераций (это на самом деле значительно повышает производительность, если «print_at» достаточно высокий)

def count_wrapper(func,total, print_at):

    def wrapper(*args):
        wrapper.count += 1
        if wrapper.count % wrapper.print_at == 0:
            clear_output()
            sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) )
            sys.stdout.flush()
        return func(*args)
    wrapper.count = 0
    wrapper.total = total
    wrapper.print_at = print_at

    return wrapper

функция clear_output ()

from IPython.core.display import clear_output

если не на IPython, ответ Энди Хейдена делает это без него

Филипе Сильва
источник