Как распараллелить простой цикл Python?

256

Вероятно, это тривиальный вопрос, но как мне распараллелить следующий цикл в python?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

Я знаю, как запускать отдельные потоки в Python, но я не знаю, как «собрать» результаты.

Многократные процессы были бы также хороши - что бы ни было проще для этого случая. В настоящее время я использую Linux, но код также должен работать на Windows и Mac.

Какой самый простой способ распараллелить этот код?

Мне самому
источник

Ответы:

192

Использование нескольких потоков в CPython не даст вам лучшей производительности для чистого Python-кода из-за глобальной блокировки интерпретатора (GIL). Я предлагаю multiprocessingвместо этого использовать модуль:

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

Обратите внимание, что это не будет работать в интерактивном переводчике.

Чтобы избежать обычного FUD вокруг GIL: в любом случае не было бы никакого преимущества в использовании потоков для этого примера. Здесь вы хотите использовать процессы, а не потоки, потому что они позволяют избежать целого ряда проблем.

Свен Марнах
источник
47
Поскольку это выбранный ответ, возможно ли привести более полный пример? Каковы аргументы calc_stuff?
Эдуардо Пиньятелли
2
@EduardoPignatelli Пожалуйста, прочитайте документацию multiprocessingмодуля для более полных примеров. Pool.map()в принципе работает как map(), но параллельно.
Свен Марнах
3
Есть ли способ просто добавить в строке загрузки tqdm к этой структуре кода? Я использовал tqdm (pool.imap (calc_stuff, range (0, 10 * offset, offset))), но я не получаю полную диаграмму загрузки.
user8188120
@ user8188120 Я никогда раньше не слышал о tqdm, так что извините, я не могу помочь с этим.
Свен Марнах
Для загрузки бар tqdm см. Этот вопрос: stackoverflow.com/questions/41920124/…
Йоханнес
67

Чтобы распараллелить простой цикл for, joblib придает большое значение необработанному использованию многопроцессорной обработки. Не только короткий синтаксис, но и такие вещи, как прозрачное группирование итераций, когда они выполняются очень быстро (для устранения накладных расходов) или захват трассировки дочернего процесса, для лучшей отчетности об ошибках.

Отказ от ответственности: я оригинальный автор joblib.

Gael Varoquaux
источник
1
Я попробовал JobLib с Jupyter, он не работает. После параллельного вызова страница перестала работать.
Цзе
1
Привет, у меня проблема с использованием joblib ( stackoverflow.com/questions/52166572/… ), у вас есть какие-либо подсказки, что может быть причиной? Огромное спасибо.
Ting Sun
Похоже, что-то, что я хочу дать выстрел! Можно ли использовать его с двойной петлей, например, для i в диапазоне (10): для j в диапазоне (20)
CutePoison
51

Какой самый простой способ распараллелить этот код?

Мне это очень нравится concurrent.futures, доступно в Python3 начиная с версии 3.2 - и через backport до 2.6 и 2.7 на PyPi .

Вы можете использовать потоки или процессы и использовать точно такой же интерфейс.

многопроцессорная обработка

Поместите это в файл - futuretest.py:

import concurrent.futures
import time, random               # add some random sleep time

offset = 2                        # you don't supply these so
def calc_stuff(parameter=None):   # these are examples.
    sleep_time = random.choice([0, 1, 2, 3, 4, 5])
    time.sleep(sleep_time)
    return parameter / 2, sleep_time, parameter * parameter

def procedure(j):                 # just factoring out the
    parameter = j * offset        # procedure
    # call the calculation
    return calc_stuff(parameter=parameter)

def main():
    output1 = list()
    output2 = list()
    output3 = list()
    start = time.time()           # let's see how long this takes

    # we can swap out ProcessPoolExecutor for ThreadPoolExecutor
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for out1, out2, out3 in executor.map(procedure, range(0, 10)):
            # put results into correct output list
            output1.append(out1)
            output2.append(out2)
            output3.append(out3)
    finish = time.time()
    # these kinds of format strings are only available on Python 3.6:
    # time to upgrade!
    print(f'original inputs: {repr(output1)}')
    print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
    print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
    print(f'returned in order given: {repr(output3)}')

if __name__ == '__main__':
    main()

И вот вывод:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallellizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Многопоточность

Теперь перейдите ProcessPoolExecutorв ThreadPoolExecutorи снова запустите модуль:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallellizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Теперь вы сделали как многопоточность, так и многопроцессорность!

Обратите внимание на производительность и использование обоих вместе.

Выборка слишком мала, чтобы сравнить результаты.

Однако я подозреваю, что многопоточность будет быстрее, чем многопроцессорность в целом, особенно в Windows, поскольку Windows не поддерживает разветвление, поэтому каждому новому процессу требуется время для запуска. На Linux или Mac они, вероятно, будут ближе.

Вы можете вкладывать несколько потоков в несколько процессов, но рекомендуется не использовать несколько потоков для выделения нескольких процессов.

Аарон Холл
источник
ThreadPoolExecutor обходит ограничения, налагаемые GIL? также вам не нужно присоединяться к (), чтобы дождаться завершения исполнителей, или это неявно позаботилось в менеджере контекста
PirateApp
1
Нет и нет, да, чтобы "обрабатываться неявно"
Аарон Холл
По какой-то причине при масштабировании проблемы многопоточность является чрезвычайно быстрой, но многопроцессорность порождает множество застрявших процессов (в macOS). Есть идеи, почему это может быть? Процесс содержит только вложенные циклы и математику, ничего экзотического.
komodovaran_
@komodovaran_ Процесс - это полноценный процесс Python, по одному на каждый, в то время как поток - это просто поток выполнения со своим собственным стеком, который разделяет процесс, его байт-код и все остальное, что у него есть в памяти, со всеми другими потоками - помогает ли это ?
Аарон Холл
49
from joblib import Parallel, delayed
import multiprocessing

inputs = range(10) 
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()

results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)

Вышеописанное прекрасно работает на моей машине (Ubuntu, пакет joblib был предварительно установлен, но может быть установлен через pip install joblib).

Взято с https://blog.dominodatalab.com/simple-parallelization/

TYREX
источник
3
Я попробовал ваш код, но в моей системе последовательная версия этого кода занимает около получаса, а вышеуказанная параллельная версия занимает 4 минуты. Почему так?
Шайфали Гупта
3
Спасибо за Ваш ответ! Я думаю, что это самый элегантный способ сделать это в 2019 году.
Хейкки Пулккинен,
2
многопроцессорная обработка недопустима для Python 3.x, поэтому для меня это не работает.
EngrStudent
2
@EngrStudent Не уверен, что вы подразумеваете под "недействительным". Это работает для Python 3.6.x для меня.
Tyrex
@tyrex спасибо, что поделились! этот пакет JobLib отличный, и пример работает для меня. Хотя в более сложном контексте, к сожалению, у меня была ошибка. github.com/joblib/joblib/issues/949
Открытый продовольственный брокер
13

Использование Ray имеет ряд преимуществ :

  • Вы можете распараллеливать на нескольких машинах в дополнение к нескольким ядрам (с одним и тем же кодом).
  • Эффективная обработка числовых данных через разделяемую память (и сериализация без копирования).
  • Высокая пропускная способность при распределенном планировании.
  • Отказоустойчивость.

В вашем случае вы можете запустить Ray и определить удаленную функцию

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

а затем вызвать его параллельно

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

Чтобы запустить тот же пример в кластере, единственной строкой, которая могла бы измениться, был бы вызов ray.init (). Соответствующую документацию можно найти здесь .

Обратите внимание, что я помогаю развивать Рэя.

Роберт Нишихара
источник
1
Для любого, кто рассматривает Ray, может быть уместно знать, что он изначально не поддерживает Windows. Некоторые хаки для того, чтобы заставить его работать в Windows с использованием WSL (Windows Subsystem for Linux), возможны, хотя вряд ли это возможно, если вы хотите использовать Windows.
OscarVanL
9

Это самый простой способ сделать это!

Вы можете использовать Asyncio . (Документация может быть найдена здесь ). Он используется в качестве основы для нескольких асинхронных сред Python, которые предоставляют высокопроизводительные сетевые и веб-серверы, библиотеки подключений к базам данных, распределенные очереди задач и т. Д. Кроме того, он имеет как высокоуровневые, так и низкоуровневые API для решения любых задач. ,

import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument):
    #code

Теперь эта функция будет запускаться параллельно при каждом вызове без перевода основной программы в состояние ожидания. Вы также можете использовать его для распараллеливания цикла. Когда вызывается для цикла for, хотя цикл является последовательным, но каждая итерация выполняется параллельно основной программе, как только туда попадает интерпретатор. Например:

@background
def your_function(argument):
    time.sleep(5)
    print('function finished for '+str(argument))


for i in range(10):
    your_function(i)


print('loop finished')

Это дает следующий результат:

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1
USER5
источник
Я думаю, что есть опечатка, wrapped()и это должно быть **kwargsвместо*kwargs
jakub-olczyk
К сожалению! Моя ошибка. Исправлено!
User5
6

почему вы не используете потоки и один мьютекс для защиты одного глобального списка?

import os
import re
import time
import sys
import thread

from threading import Thread

class thread_it(Thread):
    def __init__ (self,param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()   


threads = []
output = []
mutex = thread.allocate_lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data

имейте в виду, вы будете так быстро, как ваша самая медленная нить

jackdoe
источник
2
Я знаю, что это очень старый ответ, так что обидно получить случайное отрицательное голосование из ниоткуда. Я только понизил голосование, потому что потоки ничего не распараллеливают Потоки в Python связаны только с одним потоком, выполняющимся в интерпретаторе одновременно, из-за глобальной блокировки интерпретатора, поэтому они поддерживают параллельное программирование, но не параллельное, как запрашивает OP.
skrrgwasme
3
@skrrgwasme Я знаю, что вы это знаете, но когда вы используете слова «они ничего не будут распараллеливать», это может ввести читателей в заблуждение. Если операции занимают много времени из-за того, что они связаны с вводом-выводом или находятся в спящем режиме, ожидая события, интерпретатор освобождается для запуска других потоков, поэтому это приведет к увеличению скорости, на которое люди надеются в этих случаях. Только то, что связано с процессором, действительно зависит от того, что говорит skrrgwasme.
Джонатан Хартли,
5

Я нашел joblibочень полезным со мной. Пожалуйста, смотрите следующий пример:

from joblib import Parallel, delayed
def yourfunction(k):   
    s=3.14*k*k
    print "Area of a circle with a radius ", k, " is:", s

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))

n_jobs = -1: использовать все доступные ядра

miuxu
источник
14
Вы знаете, лучше проверить уже существующие ответы, прежде чем публиковать свои. Этот ответ также предлагает использовать joblib.
Саняш
2

Допустим, у нас есть асинхронная функция

async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
    # Do some async procesing    

Это должно быть запущено на большом массиве. Некоторые атрибуты передаются в программу, а некоторые используются из свойства элемента словаря в массиве.

async def process_students(self, student_name: str, loop):
    market = sys.argv[2]
    subjects = [...] #Some large array
    batchsize = 5
    for i in range(0, len(subjects), batchsize):
        batch = subjects[i:i+batchsize]
        await asyncio.gather(*(self.work_async(student_name,
                                           sub['Code'],
                                           loop)
                       for sub in batch))
Амит Тели
источник
1

Посмотри на это;

http://docs.python.org/library/queue.html

Это может быть неправильный способ сделать это, но я бы сделал что-то вроде;

Актуальный код;

from multiprocessing import Process, JoinableQueue as Queue 

class CustomWorker(Process):
    def __init__(self,workQueue, out1,out2,out3):
        Process.__init__(self)
        self.input=workQueue
        self.out1=out1
        self.out2=out2
        self.out3=out3
    def run(self):
            while True:
                try:
                    value = self.input.get()
                    #value modifier
                    temp1,temp2,temp3 = self.calc_stuff(value)
                    self.out1.put(temp1)
                    self.out2.put(temp2)
                    self.out3.put(temp3)
                    self.input.task_done()
                except Queue.Empty:
                    return
                   #Catch things better here
    def calc_stuff(self,param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1,out2,out3
def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
          p = CustomWorker(inputQueue,out1,out2,out3)
          p.daemon = True
          p.start()
          processes.append(p)
    inputQueue.join()
    while(not out1.empty()):
        print out1.get()
        print out2.get()
        print out3.get()
if __name__ == '__main__':
    Main()

Надеюсь, это поможет.

Меррем
источник
1

Это может быть полезно при реализации многопроцессорных и параллельных / распределенных вычислений в Python.

Учебник YouTube по использованию пакета techila

Techila - это промежуточное программное обеспечение для распределенных вычислений, которое напрямую интегрируется с Python с помощью пакета techila. Функция персика в пакете может быть полезна для распараллеливания структур цикла. (Следующий фрагмент кода с форумов сообщества Techila )

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
    jobs = jobcount # Number of Jobs in the Project
    )
тройник
источник
1
Хотя эта ссылка может ответить на вопрос, лучше включить сюда основные части ответа и предоставить ссылку для справки. Ответы, содержащие только ссылки, могут стать недействительными, если связанная страница изменится.
SL Barth - Восстановить Монику
2
@SLBarth спасибо за отзыв. Я добавил небольшой пример кода в ответ.
Ти
1

спасибо @iuryxavier

from multiprocessing import Pool
from multiprocessing import cpu_count


def add_1(x):
    return x + 1

if __name__ == "__main__":
    pool = Pool(cpu_count())
    results = pool.map(add_1, range(10**12))
    pool.close()  # 'TERM'
    pool.join()   # 'KILL'
Фелипе де Македо
источник
2
-1. Это только кодовый ответ. Я бы предложил добавить объяснение, которое сообщает читателям, что делает код, который вы опубликовали, и, возможно, где они могут найти дополнительную информацию.
starbeamrainbowlabs
-1

очень простой пример параллельной обработки

from multiprocessing import Process

output1 = list()
output2 = list()
output3 = list()

def yourfunction():
    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter=parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)

if __name__ == '__main__':
    p = Process(target=pa.yourfunction, args=('bob',))
    p.start()
    p.join()
Адил Варси
источник
3
Здесь нет параллелизма в цикле for, вы просто порождаете процесс, который запускает весь цикл; это НЕ то, что задумал ОП.
facuq