Python многопроцессорный pool.map для нескольких аргументов

536

В многопроцессорной библиотеке Python есть вариант pool.map, который поддерживает несколько аргументов?

text = "test"
def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text,case),case, 1)
    pool.close()
    pool.join()
user642897
источник
4
К моему удивлению, я не мог сделать ни , partialни lambdaсделать это. Я думаю, что это связано со странным способом, которым функции передаются подпроцессам (через pickle).
senderle
10
@senderle: это ошибка в Python 2.6, но она была исправлена ​​с 2.7: bugs.python.org/issue5228
unutbu
1
Просто замените pool.map(harvester(text,case),case, 1) на: pool.apply_async(harvester(text,case),case, 1)
Тунг Нгуен
3
@Syrtis_Major, пожалуйста, не редактируйте вопросы OP, которые эффективно искажают ответы, которые были даны ранее. Добавление returnк harvester()тому, что ответ @senderie оказался неточным. Это не поможет будущим читателям.
Рикалсин
1
Я бы сказал, что простым решением было бы упаковать все аргументы в кортеж и распаковать его в исполняющую функцию. Я сделал это, когда мне нужно было отправить несколько сложных аргументов в функцию, выполняемую пулом процессов.
HS

Ответы:

358

Ответ на это зависит от версии и ситуации. Наиболее общий ответ для последних версий Python (начиная с версии 3.3) был впервые описан JF Sebastian . 1 Используется Pool.starmapметод, который принимает последовательность кортежей аргументов. Затем он автоматически распаковывает аргументы из каждого кортежа и передает их данной функции:

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

Для более ранних версий Python вам нужно написать вспомогательную функцию для явной распаковки аргументов. Если вы хотите использовать with, вам также нужно написать оболочку, чтобы превратиться Poolв менеджер контекста. (Спасибо Мюону за указание на это.)

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

В более простых случаях вы можете использовать фиксированный второй аргумент partial, но только в Python 2.7+.

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

1. Во многом это было вдохновлено его ответом, который, вероятно, следовало бы принять вместо этого. Но так как этот вариант застрял на вершине, лучше всего его улучшить для будущих читателей.

senderle
источник
Мне кажется, что RAW_DATASET в этом случае должна быть глобальной переменной? Хотя я хочу, чтобы частичный_харвестер изменял значение регистра при каждом вызове харвестера (). Как этого добиться?
xgdgsc
Здесь самое важное - присвоить =RAW_DATASETзначение по умолчанию case. В противном случае pool.mapбудет путать насчет нескольких аргументов.
Эмерсон Сюй
1
Я запутался, что случилось с textпеременной в вашем примере? Почему, RAW_DATASETказалось бы, прошло два раза. Я думаю, что у вас может быть опечатка?
Дейв
не знаю, почему использование with .. as .. дает мне AttributeError: __exit__, но работает хорошо, если я просто позвонить, а pool = Pool();затем закрыть вручную pool.close()(python2.7)
мюон
1
@ muon, хороший улов. Похоже, Poolобъекты не становятся менеджерами контекста до Python 3.3. Я добавил простую функцию-обертку, которая возвращает Poolменеджер контекста.
senderle
501

есть ли вариант pool.map, который поддерживает несколько аргументов?

Python 3.3 включает в себя pool.starmap()метод :

#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    return a + b

def main():
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__=="__main__":
    freeze_support()
    main()

Для более старых версий:

#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support

def func(a, b):
    print a, b

def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)

def main():
    pool = Pool()
    a_args = [1,2,3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))

if __name__=="__main__":
    freeze_support()
    main()

Вывод

1 1
2 1
3 1

Обратите внимание, как itertools.izip()и itertools.repeat()здесь используются.

Из- за ошибки, упомянутой @unutbu, вы не можете использовать functools.partial()или подобные возможности в Python 2.6, поэтому простая функция-обертка func_star()должна быть определена явно. Смотрите также обходной путь, предложенныйuptimebox .

JFS
источник
1
F .: Вы можете распаковать аргумент кортеж в подписи , func_starкак это: def func_star((a, b)). Конечно, это работает только для фиксированного числа аргументов, но если это единственный случай, который он имеет, он более читабелен.
Бьорн Поллекс
1
@ Space_C0wb0y: f((a,b))синтаксис устарел и удален в py3k. И это не нужно здесь.
Jfs
возможно, более питонический: func = lambda x: func(*x)вместо определения функции-оболочки
dylam
1
@ zthomas.nc этот вопрос о том, как поддерживать несколько аргументов для многопроцессорной обработки pool.map. Если вы хотите узнать, как вызывать метод вместо функции в другом процессе Python с помощью многопроцессорной обработки, задайте отдельный вопрос (если все остальное терпит неудачу, вы всегда можете создать глобальную функцию, которая переносит вызов метода, аналогичный func_star()описанному выше)
jfs
1
Я хотел бы, чтобы были starstarmap.
Константин Ван
141

Думаю ниже будет лучше

def multi_run_wrapper(args):
   return add(*args)
def add(x,y):
    return x+y
if __name__ == "__main__":
    from multiprocessing import Pool
    pool = Pool(4)
    results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
    print results

вывод

[3, 5, 7]
imotai
источник
16
Самое простое решение. Есть небольшая оптимизация; удалите функцию-обертку и распакуйте ее argsнапрямую add, она работает для любого количества аргументов:def add(args): (x,y) = args
Ахмед
1
Вы также можете использовать lambdaфункцию вместо определенияmulti_run_wrapper(..)
Андре Хольцнер,
2
хм ... на самом деле, использование lambdaне работает, потому что pool.map(..)пытается засолить данную функцию
Андре Хольцнер
Как вы используете это, если вы хотите сохранить результат addв списке?
Вивек Субраманян
@ Ахмед, мне нравится, как оно есть, потому что ИМХО вызов метода должен завершаться неудачно, когда число параметров неверно.
Майкл Дорнер
56

Использование Python 3.3+ сpool.starmap():

from multiprocessing.dummy import Pool as ThreadPool 

def write(i, x):
    print(i, "---", x)

a = ["1","2","3"]
b = ["4","5","6"] 

pool = ThreadPool(2)
pool.starmap(write, zip(a,b)) 
pool.close() 
pool.join()

Результат:

1 --- 4
2 --- 5
3 --- 6

Вы также можете zip () больше аргументов, если вам нравится: zip(a,b,c,d,e)

В случае, если вы хотите, чтобы в качестве аргумента передавалось постоянное значение, import itertoolsа затем, zip(itertools.repeat(constant), a)например.

user136036
источник
2
Это почти точный дубликат ответа от @JFSebastian в 2011 году (более 60 голосов).
Майк Маккернс,
29
Во-первых, он удалил много ненужных вещей и четко указал, что это для Python 3.3+ и предназначен для начинающих, которые ищут простой и чистый ответ. Самому начинающему потребовалось некоторое время, чтобы понять это таким образом (да, с постами JFSebastians), и именно поэтому я написал свой пост, чтобы помочь другим новичкам, потому что его пост просто сказал «есть карта звездного неба», но не объяснил это - это это то, что намерен мой пост. Так что нет абсолютно никакой причины избивать меня двумя отрицательными голосами.
user136036
В 2011 году не было "+" в питоне 3.3 + ... так очевидно.
Майк Маккернс
27

Узнав об itertools в ответе Дж.Ф. Себастьяна, я решил сделать еще один шаг и написать parmapпакет, который заботится о распараллеливании, предложении mapи starmapфункциях на python-2.7 и python-3.2 (и позже), которые могут принимать любое количество позиционных аргументов. ,

Установка

pip install parmap

Как распараллелить:

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

Я загрузил parmap в PyPI и в хранилище github .

В качестве примера на вопрос можно ответить следующим образом:

import parmap

def harvester(case, text):
    X = case[0]
    text+ str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)
zeehio
источник
20

# «Как принять несколько аргументов».

def f1(args):
    a, b, c = args[0] , args[1] , args[2]
    return a+b+c

if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool(4) 

    result1 = pool.map(f1, [ [1,2,3] ])
    print(result1)
Датчанин ли
источник
2
Аккуратный и элегантный.
Prav001
1
Я не понимаю, почему я должен прокручивать весь путь здесь, чтобы найти лучший ответ.
Тоти
12

Есть форк с multiprocessingназванием pathos ( примечание: используйте версию на github ), который не нужен starmap- функции карты отражают API для карты python, поэтому map может принимать несколько аргументов. С помощью pathosвы также можете выполнять многопроцессорную обработку в интерпретаторе вместо того, чтобы застрять в __main__блоке. После небольшого обновления Pathos выйдет в свет - в основном это переход на python 3.x.

  Python 2.7.5 (default, Sep 30 2013, 20:15:49) 
  [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
  Type "help", "copyright", "credits" or "license" for more information.
  >>> def func(a,b):
  ...     print a,b
  ...
  >>>
  >>> from pathos.multiprocessing import ProcessingPool    
  >>> pool = ProcessingPool(nodes=4)
  >>> pool.map(func, [1,2,3], [1,1,1])
  1 1
  2 1
  3 1
  [None, None, None]
  >>>
  >>> # also can pickle stuff like lambdas 
  >>> result = pool.map(lambda x: x**2, range(10))
  >>> result
  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  >>>
  >>> # also does asynchronous map
  >>> result = pool.amap(pow, [1,2,3], [4,5,6])
  >>> result.get()
  [1, 32, 729]
  >>>
  >>> # or can return a map iterator
  >>> result = pool.imap(pow, [1,2,3], [4,5,6])
  >>> result
  <processing.pool.IMapIterator object at 0x110c2ffd0>
  >>> list(result)
  [1, 32, 729]

pathosЕсть несколько способов, которыми вы можете получить точное поведение starmap.

>>> def add(*x):
...   return sum(x)
... 
>>> x = [[1,2,3],[4,5,6]]
>>> import pathos
>>> import numpy as np
>>> # use ProcessPool's map and transposing the inputs
>>> pp = pathos.pools.ProcessPool()
>>> pp.map(add, *np.array(x).T)
[6, 15]
>>> # use ProcessPool's map and a lambda to apply the star
>>> pp.map(lambda x: add(*x), x)
[6, 15]
>>> # use a _ProcessPool, which has starmap
>>> _pp = pathos.pools._ProcessPool()
>>> _pp.starmap(add, x)
[6, 15]
>>> 
Майк Маккернс
источник
Я хочу отметить, что это не касается структуры в первоначальном вопросе. [[1,2,3], [4,5,6]] распаковывают со звездной картой в [pow (1,2,3), pow (4,5,6)], а не [pow (1,4) , pow (2,5), pow (3, 6)]. Если у вас нет хорошего контроля над входами, передаваемыми в вашу функцию, вам может понадобиться сначала их реструктурировать.
Скотт
@ Скотт: ах, я не заметил этого ... более 5 лет назад. Я сделаю небольшое обновление. Спасибо.
Майк Маккернс
8

Вы можете использовать следующие две функции, чтобы избежать написания оболочки для каждой новой функции:

import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))

Используйте функцию functionсо списками аргументов arg_0, arg_1и arg_2выглядит следующим образом :

pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()
М. Тойя
источник
8

Лучшее решение для python2:

from multiprocessing import Pool
def func((i, (a, b))):
    print i, a, b
    return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])

2 3 4

1 2 3

0 1 2

вне[]:

[3, 5, 7]

xmduhan
источник
7

Другой простой альтернативой является упаковка параметров вашей функции в кортеж, а затем упаковка параметров, которые также должны быть переданы в кортежи. Это, возможно, не идеально, когда имеешь дело с большими кусками данных. Я считаю, что это будет делать копии для каждого кортежа.

from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

Дает вывод в некотором случайном порядке:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
Алекс Клибиш
источник
Это действительно так, все еще в поисках лучшего пути :(
Фабио Диас
6

Лучше использовать декоратор вместо написания функции-оболочки вручную. Особенно, когда у вас есть много функций для отображения, декоратор сэкономит ваше время, избегая написания оболочки для каждой функции. Обычно декорированная функция не является кражей, однако мы можем использовать ее functoolsдля обхода. Больше рассуждений можно найти здесь .

Вот пример

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y

Тогда вы можете сопоставить его с заархивированными аргументами

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()

Конечно, вы всегда можете использовать Pool.starmapв Python 3 (> = 3.3), как указано в других ответах.

Syrtis Major
источник
Результаты не такие, как ожидалось: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] Я ожидаю: [0,1,2,3,4,5,6,7,8, 9,1,2,3,4,5,6,7,8,9,10,2,3,4,5,6,7,8,9,10,11, ...
Тедо Врбанец
@TedoVrbanec Результаты просто должны быть [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]. Если вы хотите более поздний, вы можете использовать itertools.productвместо zip.
Syrtis Major
4

Другой способ - передать список списков подпрограмме с одним аргументом:

import os
from multiprocessing import Pool

def task(args):
    print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]

pool = Pool()

pool.map(task, [
        [1,2],
        [3,4],
        [5,6],
        [7,8]
    ])

Можно создать список списков аргументов с помощью любимого метода.

саман
источник
Это простой способ, но вам нужно изменить свои оригинальные функции. Более того, когда-нибудь вспомните чужие функции, которые нельзя изменить.
WeizhongTu
Я скажу, что это придерживается Python Zen. Должен быть один-единственный очевидный способ сделать это. Если вы случайно являетесь автором вызывающей функции, вам следует использовать этот метод, в других случаях мы можем использовать метод imotai.
Нехем
Мой выбор - использовать кортеж, а затем сразу же развернуть его, как первое, что есть в первой строке.
Нехем
3

Вот еще один способ сделать это, что ИМХО является более простым и элегантным, чем любой из предоставленных ответов.

Эта программа имеет функцию, которая принимает два параметра, печатает их, а также печатает сумму:

import multiprocessing

def main():

    with multiprocessing.Pool(10) as pool:
        params = [ (2, 2), (3, 3), (4, 4) ]
        pool.starmap(printSum, params)
    # end with

# end function

def printSum(num1, num2):
    mySum = num1 + num2
    print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function

if __name__ == '__main__':
    main()

вывод:

num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8

Смотрите документацию по Python для получения дополнительной информации:

https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool

В частности, обязательно ознакомьтесь с starmap функцию.

Я использую Python 3.6, я не уверен, будет ли это работать со старыми версиями Python

Почему в документах нет такого простого примера, как я, я не уверен.

cdahms
источник
2

В python 3.4.4 вы можете использовать multiprocessing.get_context (), чтобы получить объект контекста для использования нескольких методов запуска:

import multiprocessing as mp

def foo(q, h, w):
    q.put(h + ' ' + w)
    print(h + ' ' + w)

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,'hello', 'world'))
    p.start()
    print(q.get())
    p.join()

Или вы просто замените

pool.map(harvester(text,case),case, 1)

по:

pool.apply_async(harvester(text,case),case, 1)
Тунг Нгуен
источник
2

Здесь много ответов, но ни один из них не предоставляет Python 2/3-совместимый код, который будет работать на любой версии. Если вы хотите, чтобы ваш код просто работал , это будет работать для любой версии Python:

# For python 2/3 compatibility, define pool context manager
# to support the 'with' statement in Python 2
if sys.version_info[0] == 2:
    from contextlib import contextmanager
    @contextmanager
    def multiprocessing_context(*args, **kwargs):
        pool = multiprocessing.Pool(*args, **kwargs)
        yield pool
        pool.terminate()
else:
    multiprocessing_context = multiprocessing.Pool

После этого вы можете использовать многопроцессорную обработку обычным способом Python 3, как вам нравится. Например:

def _function_to_run_for_each(x):
       return x.lower()
with multiprocessing_context(processes=3) as pool:
    results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim'])    print(results)

будет работать в Python 2 или Python 3.

cgnorthcutt
источник
1

В официальной документации говорится, что она поддерживает только один итеративный аргумент. Мне нравится использовать apply_async в таких случаях. В вашем случае я бы сделал:

from multiprocessing import Process, Pool, Manager

text = "test"
def harvester(text, case, q = None):
 X = case[0]
 res = text+ str(X)
 if q:
  q.put(res)
 return res


def block_until(q, results_queue, until_counter=0):
 i = 0
 while i < until_counter:
  results_queue.put(q.get())
  i+=1

if __name__ == '__main__':
 pool = multiprocessing.Pool(processes=6)
 case = RAW_DATASET
 m = Manager()
 q = m.Queue()
 results_queue = m.Queue() # when it completes results will reside in this queue
 blocking_process = Process(block_until, (q, results_queue, len(case)))
 blocking_process.start()
 for c in case:
  try:
   res = pool.apply_async(harvester, (text, case, q = None))
   res.get(timeout=0.1)
  except:
   pass
 blocking_process.join()
roj4s
источник
1
text = "test"

def unpack(args):
    return args[0](*args[1:])

def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    # args is a list of tuples 
    # with the function to execute as the first item in each tuple
    args = [(harvester, text, c) for c in case]
    # doing it this way, we can pass any function
    # and we don't need to define a wrapper for each different function
    # if we need to use more than one
    pool.map(unpack, args)
    pool.close()
    pool.join()
Jaime
источник
1

Это пример процедуры, которую я использую для передачи нескольких аргументов в функцию с одним аргументом, используемую в форке pool.imap :

from multiprocessing import Pool

# Wrapper of the function to map:
class makefun:
    def __init__(self, var2):
        self.var2 = var2
    def fun(self, i):
        var2 = self.var2
        return var1[i] + var2

# Couple of variables for the example:
var1 = [1, 2, 3, 5, 6, 7, 8]
var2 = [9, 10, 11, 12]

# Open the pool:
pool = Pool(processes=2)

# Wrapper loop
for j in range(len(var2)):
    # Obtain the function to map
    pool_fun = makefun(var2[j]).fun

    # Fork loop
    for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0):
        print(var1[i], '+' ,var2[j], '=', value)

# Close the pool
pool.close()
А. Нодар
источник
-3

для python2 вы можете использовать этот трюк

def fun(a,b):
    return a+b

pool = multiprocessing.Pool(processes=6)
b=233
pool.map(lambda x:fun(x,b),range(1000))
Гц Шан
источник
почему б = 233. побеждает цель вопроса
как - будто