Эффективное применение функции к сгруппированному фрейму данных pandas параллельно

89

Мне часто нужно применить функцию к очень большим группам DataFrame(смешанных типов данных), и я хотел бы использовать преимущества нескольких ядер.

Я могу создать итератор из групп и использовать модуль многопроцессорности, но это неэффективно, потому что каждая группа и результаты функции должны быть обработаны для обмена сообщениями между процессами.

Есть ли способ избежать травления или даже полностью избежать копирования DataFrame? Похоже, что функции разделяемой памяти многопроцессорных модулей ограничены numpyмассивами. Есть ли другие варианты?

user2303
источник
Насколько я знаю, нет возможности поделиться произвольными объектами. Мне интересно, неужели травление занимает намного больше времени, чем выигрыш от многопроцессорности. Возможно, вам стоит поискать возможность создавать большие рабочие пакеты для каждого процесса, чтобы сократить относительное время травления. Другая возможность - использовать многопроцессорность при создании групп.
Себастьян Верк
3
Я делаю что-то подобное, но использую UWSGI, Flask и предварительную подготовку: я загружаю фрейм данных pandas в процесс, разветвляю его x раз (делая его объектом общей памяти), а затем вызываю эти процессы из другого процесса python, в котором я объединяю результаты. atm Я использую JSON в качестве коммуникационного процесса, но это приближается (но все еще очень экспериментально): pandas.pydata.org/pandas-docs/dev/io.html#msgpack-experimental
Carst
Кстати, вы когда-нибудь смотрели HDF5 с разбивкой на части? (HDF5 не предназначен для одновременной записи, но вы также можете сохранить в отдельные файлы и, в конце концов,
объединить
7
это будет нацелено на 0.14, см. эту проблему: github.com/pydata/pandas/issues/5751
Джефф,
4
@Jeff был переведен на 0.15 = (
pyCthon

Ответы:

12

Из комментариев выше кажется, что это запланировано на pandasкакое-то время (есть еще интересный rosettaпроект, который я только что заметил).

Однако до тех пор, пока не будут включены все параллельные функции pandas, я заметил, что очень легко написать эффективные параллельные дополнения, не копирующие память, pandasнапрямую с использованием cython+ OpenMP и C ++.

Вот краткий пример написания параллельной группировки по сумме, которая используется примерно так:

import pandas as pd
import para_group_demo

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)

и вывод:

     sum
key     
0      6
1      11
2      4

Примечание. Несомненно, функциональность этого простого примера в конечном итоге станет частью pandas. Однако в течение некоторого времени некоторые вещи будет более естественным для параллелизма в C ++, и важно знать, насколько легко это объединить pandas.


Для этого я написал простое расширение с одним исходным файлом, код которого следует ниже.

Он начинается с импорта и определений типов.

from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map

cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange

import pandas as pd

ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t

Тип C ++ unordered_mapпредназначен для суммирования по одному потоку, а тип - vectorдля суммирования по всем потокам.

Теперь о функции sum. Он начинается с представлений типизированной памяти для быстрого доступа:

def sum(crit, vals):
    cdef int64_t[:] crit_view = crit.values
    cdef int64_t[:] vals_view = vals.values

Функция продолжает делить половину поровну на потоки (здесь жестко задано до 4), и каждый поток суммирует записи в своем диапазоне:

    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    cdef uint64_t s = l / num_threads + 1
    cdef uint64_t i, j, e
    cdef counts_vec_t counts
    counts = counts_vec_t(num_threads)
    counts.resize(num_threads)
    with cython.boundscheck(False):
        for i in prange(num_threads, nogil=True): 
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)

Когда потоки завершены, функция объединяет все результаты (из разных диапазонов) в один unordered_map:

    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)        

Осталось только создать DataFrameи вернуть результаты:

    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)

    df = pd.DataFrame({'key': key, 'sum': sum_})
    df.set_index('key', inplace=True)
    return df
Ами Тавори
источник