Когда я запускаю что-то вроде:
from multiprocessing import Pool
p = Pool(5)
def f(x):
return x*x
p.map(f, [1,2,3])
это работает отлично. Однако, помещая это как функцию класса:
class calculate(object):
def run(self):
def f(x):
return x*x
p = Pool()
return p.map(f, [1,2,3])
cl = calculate()
print cl.run()
Дает мне следующую ошибку:
Exception in thread Thread-1:
Traceback (most recent call last):
File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/sw/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Я видел сообщение от Алекса Мартелли, посвященное той же проблеме, но оно не было достаточно явным.
python
multiprocessing
pickle
Мермос
источник
источник
IPython.Parallel
, но там вы могли обойти эту проблему, помещая объекты в узлы. Это кажется довольно раздражающим, чтобы обойти эту проблему с многопроцессорностью.calculate
это пригодны для консервирования, так что кажется , что это может быть решена путем 1) создания функционального объекта с конструктором , который копирует вcalculate
экземпляр , а затем 2) , проходящей экземпляр этой функции объектаPool
«ыmap
метода. Нет?multiprocessing
модуля связаны с его целью кроссплатформенной реализации и отсутствиемfork(2)
системного вызова в Windows. Если вам не нужна поддержка Win32, возможно, существует более простой обходной путь на основе процессов. Или, если вы готовы использовать потоки вместо процессов, вы можете заменить ихfrom multiprocessing import Pool
наfrom multiprocessing.pool import ThreadPool as Pool
.Ответы:
Меня также раздражали ограничения на то, какие функции можно использовать pool.map. Я написал следующее, чтобы обойти это. Похоже, работает, даже для рекурсивного использования parmap.
источник
Я не мог использовать коды, опубликованные до сих пор, потому что коды, использующие «multiprocessing.Pool», не работают с лямбда-выражениями, а коды, не использующие «multiprocessing.Pool», порождают столько процессов, сколько есть рабочих элементов.
Я адаптировал код st, он порождает предопределенное количество рабочих и выполняет итерацию по списку ввода только при наличии незанятого рабочего. Я также включил режим "daemon" для рабочих sttrl-c работает, как ожидалось.
источник
parmap
функцией?(None, None)
в качестве последнего элемента указывает наfun
то, что он достиг конца последовательности элементов для каждого процесса.Многопроцессорная обработка и травление нарушены и ограничены, если вы не выйдете за пределы стандартной библиотеки.
Если вы используете вилку
multiprocessing
называетсяpathos.multiprocesssing
, вы можете напрямую использовать классы и методы класса в многопроцессорных - хmap
функциях. Это потому, чтоdill
используется вместоpickle
илиcPickle
, иdill
может сериализовать почти все в Python.pathos.multiprocessing
также предоставляет асинхронную функцию отображения ... и можетmap
функционировать с несколькими аргументами (напримерmap(math.pow, [1,2,3], [4,5,6])
)Смотрите обсуждения: что могут делать мультипроцессор и укроп вместе?
и: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
Он даже обрабатывает код, который вы написали изначально, без изменений и от интерпретатора. Зачем делать что-то еще более хрупкое и специфичное для одного случая?
Получить код здесь: https://github.com/uqfoundation/pathos
И, просто чтобы показать немного больше, что он может сделать:
источник
amap
), которая позволяет использовать индикаторы выполнения и другое асинхронное программирование.Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)
multiprocess
котором совместимо 2/3. См stackoverflow.com/questions/27873093/... и pypi.python.org/pypi/multiprocess .pathos
у него есть новый стабильный выпуск, а также совместимы с 2.x и 3.x.Насколько я знаю, в настоящее время нет решения вашей проблемы: функция, которую вы предоставляете,
map()
должна быть доступна через импорт вашего модуля. Вот почему работает код Роберта: функциюf()
можно получить, импортировав следующий код:Я фактически добавил «основной» раздел, потому что он следует рекомендациям для платформы Windows («Убедитесь, что основной модуль может быть безопасно импортирован новым интерпретатором Python, не вызывая непреднамеренных побочных эффектов»).
Я также добавил заглавную букву перед
Calculate
тем, чтобы следовать PEP 8 . :)источник
Решение от mrule правильное, но имеет ошибку: если дочерний элемент отправляет обратно большой объем данных, он может заполнить буфер канала, блокируя дочерний
pipe.send()
, в то время как родительский объект ожидает, пока дочерний объект завершит работуpipe.join()
. Решение состоит в том, чтобы прочитать данные о ребенке перед егоjoin()
использованием. Кроме того, ребенок должен закрыть родительский конец трубы, чтобы предотвратить тупик. Код ниже исправляет это. Также имейте в виду, что этоparmap
создает один процесс на элемент вX
. Более продвинутым решением являетсяmultiprocessing.cpu_count()
разделениеX
на несколько частей, а затем объединение результатов перед возвратом. Я оставляю это как упражнение для читателя, чтобы не испортить краткость милого ответа от mrule. ;)источник
OSError: [Errno 24] Too many open files
. Я думаю, что должны быть какие-то ограничения на количество процессов, чтобы он работал правильно ...Я тоже боролся с этим. У меня были функции в качестве членов данных класса, в качестве упрощенного примера:
Мне нужно было использовать функцию self.f в вызове Pool.map () из того же класса, и self.f не принимает кортеж в качестве аргумента. Поскольку эта функция была встроена в класс, мне не было понятно, как написать тип оболочки, предложенный другими ответами.
Я решил эту проблему с помощью другой оболочки, которая использует кортеж / список, где первый элемент - это функция, а остальные элементы - аргументы этой функции, называемые eval_func_tuple (f_args). Используя это, проблемная строка может быть заменена возвращением pool.map (eval_func_tuple, itertools.izip (itertools.repeat (self.f), list1, list2)). Вот полный код:
Файл: util.py
Файл: main.py
Запуск main.py даст [11, 22, 33]. Не стесняйтесь улучшить это, например, eval_func_tuple также может быть изменен, чтобы принимать аргументы ключевого слова.
С другой стороны, в других ответах функция parmap может быть сделана более эффективной для случая с большим количеством процессов, чем с числом доступных процессоров. Я копирую отредактированную версию ниже. Это мой первый пост, и я не был уверен, стоит ли мне напрямую редактировать исходный ответ. Я также переименовал некоторые переменные.
источник
Я взял ответ Клауса Се и Агандерса3 и сделал документированный модуль, который будет более читабельным и хранится в одном файле. Вы можете просто добавить его в свой проект. У этого даже есть дополнительный индикатор выполнения!
РЕДАКТИРОВАТЬ : Добавлено предложение @ alexander-mcfarlane и функция тестирования
источник
join()
одновременно, и вы просто получите100%
завершение наtqdm
дисплее. Единственный раз, когда это будет полезно, если у каждого процессора есть предвзятая рабочая нагрузкаtqdm()
чтобы обернуть линию:result = [q_out.get() for _ in tqdm(sent)]
и это работает намного лучше - большие усилия, хотя действительно ценю это так +1Я знаю, что об этом спрашивали более 6 лет назад, но я просто хотел добавить свое решение, так как некоторые из приведенных выше предложений кажутся ужасно сложными, но мое решение на самом деле было очень простым.
Все, что мне нужно было сделать, это обернуть вызов pool.map () во вспомогательную функцию. Передача объекта класса вместе с аргументами для метода в виде кортежа, который выглядел примерно так.
источник
Функции, определенные в классах (даже внутри функций в классах), на самом деле не работают. Тем не менее, это работает:
источник
Я знаю, что этот вопрос задавался 8 лет и 10 месяцев назад, но я хочу представить вам свое решение:
Вам просто нужно превратить функцию класса в статический метод. Но это также возможно с помощью метода класса:
Протестировано в Python 3.7.3
источник
Я изменил метод Клауса Се, потому что, пока он работал для меня с небольшими списками, он зависал, когда число элементов было ~ 1000 или больше. Вместо того, чтобы выдвигать задания по одному с
None
условием остановки, я загружаю очередь ввода сразу и просто позволяю процессам жевать ее, пока она не опустеет.Изменить: к сожалению, сейчас я сталкиваюсь с этой ошибкой в моей системе: максимальный размер очереди многопроцессорной обработки составляет 32767 , надеюсь, что обходные пути там помогут.
источник
Вы можете запустить свой код без каких-либо проблем, если вы каким-то образом вручную проигнорируете
Pool
объект из списка объектов в классе, потому что это невозможно,pickle
как говорит ошибка. Вы можете сделать это с помощью__getstate__
функции (смотрите здесь ) следующим образом.Pool
Объект будет пытаться найти__getstate__
и__setstate__
функцию и выполнять их , если он находит его при запускеmap
, иmap_async
т.д.:Затем сделайте:
даст вам вывод:
Я протестировал приведенный выше код в Python 3.x, и он работает.
источник
Я не уверен, что этот подход был принят, но работа, которую я использую:
Вывод должен быть:
источник
Есть вероятность, что вы захотите применить эту функцию для каждого отдельного экземпляра класса. Тогда вот решение для этого также
источник
Вот мое решение, которое я считаю немного менее хакерским, чем большинство других здесь. Это похоже на ответ nightowl.
источник
С http://www.rueckstiess.net/research/snippets/show/ca1d7d90 и http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html
Мы можем создать внешнюю функцию и заполнить ее объектом класса self:
ИЛИ без JobLib:
источник
Это может быть не очень хорошим решением, но в моем случае я решаю это так.
Я должен был перейти
self
к своей функции, поскольку я должен получить доступ к атрибутам и функциям моего класса через эту функцию. Это работает для меня. Исправления и предложения всегда приветствуются.источник
Вот пример, который я написал для использования многопроцессорного пула в python3, в частности, для запуска тестов использовался python3.7.7. Я получил свои самые быстрые пробеги, используя
imap_unordered
. Просто включите ваш сценарий и попробуйте. Вы можете использоватьtimeit
или простоtime.time()
выяснить, что работает лучше для вас.В приведенном выше сценарии на
imap_unordered
самом деле кажется худшим для меня. Опробуйте ваш кейс и сравните его с машиной, на которой вы планируете его запускать. Также читайте о пулах процессов . Ура!источник