Как я могу восстановить возвращаемое значение функции, переданной multiprocessing.Process?

190

В приведенном ниже примере кода я хотел бы восстановить возвращаемое значение функции worker. Как я могу сделать это? Где хранится это значение?

Пример кода:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs

Вывод:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

Я не могу найти соответствующий атрибут в объектах, хранящихся в jobs.

БИК
источник

Ответы:

190

Используйте общую переменную для общения. Например, вот так:

import multiprocessing

def worker(procnum, return_dict):
    '''worker function'''
    print str(procnum) + ' represent!'
    return_dict[procnum] = procnum


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print return_dict.values()
Vartec
источник
46
Я бы рекомендовал использовать multiprocessing.Queue, а не Managerздесь. Использование Managerтребует запуска совершенно нового процесса, который излишним, когда Queueбы сделал.
Дан
1
@dano: Интересно, если мы используем объект Queue (), мы не можем определить порядок, когда каждый процесс возвращает значение. Я имею в виду, если нам нужен порядок в результате, чтобы сделать следующую работу. Как мы можем быть уверены, где именно, какой вывод, из какого процесса
Catbuilts
4
@Catbuilts Вы можете вернуть кортеж из каждого процесса, где одно значение является фактическим возвращаемым значением, о котором вы заботитесь, а другое - уникальным идентификатором из процесса. Но мне также интересно, почему вы должны знать, какой процесс возвращает какое значение. Если это то, что вам на самом деле нужно знать о процессе, или вам нужно соотнести ваш список входов и список выходов? В этом случае я бы рекомендовал использовать multiprocessing.Pool.mapдля обработки вашего списка рабочих элементов.
Дано
5
предостережения для функций только с одним аргументом : следует использовать args=(my_function_argument, ). Запишите ,здесь запятую! Или же Python будет жаловаться на «отсутствие позиционных аргументов». Мне понадобилось 10 минут, чтобы понять. Также проверьте использование вручную (в разделе «класс процесса»).
yuqli
2
@vartec Один из недостатков использования словаря multipriocessing.Manager () заключается в том, что он обрабатывает (сериализует) возвращаемый объект, поэтому у него есть узкое место, заданное библиотекой pickle максимального размера 2 ГБ для возврата объекта. Есть ли какой-нибудь другой способ избежать сериализации возвращаемого объекта?
Hirschme
68

Я думаю, что подход, предложенный @sega_sai, является лучшим. Но это действительно нуждается в примере кода, так что здесь идет:

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))

Который будет печатать возвращаемые значения:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

Если вы знакомы с map(встроенным в Python 2), это не должно быть слишком сложным. В противном случае взгляните на ссылку sega_Sai .

Обратите внимание, как мало кода требуется. (Также обратите внимание, как процессы используются повторно).

отметка
источник
1
Есть идеи, почему мое getpid()возвращение все равно стоит? Я бегу Python3
zelusp
Я не уверен, как Пул распределяет задачи по рабочим. Может быть, они все могут оказаться на одном работнике, если они действительно быстрые? Это происходит последовательно? Также, если вы добавите задержку?
Марк
Я также думал, что это было связано со скоростью, но когда я кормлю pool.mapдиапазон 1 000 000, используя более 10 процессов, я вижу самое большее два разных пида.
zelusp
1
Тогда я не уверен. Думаю, было бы интересно открыть для этого отдельный вопрос.
Марк
Если вещи, которые вы хотите отправить по-разному для каждого процесса, используйте pool.apply_async: docs.python.org/3/library/…
Kyle
24

В этом примере показано, как использовать список экземпляров multiprocessing.Pipe для возврата строк из произвольного числа процессов:

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()

Вывод:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

Это решение использует меньше ресурсов, чем многопроцессорное. Вопрос, который использует

  • труба
  • хотя бы один замок
  • буфер
  • нить

или мультипроцессинг. Простая проблема, которая использует

  • труба
  • хотя бы один замок

Очень поучительно посмотреть на источник для каждого из этих типов.

Дэвид Каллен
источник
Что было бы лучшим способом сделать это, не делая каналы глобальной переменной?
Nickpick
Я помещаю все глобальные данные и код в основную функцию, и она работает так же. Это отвечает на ваш вопрос?
Дэвид Каллен
всегда ли должен читаться канал, прежде чем к нему можно будет добавить (отправить) новое значение?
Nickpick
+1, хороший ответ. Но из-за того, что решение более эффективно, компромисс заключается в том, что вы делаете один Pipeна процесс против одного Queueдля всех процессов. Я не знаю, будет ли это более эффективным во всех случаях.
Судо
2
Этот ответ вызывает тупик, если возвращаемый объект большой. Вместо того, чтобы сначала выполнять proc.join (), я сначала попытался бы получить возвращаемое значение recv (), а затем выполнить соединение.
Л. Пес
22

По какой-то причине я не смог найти общий пример того, как это сделать Queueгде угодно (даже примеры документов Python не порождают несколько процессов), поэтому вот что я получил после 10 попыток:

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queueблокирующая потокобезопасная очередь, которую можно использовать для хранения возвращаемых значений дочерних процессов. Таким образом, вы должны передать очередь каждому процессу. Что менее очевидно , является то , что вы должны get()из очереди перед вами joinв Processэс или иначе очередь заполняет и блокирует все.

Обновление для тех, кто является объектно-ориентированным (протестировано в Python 3.4):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)
Судо
источник
18

Для тех, кто ищет, как получить ценность от Processиспользования Queue:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    print queue.get()  # Prints {"foo": True}
    p.join()
Мэтью Мойзен
источник
1
когда я помещаю что-то в очередь в моем рабочем процессе, мое соединение никогда не достигается. Есть идеи, как это могло произойти?
Лорен Коппенол
@LaurensKoppenol Вы имеете в виду, что ваш основной код постоянно висит в p.join () и никогда не продолжается? Ваш процесс имеет бесконечный цикл?
Мэтью Мойзен
4
Да, он висит там бесконечно. Все мои работники заканчивают работу (цикл внутри рабочей функции завершается, после этого печатается оператор print для всех работников). Объединение ничего не делает. Если я удалю Queueиз своей функции, это позволит мне пройтиjoin()
Лорен Коппенол
@ LaurensKoppenol Вы, возможно, не звоните queue.put(ret)до звонка p.start()? В этом случае рабочий поток будет зависать queue.get()вечно. Вы можете повторить это, скопировав мой фрагмент выше при комментировании queue.put(ret).
Мэтью Мойзен
Я отредактировал этот ответ, queue.get()должно произойти до p.join(). Это работает сейчас для меня.
Jfunk
12

Кажется, что вы должны использовать вместо этого класс multiprocessing.Pool и использовать методы .apply () .apply_async (), map ()

http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult

sega_sai
источник
У меня есть код тензорного потока, для которого многопроцессорная обработка. Пул будет зависать, но не многопроцессорная обработка.
Процесс
10

Вы можете использовать exitвстроенный, чтобы установить код выхода процесса. Его можно получить из exitcodeатрибута процесса:

import multiprocessing

def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result

Вывод:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
Дэвид Каллен
источник
4
Имейте в виду, что такой подход может привести к путанице. Обычно процессы должны завершаться с кодом завершения 0, если они завершены без ошибок. Если у вас есть что-нибудь, отслеживающее коды завершения процесса вашей системы, вы можете увидеть эти сообщения как ошибки.
колесо обозрения
1
Идеально, если вы просто хотите вызвать исключение в родительском процессе при ошибке.
crizCraig
6

Пакет pebble имеет хорошее использование абстракции, multiprocessing.Pipeчто делает его довольно простым:

from pebble import concurrent

@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg

future = function(1, kwarg=1)

print(future.result())

Пример из: https://pythonhosted.org/Pebble/#concurrent-decorators

erikreed
источник
3

Думаю, я упростил бы самые простые примеры, скопированные сверху, работая для меня на Py3.6. Самый простой это multiprocessing.Pool:

import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))

Вы можете установить количество процессов в бассейне с, например, Pool(processes=5). Тем не менее, по умолчанию используется счетчик ЦП, поэтому оставьте его пустым для задач, связанных с ЦП. (Задачи с привязкой к вводу / выводу часто все равно подходят потокам, так как потоки в основном ожидают, поэтому могут совместно использовать ядро ​​ЦП.) PoolТакже применяется оптимизация разбиения на блоки .

(Обратите внимание, что рабочий метод не может быть вложен в метод. Сначала я определил свой рабочий метод внутри метода, который выполняет вызов pool.map, чтобы он оставался автономным, но затем процессы не смогли его импортировать, и выдал «AttributeError». : Не могу выбрать локальный объект external_method..inner_method ". Подробнее здесь . Может быть внутри класса.)

(Оцените оригинальный вопрос, заданный печатью, 'represent!'а не time.sleep(), но без него я думал, что некоторый код выполнялся одновременно, когда это не так.)


Py3 также ProcessPoolExecutorсостоит из двух строк ( .mapвозвращает генератор, поэтому вам нужно list()):

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    print(list(executor.map(worker, range(10))))

С простым Processes:

import multiprocessing
import time

def worker(x, queue):
    time.sleep(1)
    queue.put(x)

queue = multiprocessing.SimpleQueue()
tasks = range(10)

for task in tasks:
    multiprocessing.Process(target=worker, args=(task, queue,)).start()

for _ in tasks:
    print(queue.get())

Используйте, SimpleQueueесли все, что вам нужно, это putи get. Первый цикл запускает все процессы, прежде чем второй выполняет блокирующие queue.getвызовы. Я не думаю, что есть причина звонить p.join()тоже.

Крис
источник
2

Простое решение:

import multiprocessing

output=[]
data = range(0,10)

def f(x):
    return x**2

def handler():
    p = multiprocessing.Pool(64)
    r=p.map(f, data)
    return r

if __name__ == '__main__':
    output.append(handler())

print(output[0])

Вывод:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Rubens_Zimbres
источник
2

Если вы используете Python 3, вы можете использовать concurrent.futures.ProcessPoolExecutorв качестве удобной абстракции:

from concurrent.futures import ProcessPoolExecutor

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(worker, range(5))))

Вывод:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
Алеф Алеф
источник
0

Я немного изменил ответ vartec, так как мне нужно было получить коды ошибок из функции. (Спасибо Vertec !!! это удивительный трюк)

Это также может быть сделано с помощью, manager.listно я думаю, что лучше иметь это в dict и хранить список в нем. Таким образом, мы сохраняем функцию и результаты, поскольку не можем быть уверены в том, в каком порядке будет заполняться список.

from multiprocessing import Process
import time
import datetime
import multiprocessing


def func1(fn, m_list):
    print 'func1: starting'
    time.sleep(1)
    m_list[fn] = "this is the first function"
    print 'func1: finishing'
    # return "func1"  # no need for return since Multiprocess doesnt return it =(

def func2(fn, m_list):
    print 'func2: starting'
    time.sleep(3)
    m_list[fn] = "this is function 2"
    print 'func2: finishing'
    # return "func2"

def func3(fn, m_list):
    print 'func3: starting'
    time.sleep(9)
    # if fail wont join the rest because it never populate the dict
    # or do a try/except to get something in return.
    raise ValueError("failed here")
    # if we want to get the error in the manager dict we can catch the error
    try:
        raise ValueError("failed here")
        m_list[fn] = "this is third"
    except:
        m_list[fn] = "this is third and it fail horrible"
        # print 'func3: finishing'
        # return "func3"


def runInParallel(*fns):  # * is to accept any input in list
    start_time = datetime.datetime.now()
    proc = []
    manager = multiprocessing.Manager()
    m_list = manager.dict()
    for fn in fns:
        # print fn
        # print dir(fn)
        p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()  # 5 is the time out

    print datetime.datetime.now() - start_time
    return m_list, proc

if __name__ == '__main__':
    manager, proc = runInParallel(func1, func2, func3)
    # print dir(proc[0])
    # print proc[0]._name
    # print proc[0].name
    # print proc[0].exitcode

    # here you can check what did fail
    for i in proc:
        print i.name, i.exitcode  # name was set up in the Process line 53

    # here will only show the function that worked and where able to populate the 
    # manager dict
    for i, j in manager.items():
        print dir(i)  # things you can do to the function
        print i, j
Pelos
источник