Заставить Pandas DataFrame apply () использовать все ядра?

105

По состоянию на август 2017 года Pandas DataFame.apply () , к сожалению, все еще ограничен работой с одним ядром, а это означает, что многоядерная машина будет тратить большую часть своего вычислительного времени при запуске df.apply(myfunc, axis=1).

Как можно использовать все свои ядра для параллельного запуска приложения на фреймворке данных?

Роко Мижич
источник

Ответы:

80

Вы можете использовать swifterпакет:

pip install swifter

Он работает как плагин для панд, позволяя повторно использовать applyфункцию:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

Он автоматически определит наиболее эффективный способ распараллеливания функции, независимо от того, векторизован он (как в примере выше) или нет.

Дополнительные примеры и сравнение производительности доступны на GitHub. Обратите внимание, что пакет находится в активной разработке, поэтому API может измениться.

Также обратите внимание, что это не будет работать автоматически для строковых столбцов. При использовании строк Swifter вернется к «простым» пандам apply, которые не будут параллельными. В этом случае даже принудительное использование daskне приведет к повышению производительности, и вам будет лучше просто разделить набор данных вручную и распараллелить использованиеmultiprocessing .

Slhck
источник
1
Из чистого любопытства, есть ли способ ограничить количество ядер, которые он использует при параллельном применении? У меня есть общий сервер, поэтому, если я возьму все 32 ядра, никто не будет счастлив.
Максим Хаитович 05
1
@MaximHaytovich Не знаю. Swifter использует dask в фоновом режиме, поэтому, возможно, он соблюдает эти настройки: stackoverflow.com/a/40633117/435093 - в противном случае я бы рекомендовал открыть проблему на GitHub. Автор очень отзывчивый.
slhck 05
@slhck спасибо! Покопаю еще немного. Похоже, что он все равно не работает на сервере Windows - просто зависает, ничего не делая в игрушечной задаче
Максим Хаитович 05
не могли бы вы помочь мне ответить на это: - stackoverflow.com/questions/53561794/…
ak3191
2
Для строк просто добавьте allow_dask_on_strings(enable=True)вот так: df.swifter.allow_dask_on_strings(enable=True).apply(some_function) Источник: github.com/jmcarpenter2/swifter/issues/45
Sumit Sidana
104

Самый простой способ - использовать Dask map_partitions . Вам понадобится этот импорт (вам понадобится pip install dask):

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get

и синтаксис

data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def myfunc(x,y,z, ...): return <whatever>

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)  

(Я считаю, что 30 - подходящее количество разделов, если у вас 16 ядер). Для полноты картины я рассчитал разницу на своей машине (16 ядер):

data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)

ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)  
def vectorized(): return myfunc(data['col1'], data['col2']  )

t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))

28.16970546543598

t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))

2,708152851089835

t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))

0,010668013244867325

Давая 10-кратное ускорение от pandas применимо к dask apply на разделах. Конечно, если у вас есть функция, которую можно векторизовать, вы должны - в этом случае функция ( y*(x**2+1)) тривиально векторизуется, но есть много вещей, которые векторизовать невозможно.

Роко Мижич
источник
2
Приятно знать, спасибо за публикацию. Можете объяснить, почему вы выбрали 30 разделов? Меняется ли производительность при изменении этого значения?
Эндрю Л.
4
@AndrewL Я предполагаю, что каждый раздел обслуживается отдельным процессом, а с 16 ядрами я предполагаю, что одновременно могут выполняться 16 или 32 процесса. Я попробовал, и, похоже, производительность улучшилась до 32 разделов, но дальнейшее увеличение не имеет положительного эффекта. Я предполагаю, что с четырехъядерным компьютером вам понадобится 8 разделов и т. Д. Обратите внимание, что я заметил некоторое улучшение между 16 и 32, поэтому я думаю, что вам действительно нужно 2x $ NUM_PROCESSORS
Роко Миджич
9
Единственное, что естьThe get= keyword has been deprecated. Please use the scheduler= keyword instead with the name of the desired scheduler like 'threads' or 'processes'
поводу
6
Для dask v0.20.0 и более поздних версий используйте ddata.map_partitions (lambda df: df.apply ((lambda row: myfunc (* row)), axis = 1)). Compute (scheduler ='cesses ') или один из другие параметры планировщика. Текущий код выдает ошибку «TypeError: ключевое слово get = было удалено. Используйте вместо этого ключевое слово scheduler = с именем желаемого планировщика, например,« потоки »или« процессы »»
mork
1
Перед тем, как сделать это, убедитесь, что фрейм данных не имеет повторяющихся индексов ValueError: cannot reindex from a duplicate axis. Чтобы обойти это, вы должны либо удалить повторяющиеся индексы с помощью, df = df[~df.index.duplicated()]либо сбросить их с помощью df.reset_index(inplace=True).
Хабиб Карбасян
24

pandarallelВместо этого вы можете попробовать : простой и эффективный инструмент для распараллеливания операций pandas на всех ваших процессорах (в Linux и macOS)

  • Распараллеливание требует затрат (создание экземпляров новых процессов, отправка данных через разделяемую память и т. Д.), Поэтому распараллеливание эффективно только в том случае, если объем вычислений для распараллеливания достаточно высок. При очень небольшом объеме данных использование параллелизма не всегда того стоит.
  • Применяемые функции НЕ должны быть лямбда-функциями.
from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)

# ALLOWED
def func(x):
    return sin(x**2)

df.parallel_apply(func, axis=1)

см. https://github.com/nalepae/pandarallel

G_KOBELIEF
источник
привет, я не могу решить одну проблему, при использовании pandarallel возникает ошибка: AttributeError: не удается обработать локальный объект «prepare_worker. <locals> .closure. <locals> .wrapper». Вы можете мне с этим помочь?
Alex Cam
@Alex Sry Я не разработчик этого модуля. Как выглядят ваши коды? Вы можете попробовать объявить свои «внутренние функции» глобальными? (угадайте)
G_KOBELIEF
@AlexCam Ваша функция должна быть определена вне другой функции, чтобы python мог обработать ее для многопроцессорной обработки
Кенан
1
@G_KOBELIEF С Python> 3.6 мы можем использовать лямбда-функцию с pandaparallel
user110244
18

Если вы хотите остаться в нативном питоне:

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])

будет применять функцию fпараллельно к столбцу colфрейма данныхdf

Оливье Крючан
источник
После такого подхода , как этот я получил ValueError: Length of values does not match length of indexот __setitem__в pandas/core/frame.py. Не уверен, что я сделал что-то не так, или если назначение df['newcol']не является потокобезопасным.
Rattle
2
Вы можете записать pool.map в промежуточный список temp_result, чтобы разрешить проверку совпадения длины с df, а затем выполнить df ['newcol'] = temp_result?
Оливье Крючан
вы имеете в виду создание новой колонки? что бы вы использовали?
Оливье Крючан
да, присвоение результата карты новому столбцу фрейма данных. Разве map не возвращает список результатов каждого фрагмента, отправленного в функцию f? Так что же произойдет, если присвоить это столбцу newcol? Использование Pandas и Python 3
Мина
Это действительно работает очень гладко! Вы пробовали? Он создает список той же длины, что и df, в том же порядке, что и отправленный. Он буквально выполняет c2 = f (c1) параллельно. Нет более простого способа многопроцессорности в python. С точки зрения производительности кажется, что Рэй тоже может делать хорошие вещи ( todatascience.com/… ), но это не так хорошо, и установка, по моему опыту, не всегда проходит гладко
Оливье Крючан
2

Вот пример базового трансформатора sklearn, в котором pandas apply распараллеливается

import multiprocessing as mp
from sklearn.base import TransformerMixin, BaseEstimator

class ParllelTransformer(BaseEstimator, TransformerMixin):
    def __init__(self,
                 n_jobs=1):
        """
        n_jobs - parallel jobs to run
        """
        self.variety = variety
        self.user_abbrevs = user_abbrevs
        self.n_jobs = n_jobs
    def fit(self, X, y=None):
        return self
    def transform(self, X, *_):
        X_copy = X.copy()
        cores = mp.cpu_count()
        partitions = 1

        if self.n_jobs <= -1:
            partitions = cores
        elif self.n_jobs <= 0:
            partitions = 1
        else:
            partitions = min(self.n_jobs, cores)

        if partitions == 1:
            # transform sequentially
            return X_copy.apply(self._transform_one)

        # splitting data into batches
        data_split = np.array_split(X_copy, partitions)

        pool = mp.Pool(cores)

        # Here reduce function - concationation of transformed batches
        data = pd.concat(
            pool.map(self._preprocess_part, data_split)
        )

        pool.close()
        pool.join()
        return data
    def _transform_part(self, df_part):
        return df_part.apply(self._transform_one)
    def _transform_one(self, line):
        # some kind of transformations here
        return line

для получения дополнительной информации см. https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8

Максим Балацко
источник
0

Чтобы использовать все ядра (физические или логические), вы можете попробовать mapplyв качестве альтернативы swifterи pandarallel.

Вы можете установить количество ядер (и поведение фрагментов) при инициализации:

import pandas as pd
import mapply

mapply.init(n_workers=-1)

...

df.mapply(myfunc, axis=1)

По умолчанию ( n_workers=-1) пакет использует все физические процессоры, доступные в системе. Если ваша система использует гиперпоточность (обычно отображается вдвое больше физических процессоров), mapplyпоявится один дополнительный рабочий , который будет отдавать приоритет многопроцессорному пулу над другими процессами в системе.

В зависимости от вашего определения all your coresвместо этого вы также можете использовать все логические ядра (имейте в виду, что в этом случае процессы, связанные с ЦП, будут сражаться за физические ЦП, что может замедлить вашу работу):

import multiprocessing
n_workers = multiprocessing.cpu_count()

# or more explicit
import psutil
n_workers = psutil.cpu_count(logical=True)
ddelange
источник