Python Process Pool недемонический?

102

Можно ли создать пул Python, который не является демоническим? Я хочу, чтобы пул мог вызывать функцию, внутри которой есть другой пул.

Я хочу этого, потому что процессы deamon не могут создавать процесс. В частности, это вызовет ошибку:

AssertionError: daemonic processes are not allowed to have children

Например, рассмотрим сценарий, в котором function_aесть пул, который работает function_b, и пул, который работает function_c. Эта цепочка функций завершится ошибкой, потому что function_bона выполняется в процессе демона, а процессы демона не могут создавать процессы.

Максимум
источник
AFAIK, нет, невозможно, что все рабочие в пуле демонизированы, и невозможно ввести зависимость , BTW, я не понимаю вторую часть вашего вопроса I want a pool to be able to call a function that has another pool insideи как это мешает тому факту, что рабочие демонизированы.
mouad
4
Поскольку, если у функции a есть пул, который запускает функцию b, у которого есть пул, который запускает функцию c, проблема в b заключается в том, что она запускается в процессе демона, а процессы демона не могут создавать процессы. AssertionError: daemonic processes are not allowed to have children
Макс

Ответы:

122

multiprocessing.pool.PoolКласс создает рабочие процессы в его __init__методе, делает их демонами и запускает их, и это не возможно повторно установить свой daemonатрибут , Falseпрежде чем они начали (и после этого он не имеет больше). Но вы можете создать свой собственный подкласс multiprocesing.pool.Pool( multiprocessing.Poolэто просто функция-оболочка) и заменить свой собственный multiprocessing.Processподкласс, который всегда не является демоническим, для использования в рабочих процессах.

Вот полный пример того, как это сделать. Важными частями являются два класса NoDaemonProcessи MyPoolвверху, а также для вызова pool.close()и pool.join()вашего MyPoolэкземпляра в конце.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()
Крис Арндт
источник
1
Я только что снова протестировал свой код с Python 2.7 / 3.2 (после исправления строк "print") в Linux и Python 2.6 / 2.7 / 3.2 OS X. Linux и Python 2.7 / 3.2 в OS X работают нормально, но код действительно зависает с Python 2.6 для OS X (Lion). Похоже, это ошибка в модуле многопроцессорности, которая была исправлена, но я еще не проверял трекер ошибок.
Крис Арндт,
1
Благодарность! На windows тоже нужно звонитьmultiprocessing.freeze_support()
frmdstryr 04
2
Хорошая работа. Если у кого-то возникает утечка памяти при этом, попробуйте использовать «с закрытием (MyPool (process = num_cpu)) as pool:» для правильной утилизации пула
Крис Люсьен
32
Какие недостатки использования MyPoolвместо стандартного Pool? Другими словами, какие затраты я оплачиваю в обмен на гибкость запуска дочерних процессов? (Если бы не было затрат, по-видимому, стандарт использовал Poolбы недемонические процессы).
максимум
4
@machen Да, к сожалению, это правда. В Python 3.6 Poolкласс был значительно переработан, поэтому Processэто уже не простой атрибут, а метод, который возвращает экземпляр процесса, который он получает из контекста . Я попытался перезаписать этот метод, чтобы вернуть NoDaemonPoolэкземпляр, но это привело к исключению AssertionError: daemonic processes are not allowed to have childrenпри использовании пула.
Крис Арндт,
29

У меня была необходимость использовать недемонический пул в Python 3.7, и в итоге я адаптировал код, опубликованный в принятом ответе. Ниже приведен фрагмент, который создает недемонический пул:

import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class NestablePool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(NestablePool, self).__init__(*args, **kwargs)

Поскольку текущая реализация multiprocessingбыла тщательно переработана с учетом контекстов, нам необходимо предоставить NoDaemonContextкласс с нашим NoDaemonProcessатрибутом as. NestablePoolзатем будет использовать этот контекст вместо контекста по умолчанию.

Тем не менее, я должен предупредить, что у этого подхода есть как минимум два предостережения:

  1. Это все еще зависит от деталей реализации multiprocessingпакета и поэтому может сломаться в любой момент.
  2. Существуют веские причины, по которым multiprocessingбыло так сложно использовать недемонические процессы, многие из которых описаны здесь . На мой взгляд, наиболее убедительными являются:

Что касается разрешения дочерним потокам порождать собственных дочерних процессов, с помощью подпроцесса возникает риск создания небольшой армии зомби-внуков, если родительский или дочерний поток завершается до того, как подпроцесс завершится и вернется.

Массимилиано
источник
1
Относительно предостережения: мой вариант использования - распараллеливание задач, но внуки возвращают информацию своим родителям, которые, в свою очередь, возвращают информацию своим родителям после выполнения некоторой необходимой локальной обработки. Следовательно, каждый уровень / ветвь имеет явное ожидание всех своих листьев. Остается ли предупреждение, если вам явно нужно дождаться завершения порожденных процессов?
A_A
Не могли бы вы добавить, как использовать это вместо multiprocessing.pool?
радио,
«Теперь вы можете использовать multiprocessing.Pool и NestablePool как взаимозаменяемые».
Радиоуправляемый
22

Модуль многопроцессорности имеет приятный интерфейс для использования пулов с процессами или потоками. В зависимости от вашего текущего варианта использования вы можете рассмотреть возможность использования multiprocessing.pool.ThreadPoolдля своего внешнего пула, что приведет к потокам (которые позволяют запускать процессы изнутри) а не к процессам.

Это может быть ограничено GIL, но в моем конкретном случае (я тестировал оба) время запуска процессов из внешнего, Poolсозданного здесь, намного перевешивает решение с ThreadPool.


Это действительно легко своп Processesдля Threads. Узнайте больше о том, как использовать ThreadPoolрешение здесь или здесь .

Timmwagener
источник
Спасибо - это мне очень помогло - отличное использование потоковой передачи (для создания процессов, которые действительно работают хорошо)
trance_dude
1
Для людей, которые ищут практическое решение, которое, вероятно, применимо к их ситуации, это то, что нужно.
абанана
6

В некоторых версиях Python замена стандартного Pool на пользовательский может вызвать ошибку: AssertionError: group argument must be None for now .

Здесь я нашел решение, которое может помочь:

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, val):
        pass


class NoDaemonProcessPool(multiprocessing.pool.Pool):

    def Process(self, *args, **kwds):
        proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
        proc.__class__ = NoDaemonProcess

        return proc
Atterratio
источник
4

concurrent.futures.ProcessPoolExecutorне имеет этого ограничения. Он может иметь пул вложенных процессов без каких-либо проблем:

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'{pid()=} {i=} {square=}')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'{pid()=} {sum_squares=}')

if __name__ == "__main__":
    main()

Приведенный выше демонстрационный код был протестирован с Python 3.8.

Однако ограничение в том ProcessPoolExecutor, что у него нет maxtasksperchild. Если вам это нужно, вместо этого рассмотрите ответ Массимилиано .

Кредит: ответ jfs

Acumenus
источник
1
Сейчас это явно лучшее решение, поскольку требует минимальных изменений.
DreamFlasher
1
работает отлично! ... в качестве дополнительной заметки также возможно использование ребенка multiprocessing.Poolвнутри a ProcessPoolExecutor.Pool!
Рафаэль
4

Проблема, с которой я столкнулся, заключалась в попытке импортировать глобальные переменные между модулями, в результате чего строка ProcessPool () оценивалась несколько раз.

globals.py

from processing             import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls,*args,**kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    __metaclass__ = SingletonMeta
    """     
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children
     
    The root cause is that importing this file from different modules causes this file to be reevalutated each time, 
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug    
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

Затем безопасно импортируйте из другого места в вашем коде

from globals import Globals
Globals().shared_manager      
Globals().shared_process_pool
Globals().shared_thread_pool  
Globals().shared_lock         

Я написал здесь более расширенный класс-оболочку pathos.multiprocessing:

В качестве побочного примечания, если ваш вариант использования просто требует асинхронной многопроцессорной карты в качестве оптимизации производительности, тогда joblib будет управлять всеми вашими пулами процессов за кулисами и разрешить этот очень простой синтаксис:

squares = Parallel(-1)( delayed(lambda num: num**2)(x) for x in range(100) )
Джеймс Макгиган
источник
3

Я видел людей , занимающихся этой проблемой, используя celeryвилку «s из multiprocessingпод названием бильярда (многопроцессорных расширения пула), что позволяет Daemonic процессы нереститься детей. Решение - просто заменить multiprocessingмодуль на:

import billiard as multiprocessing
Томаш Бартковяк
источник
0

Это представляет собой обходной путь, когда ошибка кажется ложноположительной. Как также отметил Джеймс , это может произойти при непреднамеренном импорте из демонического процесса.

Например, если у вас есть следующий простой код, он WORKER_POOLможет быть случайно импортирован из рабочего, что приведет к ошибке.

import multiprocessing

WORKER_POOL = multiprocessing.Pool()

Простой, но надежный способ обходного пути:

import multiprocessing
import multiprocessing.pool


class MyClass:

    @property
    def worker_pool(self) -> multiprocessing.pool.Pool:
        # Ref: https://stackoverflow.com/a/63984747/
        try:
            return self._worker_pool  # type: ignore
        except AttributeError:
            # pylint: disable=protected-access
            self.__class__._worker_pool = multiprocessing.Pool()  # type: ignore
            return self.__class__._worker_pool  # type: ignore
            # pylint: enable=protected-access

В указанном выше обходном пути MyClass.worker_poolможно использовать без ошибки. Если вы думаете, что этот подход можно улучшить, дайте мне знать.

Acumenus
источник