Невозможно выбрать <type 'instancemethod'> при использовании многопроцессорной обработки Pool.map ()

218

Я пытаюсь использовать multiprocessing«s Pool.map()функцию , чтобы разделить из работы одновременно. Когда я использую следующий код, он работает нормально:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

Однако, когда я использую его в более объектно-ориентированном подходе, это не работает. Это сообщение об ошибке:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

Это происходит, когда моя основная программа:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

и следующий мой someClassкласс:

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

Кто-нибудь знает, в чем может быть проблема, или простой способ ее обойти?

Ventolin
источник
4
если f - вложенная функция, то есть похожая ошибкаPicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed
ggg

Ответы:

122

Проблема заключается в том, что многопроцессорная обработка должна выполнять сортировку между процессами, а связанные методы не могут быть выбраны. Обходной путь (считаете ли вы это «простым» или нет ;-) - это добавить инфраструктуру в вашу программу, чтобы такие методы можно было выбрать, зарегистрировав ее с помощью метода стандартной библиотеки copy_reg .

Например, вклад Стивена Бетарда в этот поток (ближе к концу потока) демонстрирует один вполне работоспособный подход, позволяющий осуществлять метод травления / расщепления через copy_reg.

Алекс Мартелли
источник
Это здорово - спасибо. Похоже, что в какой-то степени все пошло дальше: используя код на pastebin.ca/1693348, я теперь получаю RuntimeError: максимальная глубина рекурсии превышена. Я осмотрелся, и в одном сообщении на форуме рекомендовалось увеличить максимальную глубину до 1500 (по умолчанию 1000), но я не испытывал там радости. Честно говоря, я не вижу, какая часть (по крайней мере, моего кода) может выходить из-под контроля, если только по какой-то причине код не обрабатывается и не обрабатывается в цикле из-за небольших изменений, которые я сделал для того, чтобы сделать Код Стивена OO'd?
Вентолин
1
Ваши _pickle_methodвозвращения self._unpickle_method, связанный метод; поэтому, конечно, теперь pickle пытается засолить THAT - и делает это так, как вы сказали: вызывая _pickle_method, рекурсивно. Т.е., OOиспользуя код таким образом, вы неизбежно вводите бесконечную рекурсию. Я предлагаю вернуться к коду Стивена (и не поклоняться на алтаре ОО, когда это не уместно: многие вещи в Python лучше всего выполнять более функциональным способом, и это один из них).
Алекс Мартелли
15
Для супер-супер-ленивых , посмотрите единственный ответ, который потрудился опубликовать настоящий не искаженный код ...
Cerin
2
Еще один способ исправить / обойти проблему травления - использовать укроп, см. Мой ответ stackoverflow.com/questions/8804830/…
rockportrocker
74

Все эти решения уродливы, потому что многопроцессорная обработка и выборка нарушены и ограничены, если вы не выйдете за пределы стандартной библиотеки.

Если вы используете вилку multiprocessingназывается pathos.multiprocesssing, вы можете напрямую использовать классы и методы класса в многопроцессорных - х mapфункциях. Это потому, что dillиспользуется вместо pickleили cPickle, и dillможет сериализовать почти все в Python.

pathos.multiprocessingтакже предоставляет асинхронную функцию отображения ... и может mapфункционировать с несколькими аргументами (например map(math.pow, [1,2,3], [4,5,6]))

Смотрите: что могут делать мультипроцессор и укроп вместе?

и: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

И просто, чтобы быть явным, вы можете делать именно то, что вы хотели, в первую очередь, и вы можете сделать это от переводчика, если хотите.

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

Получить код здесь: https://github.com/uqfoundation/pathos

Майк Маккернс
источник
3
Не могли бы вы обновить этот ответ на основе pathos.pp, потому что pathos.multiprocessing больше не существует?
Сахил Годхейн
10
Я pathosавтор. Версии, на которую вы ссылаетесь, уже несколько лет. Попробуйте версию на github, вы можете использовать pathos.ppили github.com/uqfoundation/ppft .
Майк Маккернс
1
или github.com/uqfoundation/pathos . @SaheelGodhane: новая версия давно назрела, но скоро должна выйти.
Майк Маккернс
3
Сначала pip install setuptools, потом pip install git+https://github.com/uqfoundation/pathos.git@master. Это получит соответствующие зависимости. Новый релиз почти готов ... теперь почти все в нем pathosтакже работает на Windows и 3.xсовместимо.
Майк Маккернс
1
@Rika: Да. доступны блокирующие, итеративные и асинхронные карты.
Майк Маккернс
35

Вы также можете определить __call__()внутри себя метод someClass(), который вызывает, someClass.go()а затем передает экземпляр someClass()в пул. Этот объект является маринованным, и он прекрасно работает (для меня) ...

dorvak
источник
3
Это намного проще, чем метод, предложенный Алексом Мартелли, но вы ограничены отправкой только одного метода на класс в ваш пул многопроцессорных систем.
устарел
6
Еще одна деталь, о которой следует помнить, заключается в том, что травится только объект (экземпляр класса), а не сам класс. Следовательно, если вы изменили какие-либо атрибуты класса по сравнению со значениями по умолчанию, эти изменения не будут распространяться на другие процессы. Обходной путь - убедиться, что все, что нужно вашей функции, хранится как атрибут экземпляра.
устарел
2
@dorvak Не могли бы вы показать простой пример с __call__()? Я думаю, что ваш ответ может быть более чистым - я изо всех сил пытаюсь понять эту ошибку, и в первый раз я прихожу, чтобы увидеть вызов. Кстати, также этот ответ поможет уточнить, что делает мультипроцессор: [ stackoverflow.com/a/20789937/305883]
user305883
1
Можете ли вы привести пример этого?
frmsaul
1
Опубликован новый ответ (в настоящее время ниже этого) с примером кода для этого.
Аарон
22

Некоторые ограничения для решения Стивена Бетарда:

Когда вы регистрируете свой метод класса как функцию, деструктор вашего класса неожиданно вызывается каждый раз, когда заканчивается обработка вашего метода. Поэтому, если у вас есть 1 экземпляр вашего класса, который вызывает n раз его метод, члены могут исчезнуть между двумя запусками, и вы можете получить сообщение malloc: *** error for object 0x...: pointer being freed was not allocated(например, открытый файл участника) или pure virtual method called, terminate called without an active exception(что означает, что время жизни объекта-члена, которое я использовал, было короче, чем что я думал). Я получил это при работе с n больше, чем размер пула. Вот краткий пример:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

Вывод:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

__call__Метод не такой эквивалент, потому что [Нет, ...] не считываются из результатов:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

Так что ни один из обоих методов не удовлетворяет ...

Эрик Х.
источник
7
Вы Noneвернетесь, потому что в вашем определении __call__отсутствует return: так и должно быть return self.process_obj(i).
Торек
1
@Eric Я получил ту же ошибку и попробовал это решение, однако я начал получать новую ошибку как «cPickle.PicklingError: Can't pickle <type 'function'>: встроенный поиск атрибутов .function не выполнен». Вы знаете, что может быть причиной этого?
Наман
15

Есть еще один ярлык, который вы можете использовать, хотя он может быть неэффективным в зависимости от того, что находится в вашем классе.

Как все говорили, проблема в том, что multiprocessingкод должен перебирать вещи, которые он отправляет запущенным подпроцессам, а средство выбора не выполняет методы экземпляра.

Однако вместо отправки метода экземпляра вы можете отправить фактический экземпляр класса плюс имя вызываемой функции в обычную функцию, которая затем используется getattrдля вызова метода экземпляра, создавая связанный метод в Poolподпроцессе. Это похоже на определение __call__метода за исключением того, что вы можете вызывать более одной функции-члена.

Кража кода @ EricH. Из его ответа и его комментирование (я набрал его заново, поэтому все имена меняются и тому подобное, по какой-то причине это казалось проще, чем вырезать и вставить :-)) для иллюстрации всей магии:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

Выходные данные показывают, что действительно, конструктор вызывается один раз (в исходном pid), а деструктор вызывается 9 раз (один раз для каждой сделанной копии = 2 или 3 раза за пул-рабочий процесс, в зависимости от необходимости, плюс один раз в оригинале обработать). Это часто нормально, как в этом случае, так как сборщик по умолчанию делает копию всего экземпляра и (частично) тайно повторно заполняет его - в этом случае делает:

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

Вот почему, несмотря на то, что деструктор вызывается восемь раз в трех рабочих процессах, он ведет отсчет от 1 до 0 каждый раз, но, конечно, вы все равно можете столкнуться с неприятностями. При необходимости вы можете предоставить свой собственный__setstate__ :

    def __setstate__(self, adict):
        self.count = adict['count']

в этом случае, например.

Торек
источник
1
Это, безусловно, лучший ответ для этой проблемы, поскольку его проще всего применить к поведению по умолчанию, не допускающему рассол
Мэтт Тейлор,
12

Вы также можете определить __call__()внутри себя метод someClass(), который вызывает, someClass.go()а затем передает экземпляр someClass()в пул. Этот объект является маринованным, и он прекрасно работает (для меня) ...

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()
parisjohn
источник
3

Решение от parisjohn выше прекрасно работает со мной. Плюс код выглядит чистым и легким для понимания. В моем случае есть несколько функций для вызова с помощью Pool, поэтому я изменил код parisjohn чуть ниже. Я сделал вызов, чтобы иметь возможность вызывать несколько функций, и имена функций передаются в аргументе dict из go():

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()
neobot
источник
1

Потенциально тривиальным решением этого является переход на использование multiprocessing.dummy . Это основанная на потоках реализация многопроцессорного интерфейса, которая, похоже, не имеет этой проблемы в Python 2.7. У меня нет большого опыта здесь, но это быстрое изменение импорта позволило мне вызвать apply_async для метода класса.

Несколько хороших ресурсов по multiprocessing.dummy:

https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one-line/

Дэвид Паркс
источник
1

В этом простом случае, когда someClass.fнет наследования каких-либо данных из класса и не прикрепления чего-либо к классу, возможное решение будет состоять в том, чтобы отделить его f, чтобы его можно было засолить:

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))
MHH
источник
1

Почему бы не использовать отдельный func?

def func(*args, **kwargs):
    return inst.method(args, kwargs)

print pool.map(func, arr)
0script0
источник
1

Я столкнулся с этой же проблемой, но обнаружил, что есть кодировщик JSON, который можно использовать для перемещения этих объектов между процессами.

from pyVmomi.VmomiSupport import VmomiJSONEncoder

Используйте это, чтобы создать свой список:

jsonSerialized = json.dumps(pfVmomiObj, cls=VmomiJSONEncoder)

Затем в отображенной функции используйте это для восстановления объекта:

pfVmomiObj = json.loads(jsonSerialized)
Джордж
источник
0

Обновление: по состоянию на день написания, namedTuples можно выбирать (начиная с Python 2.7)

Проблема здесь в том, что дочерние процессы не могут импортировать класс объекта - в этом случае, класс P-, в случае многомодельного проекта класс P должен импортироваться везде, где используется дочерний процесс

быстрый обходной путь - сделать его импортируемым, воздействуя на глобальные переменные ()

globals()["P"] = P
Рахид эль Кедмири
источник