Можно ли создать пул Python, который не является демоническим? Я хочу, чтобы пул мог вызывать функцию, внутри которой есть другой пул.
Я хочу этого, потому что процессы deamon не могут создавать процесс. В частности, это вызовет ошибку:
AssertionError: daemonic processes are not allowed to have children
Например, рассмотрим сценарий, в котором function_a
есть пул, который работает function_b
, и пул, который работает function_c
. Эта цепочка функций завершится ошибкой, потому что function_b
она выполняется в процессе демона, а процессы демона не могут создавать процессы.
python
multiprocessing
pool
Максимум
источник
источник
I want a pool to be able to call a function that has another pool inside
и как это мешает тому факту, что рабочие демонизированы.AssertionError: daemonic processes are not allowed to have children
Ответы:
multiprocessing.pool.Pool
Класс создает рабочие процессы в его__init__
методе, делает их демонами и запускает их, и это не возможно повторно установить свойdaemon
атрибут ,False
прежде чем они начали (и после этого он не имеет больше). Но вы можете создать свой собственный подклассmultiprocesing.pool.Pool
(multiprocessing.Pool
это просто функция-оболочка) и заменить свой собственныйmultiprocessing.Process
подкласс, который всегда не является демоническим, для использования в рабочих процессах.Вот полный пример того, как это сделать. Важными частями являются два класса
NoDaemonProcess
иMyPool
вверху, а также для вызоваpool.close()
иpool.join()
вашегоMyPool
экземпляра в конце.#!/usr/bin/env python # -*- coding: UTF-8 -*- import multiprocessing # We must import this explicitly, it is not imported by the top-level # multiprocessing module. import multiprocessing.pool import time from random import randint class NoDaemonProcess(multiprocessing.Process): # make 'daemon' attribute always return False def _get_daemon(self): return False def _set_daemon(self, value): pass daemon = property(_get_daemon, _set_daemon) # We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool # because the latter is only a wrapper function, not a proper class. class MyPool(multiprocessing.pool.Pool): Process = NoDaemonProcess def sleepawhile(t): print("Sleeping %i seconds..." % t) time.sleep(t) return t def work(num_procs): print("Creating %i (daemon) workers and jobs in child." % num_procs) pool = multiprocessing.Pool(num_procs) result = pool.map(sleepawhile, [randint(1, 5) for x in range(num_procs)]) # The following is not really needed, since the (daemon) workers of the # child's pool are killed when the child is terminated, but it's good # practice to cleanup after ourselves anyway. pool.close() pool.join() return result def test(): print("Creating 5 (non-daemon) workers and jobs in main process.") pool = MyPool(5) result = pool.map(work, [randint(1, 5) for x in range(5)]) pool.close() pool.join() print(result) if __name__ == '__main__': test()
источник
multiprocessing.freeze_support()
MyPool
вместо стандартногоPool
? Другими словами, какие затраты я оплачиваю в обмен на гибкость запуска дочерних процессов? (Если бы не было затрат, по-видимому, стандарт использовалPool
бы недемонические процессы).Pool
класс был значительно переработан, поэтомуProcess
это уже не простой атрибут, а метод, который возвращает экземпляр процесса, который он получает из контекста . Я попытался перезаписать этот метод, чтобы вернутьNoDaemonPool
экземпляр, но это привело к исключениюAssertionError: daemonic processes are not allowed to have children
при использовании пула.У меня была необходимость использовать недемонический пул в Python 3.7, и в итоге я адаптировал код, опубликованный в принятом ответе. Ниже приведен фрагмент, который создает недемонический пул:
import multiprocessing.pool class NoDaemonProcess(multiprocessing.Process): @property def daemon(self): return False @daemon.setter def daemon(self, value): pass class NoDaemonContext(type(multiprocessing.get_context())): Process = NoDaemonProcess # We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool # because the latter is only a wrapper function, not a proper class. class NestablePool(multiprocessing.pool.Pool): def __init__(self, *args, **kwargs): kwargs['context'] = NoDaemonContext() super(NestablePool, self).__init__(*args, **kwargs)
Поскольку текущая реализация
multiprocessing
была тщательно переработана с учетом контекстов, нам необходимо предоставитьNoDaemonContext
класс с нашимNoDaemonProcess
атрибутом as.NestablePool
затем будет использовать этот контекст вместо контекста по умолчанию.Тем не менее, я должен предупредить, что у этого подхода есть как минимум два предостережения:
multiprocessing
пакета и поэтому может сломаться в любой момент.multiprocessing
было так сложно использовать недемонические процессы, многие из которых описаны здесь . На мой взгляд, наиболее убедительными являются:источник
Модуль многопроцессорности имеет приятный интерфейс для использования пулов с процессами или потоками. В зависимости от вашего текущего варианта использования вы можете рассмотреть возможность использования
multiprocessing.pool.ThreadPool
для своего внешнего пула, что приведет к потокам (которые позволяют запускать процессы изнутри) а не к процессам.Это может быть ограничено GIL, но в моем конкретном случае (я тестировал оба) время запуска процессов из внешнего,
Pool
созданного здесь, намного перевешивает решение сThreadPool
.Это действительно легко своп
Processes
дляThreads
. Узнайте больше о том, как использоватьThreadPool
решение здесь или здесь .источник
В некоторых версиях Python замена стандартного Pool на пользовательский может вызвать ошибку:
AssertionError: group argument must be None for now
.Здесь я нашел решение, которое может помочь:
class NoDaemonProcess(multiprocessing.Process): # make 'daemon' attribute always return False @property def daemon(self): return False @daemon.setter def daemon(self, val): pass class NoDaemonProcessPool(multiprocessing.pool.Pool): def Process(self, *args, **kwds): proc = super(NoDaemonProcessPool, self).Process(*args, **kwds) proc.__class__ = NoDaemonProcess return proc
источник
concurrent.futures.ProcessPoolExecutor
не имеет этого ограничения. Он может иметь пул вложенных процессов без каких-либо проблем:from concurrent.futures import ProcessPoolExecutor as Pool from itertools import repeat from multiprocessing import current_process import time def pid(): return current_process().pid def _square(i): # Runs in inner_pool square = i ** 2 time.sleep(i / 10) print(f'{pid()=} {i=} {square=}') return square def _sum_squares(i, j): # Runs in outer_pool with Pool(max_workers=2) as inner_pool: squares = inner_pool.map(_square, (i, j)) sum_squares = sum(squares) time.sleep(sum_squares ** .5) print(f'{pid()=}, {i=}, {j=} {sum_squares=}') return sum_squares def main(): with Pool(max_workers=3) as outer_pool: for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)): print(f'{pid()=} {sum_squares=}') if __name__ == "__main__": main()
Приведенный выше демонстрационный код был протестирован с Python 3.8.
Однако ограничение в том
ProcessPoolExecutor
, что у него нетmaxtasksperchild
. Если вам это нужно, вместо этого рассмотрите ответ Массимилиано .Кредит: ответ jfs
источник
multiprocessing.Pool
внутри aProcessPoolExecutor.Pool
!Проблема, с которой я столкнулся, заключалась в попытке импортировать глобальные переменные между модулями, в результате чего строка ProcessPool () оценивалась несколько раз.
globals.py
from processing import Manager, Lock from pathos.multiprocessing import ProcessPool from pathos.threading import ThreadPool class SingletonMeta(type): def __new__(cls, name, bases, dict): dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self return super(SingletonMeta, cls).__new__(cls, name, bases, dict) def __init__(cls, name, bases, dict): super(SingletonMeta, cls).__init__(name, bases, dict) cls.instance = None def __call__(cls,*args,**kw): if cls.instance is None: cls.instance = super(SingletonMeta, cls).__call__(*args, **kw) return cls.instance def __deepcopy__(self, item): return item.__class__.instance class Globals(object): __metaclass__ = SingletonMeta """ This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children The root cause is that importing this file from different modules causes this file to be reevalutated each time, thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug """ def __init__(self): print "%s::__init__()" % (self.__class__.__name__) self.shared_manager = Manager() self.shared_process_pool = ProcessPool() self.shared_thread_pool = ThreadPool() self.shared_lock = Lock() # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin
Затем безопасно импортируйте из другого места в вашем коде
from globals import Globals Globals().shared_manager Globals().shared_process_pool Globals().shared_thread_pool Globals().shared_lock
Я написал здесь более расширенный класс-оболочку
pathos.multiprocessing
:В качестве побочного примечания, если ваш вариант использования просто требует асинхронной многопроцессорной карты в качестве оптимизации производительности, тогда joblib будет управлять всеми вашими пулами процессов за кулисами и разрешить этот очень простой синтаксис:
squares = Parallel(-1)( delayed(lambda num: num**2)(x) for x in range(100) )
источник
Я видел людей , занимающихся этой проблемой, используя
celery
вилку «s изmultiprocessing
под названием бильярда (многопроцессорных расширения пула), что позволяет Daemonic процессы нереститься детей. Решение - просто заменитьmultiprocessing
модуль на:import billiard as multiprocessing
источник
Это представляет собой обходной путь, когда ошибка кажется ложноположительной. Как также отметил Джеймс , это может произойти при непреднамеренном импорте из демонического процесса.
Например, если у вас есть следующий простой код, он
WORKER_POOL
может быть случайно импортирован из рабочего, что приведет к ошибке.import multiprocessing WORKER_POOL = multiprocessing.Pool()
Простой, но надежный способ обходного пути:
import multiprocessing import multiprocessing.pool class MyClass: @property def worker_pool(self) -> multiprocessing.pool.Pool: # Ref: https://stackoverflow.com/a/63984747/ try: return self._worker_pool # type: ignore except AttributeError: # pylint: disable=protected-access self.__class__._worker_pool = multiprocessing.Pool() # type: ignore return self.__class__._worker_pool # type: ignore # pylint: enable=protected-access
В указанном выше обходном пути
MyClass.worker_pool
можно использовать без ошибки. Если вы думаете, что этот подход можно улучшить, дайте мне знать.источник