В документации к multiprocessing
модулю показано, как передать очередь процессу, запущенному с multiprocessing.Process
. Но как я могу разделить очередь с запущенными асинхронными рабочими процессами apply_async
? Мне не нужно динамическое объединение или что-то еще, просто способ для рабочих (многократно) сообщать о своих результатах обратно на базу.
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
q = multiprocessing.Queue()
workers = pool.apply_async(worker, (33, q))
Это терпит неудачу с:
RuntimeError: Queue objects should only be shared between processes through inheritance
. Я понимаю, что это означает, и я понимаю совет наследовать, а не требовать травления / удаления (и все специальные ограничения Windows). Но как же я прохожу очереди таким образом , что работает? Я не могу найти пример и пробовал несколько альтернатив, которые по разным причинам не увенчались успехом. Помогите, пожалуйста?
queue.Queue()
для этого не подходит?queue.Queue
был построен для потоковой передачи с использованием блокировок в памяти. В многопроцессорной среде каждый подпроцесс получит свою собственную копиюqueue.Queue()
экземпляра в своем собственном пространстве памяти, поскольку подпроцессы не разделяют память (в основном).multiprocessing.Pool
уже имеет общую очередь результатов, нет необходимости дополнительно задействоватьManager.Queue
.Manager.Queue
представляет собойqueue.Queue
(многопоточную очередь) под капотом, расположенную на отдельном серверном процессе и доступную через прокси. Это добавляет дополнительные накладные расходы по сравнению с внутренней очередью Pool. В отличие от использования встроенной обработки результатов Pool, порядок результатов вManager.Queue
также не гарантируется.Рабочие процессы не запускаются
.apply_async()
, это уже происходит при создании экземпляраPool
. Что это началось , когда вы звонитеpool.apply_async()
новая «работа». Рабочие процессы пула запускаютmultiprocessing.pool.worker
функцию под капотом. Эта функция заботится об обработке новых «задач», передаваемых через внутренний пул,Pool._inqueue
и об отправке результатов обратно родительскому объекту черезPool._outqueue
. Указанноеfunc
вами будет выполнено в пределахmultiprocessing.pool.worker
.func
нужно толькоreturn
что-то, и результат будет автоматически отправлен обратно родителю..apply_async()
немедленно (асинхронно) возвращаетAsyncResult
объект (псевдоним дляApplyResult
). Вам нужно вызвать.get()
(блокирует) этот объект, чтобы получить фактический результат. Другой вариант - зарегистрировать функцию обратного вызова , которая запускается, как только становится готов результат.from multiprocessing import Pool def busy_foo(i): """Dummy function simulating cpu-bound work.""" for _ in range(int(10e6)): # do stuff pass return i if __name__ == '__main__': with Pool(4) as pool: print(pool._outqueue) # DEMO results = [pool.apply_async(busy_foo, (i,)) for i in range(10)] # `.apply_async()` immediately returns AsyncResult (ApplyResult) object print(results[0]) # DEMO results = [res.get() for res in results] print(f'result: {results}')
Пример вывода:
<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0> <multiprocessing.pool.ApplyResult object at 0x7fa12586da20> result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Примечание. Указание параметра
timeout
-параметра для.get()
не остановит фактическую обработку задачи внутри рабочего процесса, а только разблокирует ожидающего родителя, поднявmultiprocessing.TimeoutError
.источник
error_callback
-параметр дляapply_async
, поэтому с тех пор он не сильно изменился.