Для C ++ мы можем использовать OpenMP для параллельного программирования; однако OpenMP не будет работать с Python. Что мне делать, если я хочу распараллелить некоторые части моей программы на Python?
Структуру кода можно рассматривать как:
solve1(A)
solve2(B)
Где solve1
и solve2
две независимые функции. Как запустить такой код параллельно, а не последовательно, чтобы сократить время выполнения? Код такой:
def solve(Q, G, n):
i = 0
tol = 10 ** -4
while i < 1000:
inneropt, partition, x = setinner(Q, G, n)
outeropt = setouter(Q, G, n)
if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
break
node1 = partition[0]
node2 = partition[1]
G = updateGraph(G, node1, node2)
if i == 999:
print "Maximum iteration reaches"
print inneropt
Где setinner
и setouter
две независимые функции. Вот где я хочу провести параллель ...
python
parallel-processing
ilovecp3
источник
источник
Ответы:
Вы можете использовать модуль многопроцессорности . В этом случае я мог бы использовать пул обработки:
from multiprocessing import Pool pool = Pool() result1 = pool.apply_async(solve1, [A]) # evaluate "solve1(A)" asynchronously result2 = pool.apply_async(solve2, [B]) # evaluate "solve2(B)" asynchronously answer1 = result1.get(timeout=10) answer2 = result2.get(timeout=10)
Это вызовет процессы, которые могут выполнять за вас общую работу. Поскольку мы не прошли
processes
, он будет порождать по одному процессу для каждого ядра ЦП на вашем компьютере. Каждое ядро ЦП может одновременно выполнять один процесс.Если вы хотите сопоставить список с одной функцией, сделайте следующее:
Не используйте потоки, потому что GIL блокирует любые операции с объектами python.
источник
pool.map
также принимают словари как аргументы? Или только простые списки?Это можно сделать очень элегантно с помощью Ray .
Чтобы распараллелить ваш пример, вам нужно будет определить свои функции с помощью
@ray.remote
декоратора, а затем вызвать их с помощью.remote
.import ray ray.init() # Define the functions. @ray.remote def solve1(a): return 1 @ray.remote def solve2(b): return 2 # Start two tasks in the background. x_id = solve1.remote(0) y_id = solve2.remote(1) # Block until the tasks are done and get the results. x, y = ray.get([x_id, y_id])
У этого есть ряд преимуществ по сравнению с многопроцессорным модулем.
Эти вызовы функций могут быть составлены вместе, например,
@ray.remote def f(x): return x + 1 x_id = f.remote(1) y_id = f.remote(x_id) z_id = f.remote(y_id) ray.get(z_id) # returns 4
Обратите внимание, что Рэй - это фреймворк, который я помогал разрабатывать.
источник
pip
. Предлагаю попробоватьpip install --upgrade pip
. Если вамsudo
вообще нужно использовать, то возможно, что версияpip
, которую вы используете для установкиray
, не та, которая обновляется. Вы можете проверить сpip --version
. Кроме того, Windows в настоящее время не поддерживается, поэтому, вероятно, проблема заключается в Windows.CPython использует глобальную блокировку интерпретатора, что делает параллельное программирование немного более интересным, чем C ++.
В этом разделе есть несколько полезных примеров и описаний задач:
Обходной путь Python Global Interpreter Lock (GIL) в многоядерных системах с использованием набора задач в Linux?
источник
Решение, как говорили другие, состоит в использовании нескольких процессов. Однако какая структура более подходящая, зависит от многих факторов. Помимо уже упомянутых, есть еще charm4py и mpi4py (я разработчик charm4py).
Существует более эффективный способ реализовать приведенный выше пример, чем использование абстракции рабочего пула. Основной цикл
G
снова и снова отправляет одни и те же параметры (включая полный график ) рабочим на каждой из 1000 итераций. Поскольку по крайней мере один рабочий будет находиться в другом процессе, это включает в себя копирование и отправку аргументов другому процессу (ам). Это может быть очень дорогостоящим в зависимости от размера объектов. Вместо этого имеет смысл, чтобы работники хранили состояние и просто отправляли обновленную информацию.Например, в charm4py это можно сделать так:
class Worker(Chare): def __init__(self, Q, G, n): self.G = G ... def setinner(self, node1, node2): self.updateGraph(node1, node2) ... def solve(Q, G, n): # create 2 workers, each on a different process, passing the initial state worker_a = Chare(Worker, onPE=0, args=[Q, G, n]) worker_b = Chare(Worker, onPE=1, args=[Q, G, n]) while i < 1000: result_a = worker_a.setinner(node1, node2, ret=True) # execute setinner on worker A result_b = worker_b.setouter(node1, node2, ret=True) # execute setouter on worker B inneropt, partition, x = result_a.get() # wait for result from worker A outeropt = result_b.get() # wait for result from worker B ...
Обратите внимание, что для этого примера нам действительно нужен только один рабочий. Основной цикл может выполнять одну из функций, а рабочий цикл - другую. Но мой код помогает проиллюстрировать пару вещей:
result_a.get()
он заблокирован в ожидании результата, рабочий A выполняет вычисления в том же процессе.источник
В некоторых случаях можно автоматически распараллеливать циклы с помощью Numba , хотя это работает только с небольшим подмножеством Python:
from numba import njit, prange @njit(parallel=True) def prange_test(A): s = 0 # Without "parallel=True" in the jit-decorator # the prange statement is equivalent to range for i in prange(A.shape[0]): s += A[i] return s
К сожалению, кажется, что Numba работает только с массивами Numpy, но не с другими объектами Python. Теоретически также возможно скомпилировать Python в C ++, а затем автоматически распараллелить его с помощью компилятора Intel C ++ , хотя я еще не пробовал этого.
источник
Вы можете использовать
joblib
библиотеку для параллельных вычислений и многопроцессорной обработки.from joblib import Parallel, delayed
Вы можете просто создать функцию,
foo
которую хотите запускать параллельно и на основе следующего фрагмента кода реализовать параллельную обработку:output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)
Где
num_cores
можно получить изmultiprocessing
библиотеки:import multiprocessing num_cores = multiprocessing.cpu_count()
Если у вас есть функция с более чем одним входным аргументом, и вы просто хотите перебрать один из аргументов по списку, вы можете использовать
partial
функцию изfunctools
библиотеки следующим образом:from joblib import Parallel, delayed import multiprocessing from functools import partial def foo(arg1, arg2, arg3, arg4): ''' body of the function ''' return output input = [11,32,44,55,23,0,100,...] # arbitrary list num_cores = multiprocessing.cpu_count() foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4) # arg1 is being fetched from input list output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)
Вы можете найти полное объяснение многопроцессорности python и R с парой примеров здесь .
источник