Мне часто нужно применить функцию к очень большим группам DataFrame
(смешанных типов данных), и я хотел бы использовать преимущества нескольких ядер.
Я могу создать итератор из групп и использовать модуль многопроцессорности, но это неэффективно, потому что каждая группа и результаты функции должны быть обработаны для обмена сообщениями между процессами.
Есть ли способ избежать травления или даже полностью избежать копирования DataFrame
? Похоже, что функции разделяемой памяти многопроцессорных модулей ограничены numpy
массивами. Есть ли другие варианты?
python
pandas
multiprocessing
shared-memory
user2303
источник
источник
Ответы:
Из комментариев выше кажется, что это запланировано на
pandas
какое-то время (есть еще интересныйrosetta
проект, который я только что заметил).Однако до тех пор, пока не будут включены все параллельные функции
pandas
, я заметил, что очень легко написать эффективные параллельные дополнения, не копирующие память,pandas
напрямую с использованиемcython
+ OpenMP и C ++.Вот краткий пример написания параллельной группировки по сумме, которая используется примерно так:
import pandas as pd import para_group_demo df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)}) print para_group_demo.sum(df.a, df.b)
и вывод:
sum key 0 6 1 11 2 4
Примечание. Несомненно, функциональность этого простого примера в конечном итоге станет частью
pandas
. Однако в течение некоторого времени некоторые вещи будет более естественным для параллелизма в C ++, и важно знать, насколько легко это объединитьpandas
.Для этого я написал простое расширение с одним исходным файлом, код которого следует ниже.
Он начинается с импорта и определений типов.
from libc.stdint cimport int64_t, uint64_t from libcpp.vector cimport vector from libcpp.unordered_map cimport unordered_map cimport cython from cython.operator cimport dereference as deref, preincrement as inc from cython.parallel import prange import pandas as pd ctypedef unordered_map[int64_t, uint64_t] counts_t ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t ctypedef vector[counts_t] counts_vec_t
Тип C ++
unordered_map
предназначен для суммирования по одному потоку, а тип -vector
для суммирования по всем потокам.Теперь о функции
sum
. Он начинается с представлений типизированной памяти для быстрого доступа:def sum(crit, vals): cdef int64_t[:] crit_view = crit.values cdef int64_t[:] vals_view = vals.values
Функция продолжает делить половину поровну на потоки (здесь жестко задано до 4), и каждый поток суммирует записи в своем диапазоне:
cdef uint64_t num_threads = 4 cdef uint64_t l = len(crit) cdef uint64_t s = l / num_threads + 1 cdef uint64_t i, j, e cdef counts_vec_t counts counts = counts_vec_t(num_threads) counts.resize(num_threads) with cython.boundscheck(False): for i in prange(num_threads, nogil=True): j = i * s e = j + s if e > l: e = l while j < e: counts[i][crit_view[j]] += vals_view[j] inc(j)
Когда потоки завершены, функция объединяет все результаты (из разных диапазонов) в один
unordered_map
:cdef counts_t total cdef counts_it_t it, e_it for i in range(num_threads): it = counts[i].begin() e_it = counts[i].end() while it != e_it: total[deref(it).first] += deref(it).second inc(it)
Осталось только создать
DataFrame
и вернуть результаты:key, sum_ = [], [] it = total.begin() e_it = total.end() while it != e_it: key.append(deref(it).first) sum_.append(deref(it).second) inc(it) df = pd.DataFrame({'key': key, 'sum': sum_}) df.set_index('key', inplace=True) return df
источник