Совместное использование большого массива Numpy, доступного только для чтения, между многопроцессорными процессами

88

У меня есть массив SciPy (матрица) объемом 60 ГБ, который я должен использовать для более чем 5 multiprocessing Processобъектов. Я видел numpy-sharedmem и читал это обсуждение в списке SciPy. Кажется, есть два подхода - numpy-sharedmemи использование a, multiprocessing.RawArray()и сопоставление NumPy dtypes с ctypes. numpy-sharedmemКажется , это верный путь, но я еще не видел хорошего справочного примера. Мне не нужны никакие блокировки, так как массив (фактически матрица) будет доступен только для чтения. Теперь из-за его размера я бы не хотел копировать. Это звучит , как правильный метод , чтобы создать только копию массива в виде sharedmemмассива, а затем передать его на Processобъекты? Пара конкретных вопросов:

  1. Как лучше всего передать дескрипторы sharedmem подчиненным Process()? Нужна ли мне очередь только для передачи одного массива? Лучше бы трубка? Могу ли я просто передать его как аргумент Process()init подкласса (где я предполагаю, что он маринованный)?

  2. В обсуждении, которое я связал выше, есть упоминание о numpy-sharedmemтом, что он небезопасен для 64 бит? Я определенно использую некоторые структуры, которые не имеют 32-битной адресации.

  3. Есть ли компромисс в RawArray()подходе? Медленнее, медленнее?

  4. Мне нужно какое-либо сопоставление ctype-to-dtype для метода numpy-sharedmem?

  5. Есть ли у кого-нибудь пример такого кода OpenSource? Я очень практичный человек, и мне трудно заставить это работать без какого-либо хорошего примера.

Если есть дополнительная информация, которую я могу предоставить, чтобы прояснить это для других, прокомментируйте, и я добавлю. Благодарность!

Это должно работать в Ubuntu Linux и, возможно, Mac OS, но переносимость не является большой проблемой.

Будет
источник
1
Если разные процессы собираются записывать в этот массив, ожидайте multiprocessingсделать копию всего этого для каждого процесса.
tiago
3
@tiago: «Мне не нужны никакие блокировки, поскольку массив (на самом деле матрица) будет доступен только для чтения»
доктор Ян-Филип Герке
1
@tiago: кроме того, многопроцессорность не создает копию до тех пор, пока явно не указано (через аргументы target_function). Операционная система будет копировать части родительской памяти в дочернюю память только после модификации.
Д-р Ян-Филип Герке
Я уже задавал несколько вопросов об этом раньше. Мое решение можно найти здесь: github.com/david-hoffman/peaks/blob/… (извините, код - это катастрофа).
Дэвид Хоффман

Ответы:

30

@Velimir Mlaker дал отличный ответ. Я подумал, что могу добавить несколько комментариев и небольшой пример.

(Я не смог найти много документации по sharedmem - это результаты моих собственных экспериментов.)

  1. Вам нужно передать дескрипторы при запуске подпроцесса или после его запуска? Если это только первый, вы можете просто использовать targetи argsаргументыProcess . Это потенциально лучше, чем использование глобальной переменной.
  2. На странице обсуждения, на которую вы указали ссылку, видно, что поддержка 64-битного Linux была добавлена ​​в sharedmem некоторое время назад, так что это не могло быть проблемой.
  3. Я не знаю об этом.
  4. Нет. См. Пример ниже.

пример

#!/usr/bin/env python
from multiprocessing import Process
import sharedmem
import numpy

def do_work(data, start):
    data[start] = 0;

def split_work(num):
    n = 20
    width  = n/num
    shared = sharedmem.empty(n)
    shared[:] = numpy.random.rand(1, n)[0]
    print "values are %s" % shared

    processes = [Process(target=do_work, args=(shared, i*width)) for i in xrange(num)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print "values are %s" % shared
    print "type is %s" % type(shared[0])

if __name__ == '__main__':
    split_work(4)

Выход

values are [ 0.81397784  0.59667692  0.10761908  0.6736734   0.46349645  0.98340718
  0.44056863  0.10701816  0.67167752  0.29158274  0.22242552  0.14273156
  0.34912309  0.43812636  0.58484507  0.81697513  0.57758441  0.4284959
  0.7292129   0.06063283]
values are [ 0.          0.59667692  0.10761908  0.6736734   0.46349645  0.
  0.44056863  0.10701816  0.67167752  0.29158274  0.          0.14273156
  0.34912309  0.43812636  0.58484507  0.          0.57758441  0.4284959
  0.7292129   0.06063283]
type is <type 'numpy.float64'>

Этот связанный вопрос может быть полезен.

Джеймс Лим
источник
37

Если вы работаете в Linux (или любой другой POSIX-совместимой системе), вы можете определить этот массив как глобальную переменную. multiprocessingиспользуется fork()в Linux, когда запускает новый дочерний процесс. Вновь созданный дочерний процесс автоматически разделяет память со своим родителем, пока он не изменяет ее ( копирование при записи механизм ).

Поскольку вы говорите: «Мне не нужны никакие блокировки, поскольку массив (на самом деле матрица) будет доступен только для чтения», использование этого поведения будет очень простым и все же чрезвычайно эффективным подходом: все дочерние процессы будут иметь доступ те же данные в физической памяти при чтении этого большого массива numpy.

Не передать свой массив в Process()конструктор, это будет инструктировать multiprocessingк pickleданным для ребенка, что было бы крайне неэффективно или невозможно в вашем случае. В Linux сразу после fork()дочернего элемента находится точная копия родительского элемента, использующая ту же физическую память, поэтому все, что вам нужно сделать, - это убедиться, что переменная Python, 'содержащая' матрицу, доступна из targetфункции, которую вы передаетеProcess() . Обычно этого можно добиться с помощью «глобальной» переменной.

Пример кода:

from multiprocessing import Process
from numpy import random


global_array = random.random(10**4)


def child():
    print sum(global_array)


def main():
    processes = [Process(target=child) for _ in xrange(10)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()


if __name__ == "__main__":
    main()

В Windows - которая не поддерживает fork()- multiprocessingиспользует вызов win32 API CreateProcess. Он создает совершенно новый процесс из любого исполняемого файла. Вот почему в Windows требуется передавать данные дочернему элементу, если нужны данные, которые были созданы во время выполнения родительского элемента.

Д-р Ян-Филип Герке
источник
3
Копирование при записи будет копировать страницу, содержащую счетчик ссылок (поэтому каждый разветвленный питон будет иметь свой собственный счетчик ссылок), но не будет копировать весь массив данных.
robince
1
Я бы добавил, что у меня был больше успехов с переменными уровня модуля, чем с глобальными переменными ... т.е. добавляю переменную в модуль в глобальной области видимости перед вилкой
robince
5
Предостережение для людей, наткнувшихся на этот вопрос / ответ: если вы используете связанный с OpenBLAS Numpy для многопоточной операции, обязательно отключите его многопоточность (экспорт OPENBLAS_NUM_THREADS = 1) при использовании, multiprocessingиначе дочерние процессы могут зависнуть ( обычно используется 1 / n одного процессора, а не n процессоров) при выполнении операций линейной алгебры над общим глобальным массивом / матрицей. Известный многопоточной конфликт с OpenBLAS , кажется, распространяется на Pythonmultiprocessing
Dologan
1
Может ли кто-нибудь объяснить, почему python не просто использует ОС forkдля передачи заданных параметров Processвместо их сериализации? То есть, нельзя forkли применить к родительскому процессу непосредственно перед child вызовом, чтобы значение параметра все еще было доступно из ОС? Казалось бы, более эффективно, чем сериализация?
максимум
2
Мы все знаем, что fork()это недоступно в Windows, об этом было сказано в моем ответе и несколько раз в комментариях. Я знаю , что это был ваш первоначальный вопрос, и я ответил ему четыре комментария выше этого : «компромисс , чтобы использовать тот же метод передачи параметров на обеих платформах по умолчанию, для лучшей ремонтопригодности и для обеспечения равного поведения.». Оба способа имеют свои преимущества и недостатки, поэтому в Python 3 пользователю предоставляется большая гибкость в выборе метода. Это обсуждение не является продуктивным без обсуждения деталей, чего мы не должны здесь делать.
Доктор Ян-Филип Герке
24

Возможно, вас заинтересует небольшой фрагмент кода, который я написал: github.com/vmlaker/benchmark-sharedmem

Единственный интересующий файл - это main.py. Это эталон numpy-sharedmem - код просто передает массивы (или numpyили sharedmem) порожденным процессам через Pipe. Рабочие просто обращаются sum()к данным. Меня интересовало только сравнение времени передачи данных между двумя реализациями.

Я также написал другой, более сложный код: github.com/vmlaker/sherlock .

Здесь я использую модуль numpy-sharedmem для обработки изображений в реальном времени с помощью OpenCV - изображения представляют собой массивы NumPy в соответствии с новым cv2API OpenCV . Изображения, фактически ссылки на них, распределяются между процессами через объект словаря, созданный изmultiprocessing.Manager (в отличие от использования Queue или Pipe.) Я получаю большие улучшения производительности по сравнению с использованием простых массивов NumPy.

Труба против очереди :

По моему опыту, IPC с Pipe быстрее, чем Queue. И это имеет смысл, поскольку Queue добавляет блокировку, чтобы сделать ее безопасной для нескольких производителей / потребителей. Труба - нет. Но если у вас есть только два процесса, которые разговаривают друг с другом, безопасно использовать Pipe или, как говорится в документации:

... нет риска повреждения из-за процессов, использующих разные концы трубы одновременно.

sharedmem безопасность :

Основная проблема с sharedmemмодулем - это возможность утечки памяти при некорректном выходе из программы. Это описано в длительной дискуссии здесь . Хотя 10 апреля 2011 г. Стурла упоминает исправление утечки памяти, с тех пор я все еще сталкивался с утечками, используя оба репозитория, собственный репозиторий Стурлы Молдена на GitHub ( github.com/sturlamolden/sharedmem-numpy ) и Криса Ли-Мессера на Bitbucket ( bitbucket.org/cleemesser/numpy-sharedmem ).

Велимир Млакер
источник
Спасибо, очень информативно. Однако утечка памяти sharedmemкажется большой проблемой. Есть какие-нибудь подсказки для решения этого?
Will
1
Я не только заметил утечки, но и не искал их в коде. Я добавил к своему ответу в разделе «Безопасность sharedmem» выше sharedmem, для справки , хранителей двух репозиториев модуля с открытым исходным кодом .
Velimir Mlaker
14

Если ваш массив такой большой, вы можете использовать numpy.memmap. Например, если у вас есть массив, хранящийся на диске, скажем'test.array' , вы можете использовать одновременные процессы для доступа к данным в нем даже в режиме «записи», но ваш случай проще, поскольку вам нужен только режим «чтения».

Создание массива:

a = np.memmap('test.array', dtype='float32', mode='w+', shape=(100000,1000))

Затем вы можете заполнить этот массив так же, как и обычный массив. Например:

a[:10,:100]=1.
a[10:,100:]=2.

Данные сохраняются на диске, когда вы удаляете переменную a.

Позже вы можете использовать несколько процессов, которые будут обращаться к данным в test.array:

# read-only mode
b = np.memmap('test.array', dtype='float32', mode='r', shape=(100000,1000))

# read and writing mode
c = np.memmap('test.array', dtype='float32', mode='r+', shape=(100000,1000))

Связанные ответы:

Саулло Г. П. Кастро
источник
3

Вам также может быть полезно взглянуть на документацию по pyro, как если бы вы могли правильно разделить свою задачу, вы могли бы использовать ее для выполнения разных разделов на разных машинах, а также на разных ядрах на одной машине.

Стив Барнс
источник
0

Почему бы не использовать многопоточность? Ресурсы основного процесса могут совместно использоваться его потоками изначально, поэтому многопоточность, очевидно, является лучшим способом совместного использования объектов, принадлежащих основному процессу.

Если вас беспокоит механизм GIL в Python, возможно, вы можете прибегнуть к использованию nogilof numba.

Нико
источник