Я имею дело с довольно большим Pandas DataFrame - мой набор данных похож на следующую df
настройку:
import pandas as pd
import numpy as np
#--------------------------------------------- SIZING PARAMETERS :
R1 = 20 # .repeat( repeats = R1 )
R2 = 10 # .repeat( repeats = R2 )
R3 = 541680 # .repeat( repeats = [ R3, R4 ] )
R4 = 576720 # .repeat( repeats = [ R3, R4 ] )
T = 55920 # .tile( , T)
A1 = np.arange( 0, 2708400, 100 ) # ~ 20x re-used
A2 = np.arange( 0, 2883600, 100 ) # ~ 20x re-used
#--------------------------------------------- DataFrame GENERATION :
df = pd.DataFrame.from_dict(
{ 'measurement_id': np.repeat( [0, 1], repeats = [ R3, R4 ] ),
'time':np.concatenate( [ np.repeat( A1, repeats = R1 ),
np.repeat( A2, repeats = R1 ) ] ),
'group': np.tile( np.repeat( [0, 1], repeats = R2 ), T ),
'object': np.tile( np.arange( 0, R1 ), T )
}
)
#--------------------------------------------- DataFrame RE-PROCESSING :
df = pd.concat( [ df,
df \
.groupby( ['measurement_id', 'time', 'group'] ) \
.apply( lambda x: np.random.uniform( 0, 100, 10 ) ) \
.explode() \
.astype( 'float' ) \
.to_frame( 'var' ) \
.reset_index( drop = True )
], axis = 1
)
Примечание. Чтобы получить минимальный пример, его можно легко установить на подмножество (например, с помощью df.loc[df['time'] <= 400, :]
), но, поскольку я все равно имитирую данные, я подумал, что исходный размер даст лучший обзор.
Для каждой определенной группы ['measurement_id', 'time', 'group']
мне нужно вызвать следующую функцию:
from sklearn.cluster import SpectralClustering
from pandarallel import pandarallel
def cluster( x, index ):
if len( x ) >= 2:
data = np.asarray( x )[:, np.newaxis]
clustering = SpectralClustering( n_clusters = 5,
random_state = 42
).fit( data )
return pd.Series( clustering.labels_ + 1, index = index )
else:
return pd.Series( np.nan, index = index )
Для повышения производительности я попробовал два подхода:
Pandarallel пакет
Первым подходом было распараллелить вычисления с использованием pandarallel
пакета:
pandarallel.initialize( progress_bar = True )
df \
.groupby( ['measurement_id', 'time', 'group'] ) \
.parallel_apply( lambda x: cluster( x['var'], x['object'] ) )
Тем не менее, это кажется неоптимальным, поскольку он потребляет много оперативной памяти, и не все ядра используются в вычислениях (даже несмотря на то, что в pandarallel.initialize()
методе явно указано количество ядер ). Кроме того, иногда вычисления завершаются с различными ошибками, хотя у меня не было возможности найти причину этого (возможно, нехватка оперативной памяти?).
PySpark Pandas UDF
Я также дал UDF Spark Pandas, хотя я совершенно новичок в Spark. Вот моя попытка:
import findspark; findspark.init()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
spark = SparkSession.builder.master( "local" ).appName( "test" ).config( conf = SparkConf() ).getOrCreate()
df = spark.createDataFrame( df )
@pandas_udf( StructType( [StructField( 'id', IntegerType(), True )] ), functionType = PandasUDFType.GROUPED_MAP )
def cluster( df ):
if len( df['var'] ) >= 2:
data = np.asarray( df['var'] )[:, np.newaxis]
clustering = SpectralClustering( n_clusters = 5,
random_state = 42
).fit( data )
return pd.DataFrame( clustering.labels_ + 1,
index = df['object']
)
else:
return pd.DataFrame( np.nan,
index = df['object']
)
res = df \
.groupBy( ['id_half', 'frame', 'team_id'] ) \
.apply( cluster ) \
.toPandas()
К сожалению, производительность также была неудовлетворительной, и из того, что я читал по этой теме, это может быть просто бременем использования функции UDF, написанной на Python, и связанной с этим необходимости преобразования всех объектов Python в объекты Spark и обратно.
Итак, вот мои вопросы:
- Можно ли отрегулировать любой из моих подходов, чтобы устранить возможные узкие места и повысить производительность? (например, настройка PySpark, настройка неоптимальных операций и т. д.)
- Есть ли у них лучшие альтернативы? Как они соотносятся с предоставленными решениями с точки зрения производительности?
dask
(((так что мой комментарий это просто совет для исследования.Ответы:
+1
за упоминание установки надстройки накладных расходов для каждой стратегии вычисления. Это всегда создает точку безубыточности, только после этого нестратегия[SERIAL]
может достичь какой-либо полезной радости от некоторого[TIME]
ускорения желаемого для -домена домена (но, если иное, как правило,[SPACE]
-Дополнения стоимости домена позволяют или остаются выполнимыми - да, ОЗУ). .. наличие и доступ к устройству такого размера, бюджету и другим подобным ограничениям реального мира)Во-первых,
проверка перед полетом, перед тем как мы взлетим
. Новая, неукоснительная формулировка Закона Амдала в настоящее время может включать обе эти дополнительные
pSO + pTO
накладные расходы и отражает их при прогнозировании достижимых уровней ускорения, включая безубыточность. точка, с которой может стать значимым (в смысле затраты / эффект, эффективность) идти параллельно.Тем не менее,
это не наша основная проблема здесь .
Это идет дальше:
Далее,
учитывая вычислительные затраты
SpectralClustering()
, которые здесь будут использоваться для использования ядра радиальной функции Больцмана~ exp( -gamma * distance( data, data )**2 )
, по-видимому, нет никакогоdata
преимущества от разделения -объекта на любое количество несвязанных рабочих единиц, так какdistance( data, data )
-компонент по определению должен посетить всеdata
-элементы (см. затраты на коммуникацию для{ process | node }
топологий с произвольной передачей значений по очевидным причинам ужасно плохи, если не наихудшие сценарии использования для{ process | node }
-распределенной обработки, если не прямые анти-шаблоны (за исключением некоторых действительно загадочных, не требующих памяти / не имеющих состояния, но все же вычислительных структур).Для педантичных аналитиков, да - прибавьте к этому (и мы уже можем сказать, плохое состояние) затраты - опять-таки - любой-к-любому- k-средних - обработки , здесь,
O( N^( 1 + 5 * 5 ) )
что об этом идетN ~ len( data ) ~ 1.12E6+
, ужасно против нашего желания иметь некоторые умная и быстрая обработка.И что?
Несмотря на то, что затраты на настройку не игнорируются, повышенные расходы на связь почти наверняка отключат любое улучшение от использования описанных выше попыток перехода от чисто
[SERIAL]
технологического потока в некую форму просто -[CONCURRENT]
или истинной[PARALLEL]
оркестровки некоторых рабочих подразделений. , из-за увеличения накладных расходов, связанных с необходимостью реализации (тандемной пары) топологий передачи любого значения любому.Если бы не они?
Что ж, это звучит как оксюморон вычислительной науки - даже если бы это было возможно, стоимость предварительно вычисленных расстояний «от любого к любому» (что потребовало бы этих огромных
[TIME]
затрат) - «заранее» - сложность домена (где? Как? другие, неустранимые задержки, допускающие возможную маскировку задержек с помощью некоторого (пока неизвестного) постепенного наращивания полной в будущем матрицы расстояния «любой-к-любому»?)), но переместили бы эти принципиально существующие затраты в какое-то другое место в[TIME]
- и[SPACE]
-Домены, а не сокращать их.Единственное, что я знаю до сих пор, - это попытаться, если проблема может быть переформулирована в другую, сформулированную в QUBO, проблемную моду (см .: Q- qantum- U nconstrained- B inary- O ptimisation). хорошая новость заключается в том, что инструменты для этого существуют, база знаний из первых рук и практический опыт решения проблем существуют и расширяются)
Производительность захватывает дух - проблема, сформулированная
O(1)
в[TIME]
QUBO, имеет многообещающий (!) Решатель в постоянном времени (в -Domain) и несколько ограничен в[SPACE]
-Domain (где недавно анонсированные трюки LLNL могут помочь избежать этого физического мира, текущей реализации QPU, ограничения проблемы размеры).источник
Это не ответ, но ...
Если вы бежите
(т. е. только с пандами), вы заметите, что вы уже используете несколько ядер. Это потому, что
sklearn
используетjoblib
по умолчанию для распараллеливания работы. Вы можете поменять планировщик в пользу Dask и, возможно, получить большую эффективность за счет совместного использования данных между потоками, но до тех пор, пока работа, которую вы выполняете, связана с процессором, как это, вы ничего не сможете сделать, чтобы ускорить его.Короче говоря, это проблема алгоритма: выясните, что вам действительно нужно вычислить, прежде чем пытаться рассмотреть различные основы для его вычисления.
источник
joblib
порожденными процессами , которые не имеют ничего общего с потоками, тем более с разделением? Спасибо за ваше любезное разъяснение аргументов.Я не являюсь экспертом
Dask
, но в качестве основы я предоставляю следующий код:источник