Объекты с общей памятью в многопроцессорной обработке

124

Предположим, у меня есть большой массив numpy в памяти, у меня есть функция, funcкоторая принимает этот гигантский массив в качестве входных данных (вместе с некоторыми другими параметрами). funcс разными параметрами можно запускать параллельно. Например:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

Если я использую многопроцессорную библиотеку, то этот гигантский массив будет многократно скопирован в разные процессы.

Есть ли способ разрешить различным процессам использовать один и тот же массив? Этот объект массива доступен только для чтения и никогда не будет изменен.

Что еще сложнее, если arr - это не массив, а произвольный объект Python, есть ли способ поделиться им?

[Изменено]

Я прочитал ответ, но все еще немного запутался. Поскольку fork () является копированием при записи, мы не должны вызывать каких-либо дополнительных затрат при создании новых процессов в библиотеке многопроцессорной обработки Python. Но следующий код предполагает огромные накладные расходы:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

вывод (и, кстати, стоимость увеличивается с увеличением размера массива, поэтому я подозреваю, что накладные расходы, связанные с копированием памяти, все еще существуют):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

Почему возникают такие огромные накладные расходы, если мы не копировали массив? А какую часть меня спасает разделяемая память?

вендетта
источник
Вы ведь смотрели документы ?
Лев Левицкий
@FrancisAvila есть ли способ поделиться не только массивом, но и произвольными объектами Python?
Vendetta
1
@LevLevitsky Я должен спросить, есть ли способ поделиться не только массивом, но и произвольными объектами Python?
Vendetta
2
Этот ответ прекрасно объясняет, почему нельзя делиться произвольными объектами Python.
Янне Карила

Ответы:

121

Если вы используете операционную систему, которая использует fork()семантику копирования при записи (как и любой распространенный unix), то до тех пор, пока вы не измените структуру данных, она будет доступна для всех дочерних процессов, не занимая дополнительной памяти. Вам не нужно будет делать ничего особенного (за исключением того, что убедитесь, что вы не изменили объект).

Наиболее эффективного , что вы можете сделать для вашей проблемы будет упаковать массив в эффективную структуру массива ( с использованием numpyили array) место , которое в общей памяти, оберните его multiprocessing.Array, и передать свои функции. Этот ответ показывает, как это сделать .

Если вам нужен доступный для записи общий объект, вам нужно будет обернуть его какой-либо синхронизацией или блокировкой. multiprocessingпредоставляет два метода для этого : один с использованием общей памяти (подходит для простых значений, массивов или ctypes) илиManager прокси, где один процесс хранит память, а менеджер разрешает доступ к ней из других процессов (даже по сети).

Этот Managerподход можно использовать с произвольными объектами Python, но он будет медленнее, чем эквивалент, использующий разделяемую память, поскольку объекты необходимо сериализовать / десериализовать и пересылать между процессами.

В Python доступно множество библиотек и подходов для параллельной обработки . multiprocessing- отличная и хорошо продуманная библиотека, но если у вас есть особые потребности, возможно, лучше будет один из других подходов.

Фрэнсис Авила
источник
25
Следует отметить, что в Python fork () фактически означает копирование при доступе (потому что простой доступ к объекту изменит его счетчик ссылок).
Fabio Zadrozny 07
3
@FabioZadrozny Будет ли он действительно копировать весь объект или только страницу памяти, содержащую его refcount?
zigg 02
5
AFAIK, только страница памяти, содержащая refcount (так, 4 КБ на каждый доступ к объекту).
Фабио Задрозный 02
1
@max Используйте закрытие. Данная функция apply_asyncдолжна ссылаться на общий объект в области видимости напрямую, а не через его аргументы.
Фрэнсис Авила
3
@FrancisAvila, как использовать закрытие? Разве функцию, которую вы даете apply_async, не следует выбирать? Или это только ограничение map_async?
GermanK
17

Я столкнулся с той же проблемой и написал небольшой служебный класс с общей памятью, чтобы обойти ее.

Я использую multiprocessing.RawArray(lockfree), а также доступ к массивам вообще не синхронизирован (lockfree), будьте осторожны, не стреляйте себе ногами.

С этим решением я получаю ускорение примерно в 3 раза на четырехъядерном i7.

Вот код: не стесняйтесь использовать и улучшать его, и, пожалуйста, сообщайте о любых ошибках.

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))
martin.preinfalk
источник
Только что понял, что вам нужно настроить свои массивы разделяемой памяти, прежде чем создавать многопроцессорный пул, пока не знаю, почему, но это определенно не будет работать наоборот.
martin.preinfalk
Причина в том, что пул многопроцессорной обработки вызывает fork () при создании экземпляра пула, поэтому все, что будет после этого, не получит доступа к указателю на любую общую память, созданную впоследствии.
Xiv
Когда я попробовал этот код под py35, я получил исключение в multiprocessing.sharedctypes.py, поэтому я предполагаю, что этот код предназначен только для py2.
Доктор Хиллиер Даниэль
11

Это предполагаемый вариант использования Ray , библиотеки для параллельного и распределенного Python. Под капотом он сериализует объекты, используя макет данных Apache Arrow (который является форматом с нулевым копированием), и сохраняет их в хранилище объектов с общей памятью, чтобы к ним могли обращаться несколько процессов без создания копий.

Код будет выглядеть следующим образом.

import numpy as np
import ray

ray.init()

@ray.remote
def func(array, param):
    # Do stuff.
    return 1

array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)

result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)

Если вы не вызываете, ray.putто массив все равно будет храниться в общей памяти, но это будет происходить один раз при вызове func, а это не то, что вам нужно.

Обратите внимание, что это будет работать не только для массивов, но и для объектов, содержащих массивы. , например, словари, отображающие целые значения на массивы, как показано ниже.

Вы можете сравнить производительность сериализации в Ray и pickle, запустив следующее в IPython.

import numpy as np
import pickle
import ray

ray.init()

x = {i: np.ones(10**7) for i in range(20)}

# Time Ray.
%time x_id = ray.put(x)  # 2.4s
%time new_x = ray.get(x_id)  # 0.00073s

# Time pickle.
%time serialized = pickle.dumps(x)  # 2.6s
%time deserialized = pickle.loads(serialized)  # 1.9s

Сериализация с Ray лишь немного быстрее, чем pickle, но десериализация в 1000 раз быстрее из-за использования разделяемой памяти (это число, конечно, будет зависеть от объекта).

См. Документацию Ray . Вы можете узнать больше о быстрой сериализации с помощью Ray и Arrow . Обратите внимание, я один из разработчиков Ray.

Роберт Нишихара
источник
1
Рэй звучит хорошо! Но я пробовал использовать эту библиотеку раньше, но, к сожалению, я только что понял, что Ray не поддерживает окна. Я надеюсь, что вы, ребята, сможете поддерживать Windows как можно скорее. Спасибо, разработчики!
Hzzkygcs
6

Как упоминал Роберт Нишихара, Apache Arrow упрощает эту задачу, особенно с помощью хранилища объектов в памяти Plasma, на котором построен Ray.

Я сделал мозговую плазму специально для этой цели - быстрой загрузки и перезагрузки больших объектов в приложении Flask. Это пространство имен объектов с общей памятью для сериализуемых объектов Apache Arrow, включая pickleстроки байтов 'd, сгенерированные с помощью pickle.dumps(...).

Ключевое отличие Apache Ray и Plasma заключается в том, что они отслеживают идентификаторы объектов за вас. Любые процессы, потоки или программы, которые выполняются локально, могут совместно использовать значения переменных, вызывая имя из любого Brainобъекта.

$ pip install brain-plasma
$ plasma_store -m 10000000 -s /tmp/plasma

from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/)

brain['a'] = [1]*10000

brain['a']
# >>> [1,1,1,1,...]
russellthehippo
источник