Вы можете использовать swifter
пакет:
pip install swifter
Он работает как плагин для панд, позволяя повторно использовать apply
функцию:
import swifter
def some_function(data):
return data * 10
data['out'] = data['in'].swifter.apply(some_function)
Он автоматически определит наиболее эффективный способ распараллеливания функции, независимо от того, векторизован он (как в примере выше) или нет.
Дополнительные примеры и сравнение производительности доступны на GitHub. Обратите внимание, что пакет находится в активной разработке, поэтому API может измениться.
Также обратите внимание, что это не будет работать автоматически для строковых столбцов. При использовании строк Swifter вернется к «простым» пандам apply
, которые не будут параллельными. В этом случае даже принудительное использование dask
не приведет к повышению производительности, и вам будет лучше просто разделить набор данных вручную и распараллелить использованиеmultiprocessing
.
allow_dask_on_strings(enable=True)
вот так:df.swifter.allow_dask_on_strings(enable=True).apply(some_function)
Источник: github.com/jmcarpenter2/swifter/issues/45Самый простой способ - использовать Dask map_partitions . Вам понадобится этот импорт (вам понадобится
pip install dask
):import pandas as pd import dask.dataframe as dd from dask.multiprocessing import get
и синтаксис
data = <your_pandas_dataframe> ddata = dd.from_pandas(data, npartitions=30) def myfunc(x,y,z, ...): return <whatever> res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)
(Я считаю, что 30 - подходящее количество разделов, если у вас 16 ядер). Для полноты картины я рассчитал разницу на своей машине (16 ядер):
data = pd.DataFrame() data['col1'] = np.random.normal(size = 1500000) data['col2'] = np.random.normal(size = 1500000) ddata = dd.from_pandas(data, npartitions=30) def myfunc(x,y): return y*(x**2+1) def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1) def pandas_apply(): return apply_myfunc_to_DF(data) def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get) def vectorized(): return myfunc(data['col1'], data['col2'] ) t_pds = timeit.Timer(lambda: pandas_apply()) print(t_pds.timeit(number=1))
t_dsk = timeit.Timer(lambda: dask_apply()) print(t_dsk.timeit(number=1))
t_vec = timeit.Timer(lambda: vectorized()) print(t_vec.timeit(number=1))
Давая 10-кратное ускорение от pandas применимо к dask apply на разделах. Конечно, если у вас есть функция, которую можно векторизовать, вы должны - в этом случае функция (
y*(x**2+1)
) тривиально векторизуется, но есть много вещей, которые векторизовать невозможно.источник
The get= keyword has been deprecated. Please use the scheduler= keyword instead with the name of the desired scheduler like 'threads' or 'processes'
ValueError: cannot reindex from a duplicate axis
. Чтобы обойти это, вы должны либо удалить повторяющиеся индексы с помощью,df = df[~df.index.duplicated()]
либо сбросить их с помощьюdf.reset_index(inplace=True)
.pandarallel
Вместо этого вы можете попробовать : простой и эффективный инструмент для распараллеливания операций pandas на всех ваших процессорах (в Linux и macOS)from pandarallel import pandarallel from math import sin pandarallel.initialize() # FORBIDDEN df.parallel_apply(lambda x: sin(x**2), axis=1) # ALLOWED def func(x): return sin(x**2) df.parallel_apply(func, axis=1)
см. https://github.com/nalepae/pandarallel
источник
Если вы хотите остаться в нативном питоне:
import multiprocessing as mp with mp.Pool(mp.cpu_count()) as pool: df['newcol'] = pool.map(f, df['col'])
будет применять функцию
f
параллельно к столбцуcol
фрейма данныхdf
источник
ValueError: Length of values does not match length of index
от__setitem__
вpandas/core/frame.py
. Не уверен, что я сделал что-то не так, или если назначениеdf['newcol']
не является потокобезопасным.Вот пример базового трансформатора sklearn, в котором pandas apply распараллеливается
import multiprocessing as mp from sklearn.base import TransformerMixin, BaseEstimator class ParllelTransformer(BaseEstimator, TransformerMixin): def __init__(self, n_jobs=1): """ n_jobs - parallel jobs to run """ self.variety = variety self.user_abbrevs = user_abbrevs self.n_jobs = n_jobs def fit(self, X, y=None): return self def transform(self, X, *_): X_copy = X.copy() cores = mp.cpu_count() partitions = 1 if self.n_jobs <= -1: partitions = cores elif self.n_jobs <= 0: partitions = 1 else: partitions = min(self.n_jobs, cores) if partitions == 1: # transform sequentially return X_copy.apply(self._transform_one) # splitting data into batches data_split = np.array_split(X_copy, partitions) pool = mp.Pool(cores) # Here reduce function - concationation of transformed batches data = pd.concat( pool.map(self._preprocess_part, data_split) ) pool.close() pool.join() return data def _transform_part(self, df_part): return df_part.apply(self._transform_one) def _transform_one(self, line): # some kind of transformations here return line
для получения дополнительной информации см. https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8
источник
Чтобы использовать все ядра (физические или логические), вы можете попробовать
mapply
в качестве альтернативыswifter
иpandarallel
.Вы можете установить количество ядер (и поведение фрагментов) при инициализации:
import pandas as pd import mapply mapply.init(n_workers=-1) ... df.mapply(myfunc, axis=1)
По умолчанию (
n_workers=-1
) пакет использует все физические процессоры, доступные в системе. Если ваша система использует гиперпоточность (обычно отображается вдвое больше физических процессоров),mapply
появится один дополнительный рабочий , который будет отдавать приоритет многопроцессорному пулу над другими процессами в системе.В зависимости от вашего определения
all your cores
вместо этого вы также можете использовать все логические ядра (имейте в виду, что в этом случае процессы, связанные с ЦП, будут сражаться за физические ЦП, что может замедлить вашу работу):import multiprocessing n_workers = multiprocessing.cpu_count() # or more explicit import psutil n_workers = psutil.cpu_count(logical=True)
источник