Python: как я могу запускать функции Python параллельно?

111

Сначала я исследовал и не смог найти ответа на свой вопрос. Я пытаюсь запустить несколько функций параллельно в Python.

У меня примерно так:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

Я хочу вызвать func1 и func2 и запустить их одновременно. Функции не взаимодействуют друг с другом или с одним и тем же объектом. Прямо сейчас я должен дождаться завершения func1 до запуска func2. Как мне сделать что-то вроде ниже:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

Я хочу иметь возможность создавать оба каталога примерно в одно и то же время, потому что каждую минуту я считаю, сколько файлов создается. Если каталога нет, это сбивает мое время.

lmcadory
источник
1
Вы можете захотеть переделать это; если вы подсчитываете количество файлов / папок каждую минуту, вы создаете состояние гонки. Как насчет того, чтобы каждая функция обновляла счетчик или использовала файл блокировки, чтобы гарантировать, что периодический процесс не обновляет счетчик, пока обе функции не закончат выполнение?

Ответы:

166

Вы можете использовать threadingили multiprocessing.

Из - за особенности CPython , threadingвряд ли достигнут истинного параллелизма. По этой причине, multiprocessingкак правило, лучше.

Вот полный пример:

from multiprocessing import Process

def func1():
  print 'func1: starting'
  for i in xrange(10000000): pass
  print 'func1: finishing'

def func2():
  print 'func2: starting'
  for i in xrange(10000000): pass
  print 'func2: finishing'

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()

Механику запуска / присоединения дочерних процессов можно легко инкапсулировать в функцию в соответствии со строками вашего runBothFunc:

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)
NPE
источник
4
Я использовал ваш код, но функции все равно не запускались одновременно.
lmcadory
4
@Lamar McAdory: Пожалуйста, объясните, что именно вы имеете в виду под словом «одновременно», возможно, приведя конкретный пример того, что вы сделали, чего вы ожидали и что произошло на самом деле.
NPE
4
@Lamar: У вас никогда не будет никакой гарантии «точно в одно и то же время», и думать, что вы можете, просто неправильно. В зависимости от того, сколько у вас процессоров, нагрузки на машину, время выполнения многих вещей, происходящих на компьютере, будет влиять на время запуска потоков / процессов. Кроме того, поскольку процессы запускаются сразу после создания, накладные расходы на создание процесса также должны быть рассчитаны с учетом разницы во времени, которую вы видите.
Мартин
1
можно ли получить список результатов каждой функции? допустим, каждая функция возвращает другое значение, можно ли добавить значения в какой-либо список, который можно будет использовать позже? может быть, добавить результат в глобальный список?
Pelos
1
Если мои функции принимают параметры, и когда я передаю параметры при их вызове из отдельных процессов, они не запускаются одновременно. Не могли бы вы помочь
user2910372
18

Это можно элегантно сделать с помощью Ray , системы, которая позволяет легко распараллеливать и распространять ваш код Python.

Чтобы распараллелить ваш пример, вам нужно будет определить свои функции с помощью @ray.remoteдекоратора, а затем вызвать их с помощью .remote.

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

Если вы передаете один и тот же аргумент обеим функциям, и аргумент имеет большой размер, более эффективный способ сделать это - использовать ray.put(). Это позволяет избежать двукратной сериализации большого аргумента и создания двух его копий в памяти:

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

Если func1()и func2()вернут результаты, вам нужно переписать код следующим образом:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func1.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

Использование Ray дает ряд преимуществ перед модулем многопроцессорной обработки . В частности, один и тот же код будет работать как на одной машине, так и на кластере машин. Чтобы узнать больше о преимуществах Ray, см. Этот пост .

Ион Стойка
источник
18

Если ваши функции в основном выполняют работу ввода-вывода (и меньше нагрузки на ЦП) и у вас есть Python 3.2+, вы можете использовать ThreadPoolExecutor :

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

Если ваши функции в основном выполняют работу процессора (и меньше операций ввода-вывода) и у вас установлен Python 2.6+, вы можете использовать модуль многопроцессорности :

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

run_cpu_tasks_in_parallel([
    lambda: print('CPU task 1 running!'),
    lambda: print('CPU task 2 running!'),
])
Дэвид Фостер
источник
Это хороший ответ. Как определить по результату для задач, связанных с вводом-выводом, с помощью concurrent.futures, какая из них завершена? В основном, вместо функций lamba, если у нас есть обычные функции, как определить результат, сопоставленный с вызываемой функцией?
Tragaknight
Неважно, я нашел способ - вместо этого run_cpu_tasks_in_parallel ([lambda: print ('CPU task 1 running!'), Lambda: print ('CPU task 2 running!'),]) Используйте это - results = run_io_tasks_in_parallel ([lambda: {'is_something1': func1 ()}, lambda: {'is_something2': func2 ()},])
Tragaknight
Если функция выдает выходы для разных параметров, как их сохранить. В самом деле, что нужно разместить вместо lambda: print('CPU task 1 running!'), lambda: print('CPU task 2 running!'),добавления результатов к переменным task1_outputиtask2_output
Сай Киран
5

Если вы являетесь пользователем Windows и используете python 3, то этот пост поможет вам выполнять параллельное программирование на python. При запуске программирования пула обычной многопроцессорной библиотеки вы получите сообщение об ошибке, касающееся основной функции в вашей программе. Это связано с тем, что в Windows нет функции fork (). Приведенный ниже пост дает решение упомянутой проблемы.

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

Поскольку я использовал python 3, я немного изменил программу:

from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

После этой функции приведенный выше код проблемы также немного изменится:

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])

И я получил результат как:

[1, 8, 27, 64, 125, 216]

Я думаю, что этот пост может быть полезен некоторым пользователям Windows.

Арун Сурадж
источник
4

Невозможно гарантировать, что две функции будут выполняться синхронно друг с другом, что, кажется, именно то, что вы хотите сделать.

Лучшее, что вы можете сделать, - это разделить функцию на несколько шагов, а затем дождаться завершения обоих в критических точках синхронизации, используя Process.joinподобные упоминания в ответе @ aix.

Это лучше, чем time.sleep(10)потому, что вы не можете гарантировать точное время. При явном ожидании вы говорите, что функции должны быть выполнены, выполнив этот шаг, прежде чем переходить к следующему, вместо того, чтобы предполагать, что это будет выполнено в течение 10 мс, что не гарантируется в зависимости от того, что еще происходит на машине.

Дэви8
источник
2

Похоже, у вас есть одна функция, которую нужно вызывать с двумя разными параметрами. Это можно элегантно сделать, используя комбинацию concurrent.futuresи mapс Python 3.2+.

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_secs(seconds):
  time.sleep(seconds)
  print(f'{seconds} has been processed')

secs_list = [2,4, 6, 8, 10, 12]

Теперь, если ваша операция связана с вводом-выводом, вы можете использовать ThreadPoolExecutorкак таковой:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

Обратите внимание, как mapздесь используется mapваша функция в списке аргументов.

Теперь, если ваша функция связана с процессором, вы можете использовать ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

Если вы не уверены, вы можете просто попробовать оба варианта и посмотреть, какой из них дает лучшие результаты.

Наконец, если вы хотите распечатать свои результаты, вы можете просто сделать это:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)
  for result in results:
    print(result)
BICube
источник