Существует ли простой способ параллельного запуска pandas.DataFrame.isin?

25

У меня есть программа моделирования и скоринга, которая активно использует DataFrame.isinфункцию панд, просматривая списки «похожих» на Facebook записей отдельных пользователей на каждой из нескольких тысяч конкретных страниц. Это самая трудоемкая часть программы, в большей степени, чем части моделирования или оценки, просто потому, что она работает только на одном ядре, а остальные - на нескольких десятках одновременно.

Хотя я знаю, что могу вручную разбить фрейм данных на куски и выполнить операцию параллельно, есть ли простой способ сделать это автоматически? Другими словами, есть ли какой-нибудь пакет, который распознает, что я запускаю легко делегированную операцию, и автоматически распространит ее? Возможно, это требует слишком многого, но в прошлом я был достаточно удивлен тем, что уже доступно в Python, поэтому я думаю, что стоит спросить.

Любые другие предложения о том, как это может быть достигнуто (даже если не с помощью какого-то волшебного пакета единорога!) Также будут оценены. Главным образом, просто пытаясь найти способ сэкономить 15-20 минут на цикл, не тратя столько же времени на кодирование решения.

Therriault
источник
Насколько велик ваш список ценностей? Вы пытались передать это как набор? Для параллелизма вас может заинтересовать Joblib. Он прост в использовании и может ускорить вычисления. Используйте его с большими кусками данных.
oao
Другой вариант - переосмыслить вашу проблему как объединение. Объединения намного быстрее в Pandas stackoverflow.com/questions/23945493/…
Брайан Спиеринг
Еще один вариант - использовать np.in1d, который также является более быстрым stackoverflow.com/questions/21738882/fast-pandas-filtering
Брайан Спиеринг

Ответы:

8

К сожалению, распараллеливание еще не реализовано в пандах. Вы можете присоединиться к этой проблеме GitHub, если вы хотите участвовать в разработке этой функции.

Я не знаю какой-либо «волшебной упаковки единорога» для этой цели, поэтому лучше всего будет написать собственное решение. Но если вы все еще не хотите тратить на это время и хотите узнать что-то новое - вы можете попробовать два метода, встроенных в MongoDB (каркас редукции и структура agg). Смотрите mongodb_agg_framework .

Stanpol
источник
6

Я думаю, что ваша лучшая ставка будет Розетта . Я нахожу это чрезвычайно полезным и легким. Проверьте свои панды методами .

Вы можете получить это по пунктам .

dmvianna
источник
Я бы порекомендовал получить розетту, зайдя прямо в GitHub. Это гарантирует, что вы получите последнюю версию. github.com/columbia-applied-data-science/rosetta
Ян Лэнгмор
0

Существует более распространенная версия этого вопроса, касающаяся распараллеливания функций применения панд - так что это освежающий вопрос :)

Во-первых , я хочу упомянуть более быстрое, поскольку вы попросили «упакованное» решение, и оно появляется в большинстве вопросов SO, касающихся распараллеливания панд.

Но .. Я все еще хотел бы поделиться своим личным основным кодом для него, так как после нескольких лет работы с DataFrame я никогда не находил решение для 100% -ного распараллеливания (в основном для функции apply), и мне всегда приходилось возвращаться за моим " ручной "код.

Благодаря вам я сделал более общей поддержку любого (теоретически) метода DataFrame по его имени (поэтому вам не придется сохранять версии для isin, apply и т. Д.).

Я протестировал его на функциях isin, apply и isna, используя python 2.7 и 3.6. В нем меньше 20 строк, и я следовал соглашению об именах панд, например, "subset" и "njobs".

Я также добавил сравнение времени с dask-эквивалентным кодом для «isin», и оно кажется в X2 раза медленнее, чем этот смысл.

Включает 2 функции:

df_multi_core - это тот, кого вы называете. Он принимает:

  1. Ваш объект DF
  2. Имя функции, которую вы хотите вызвать
  3. Подмножество столбцов, над которыми может выполняться функция (помогает сократить время / память)
  4. Количество параллельных заданий (-1 или опущено для всех ядер)
  5. Любые другие kwargs, которые принимает функция df (например, «axis»)

_df_split - это внутренняя вспомогательная функция, которая должна располагаться глобально к работающему модулю (Pool.map зависит от места размещения), в противном случае я бы обнаружил ее внутри.

Вот код из моей сущности (я добавлю туда еще функциональные тесты панд):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Bellow - это тестовый код для распараллеленного isin , сравнивающий нативную, многоядерную производительность и производительность. На машине с I7 с 8 физическими ядрами я увеличил скорость примерно в четыре раза. Я хотел бы услышать, что вы получаете на ваших реальных данных!

from time import time

if __name__ == '__main__': 
    sep = '-' * 50

    # isin test
    N = 10000000
    df = pd.DataFrame({'c1': np.random.randint(low=1, high=N, size=N), 'c2': np.arange(N)})
    lookfor = np.random.randint(low=1, high=N, size=1000000)

    print('{}\ntesting pandas isin on {}\n{}'.format(sep, df.shape, sep))
    t1 = time()
    print('result\n{}'.format(df.isin(lookfor).sum()))
    t2 = time()
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    res = df_multi_core(df=df, df_f_name='isin', subset=['c1'], njobs=-1, values=lookfor)
    print('result\n{}'.format(res.sum()))
    t4 = time()
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))


    t5 = time()
    ddata = dd.from_pandas(df, npartitions=njobs)
    res = ddata.map_partitions(lambda df: df.apply(apply_f, axis=1)).compute(scheduler='processes')
    t6 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for dask implementation {}\n{}'.format(round(t6 - t5, 2), sep))

--------------------------------------------------
testing pandas isin on (10000000, 2)
--------------------------------------------------
result
c1    953213
c2    951942
dtype: int64
time for native implementation 3.87
--------------------------------------------------
result
c1    953213
dtype: int64
time for multi core implementation 1.16
--------------------------------------------------
result
c1    953213
c2    951942
dtype: int64
time for dask implementation 2.88
Mork
источник
@Therriault Я добавил сравнение с dask isin- кажется, что фрагмент кода наиболее эффективен с 'isin' - в X1.75 раз быстрее, чем dask (по сравнению с applyфункцией, которая только на 5% быстрее, чем dask)
mork