многопоточность python дождитесь завершения всех потоков

119

Возможно, об этом спрашивали в аналогичном контексте, но я не смог найти ответ примерно через 20 минут поиска, поэтому спрошу.

Я написал сценарий Python (скажем: scriptA.py) и сценарий (скажем, scriptB.py)

В scriptB я хочу вызвать scriptA несколько раз с разными аргументами, каждый раз для запуска требуется около часа (это огромный сценарий, делает много всего ... не беспокойтесь об этом), и я хочу иметь возможность запускать scriptA со всеми различными аргументами одновременно, но мне нужно подождать, пока ВСЕ они не будут выполнены, прежде чем продолжить; мой код:

import subprocess

#setup
do_setup()

#run scriptA
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)

#finish
do_finish()

Я хочу запустить все subprocess.call()одновременно, а затем подождать, пока все они будут выполнены, как мне это сделать?

Я пытался использовать потоки, как в примере здесь :

from threading import Thread
import subprocess

def call_script(args)
    subprocess.call(args)

#run scriptA   
t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))
t1.start()
t2.start()
t3.start()

Но я не думаю, что это правильно.

Как я узнаю, что все они закончили бег, прежде чем идти ко мне do_finish()?

Инбар Роуз
источник

Ответы:

150

Вы должны использовать присоединиться метод Threadобъекта в конце сценария.

t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

Таким образом, основной поток будет ждать , пока t1, t2и t3закончить выполнение.

Максим Скуридзин
источник
5
хммм - возникли проблемы с пониманием чего-то, не будет ли сначала запускаться t1, ждать его завершения, затем переходить к t2..etc и т. д.? как сделать все это сразу? Я не понимаю, как это будет запускать их одновременно?
Inbar Rose
25
Вызов joinблокируется до тех пор, пока поток не завершит выполнение. В любом случае вам придется ждать всех потоков. Если t1сначала завершится, вы начнете ждать t2(которое, возможно, уже закончилось, и вы немедленно перейдете к ожиданию t3). Если выполнение t1заняло больше всего времени, когда вы вернетесь из него, оба t1и t2сразу же вернутся без блокировки.
Максим Скуридзин
1
Вы не понимаете мой вопрос - если я скопирую приведенный выше код в свой код - он будет работать? или я что-то упускаю?
Инбар Роуз
2
хорошо, я вижу. теперь я понимаю, был немного сбит с толку, но я думаю, что понимаю, joinвроде прикрепляет текущий процесс к потоку и ждет, пока он не будет выполнен, и если t2 завершится до t1, тогда, когда t1 будет выполнено, он проверит, выполняется ли t2, см. что это так, а затем проверьте t3..etc..etc .. и только тогда, когда все будет сделано, оно продолжится. здорово.
Инбар Роуз
3
скажем, t1 занимает больше всего времени, но t2 имеет исключение. что тогда происходит? Можете ли вы поймать это исключение или проверить, нормально ли завершился t2?
Ciprian Tomoiagă
174

Поместите потоки в список, а затем используйте метод Join

 threads = []

 t = Thread(...)
 threads.append(t)

 ...repeat as often as necessary...

 # Start all threads
 for x in threads:
     x.start()

 # Wait for all of them to finish
 for x in threads:
     x.join()
Аарон Дигулла
источник
1
Да, это сработает, но это труднее понять. Вы всегда должны пытаться найти баланс между компактным кодом и «удобочитаемостью». Помните: код пишется один раз, но читается много раз. Поэтому более важно, чтобы это было легко понять.
Аарон Дигулла
2
«Заводской шаблон» я не могу объяснить одним предложением. Найдите его в Google и выполните поиск на stackoverflow.com. Есть много примеров и объяснений. В двух словах: вы пишете код, который создает для вас что-то сложное. Как настоящая фабрика: вы отдаете заказ и получаете обратно готовый продукт.
Аарон Дигулла
18
Мне не нравится идея использовать понимание списка из-за его побочных эффектов и не делать ничего полезного с полученным списком. Простой цикл for будет чище, даже если он распределит две строки ...
Иоан Александру Куку
1
@Aaron DIgull Я понимаю это. Я имею в виду, что я бы просто сделал, for x in threads: x.join()а не использовал понимание списка
Иоан Александру Куку
1
@IoanAlexandruCucu: Мне все еще интересно, есть ли более удобочитаемое и эффективное решение: stackoverflow.com/questions/21428602/…
Аарон Дигулла
29

В Python3, начиная с Python 3.2, есть новый подход для достижения того же результата, который я лично предпочитаю традиционному созданию потока / запуску / присоединению, package concurrent.futures: https://docs.python.org/3/library/concurrent.futures .html

Использование ThreadPoolExecutorкода будет:

from concurrent.futures.thread import ThreadPoolExecutor
import time

def call_script(ordinal, arg):
    print('Thread', ordinal, 'argument:', arg)
    time.sleep(2)
    print('Thread', ordinal, 'Finished')

args = ['argumentsA', 'argumentsB', 'argumentsC']

with ThreadPoolExecutor(max_workers=2) as executor:
    ordinal = 1
    for arg in args:
        executor.submit(call_script, ordinal, arg)
        ordinal += 1
print('All tasks has been finished')

Результат предыдущего кода выглядит примерно так:

Thread 1 argument: argumentsA
Thread 2 argument: argumentsB
Thread 1 Finished
Thread 2 Finished
Thread 3 argument: argumentsC
Thread 3 Finished
All tasks has been finished

Одним из преимуществ является то, что вы можете контролировать пропускную способность, задав максимальное количество одновременных рабочих.

Роберто
источник
но как узнать, когда все потоки в пуле потоков завершились?
Prime By Design
1
Как вы можете видеть в примере, код после withоператора выполняется, когда все задачи завершены.
Роберто
это не работает. Попробуйте сделать что-нибудь действительно длинное в потоках. Ваш оператор печати будет выполнен до завершения потока
Пранали
@Pranalee, этот код работает, я обновил код, чтобы добавить строки вывода. Вы не можете увидеть «Все задачи ...» до того, как все потоки будут завершены. В withэтом случае оператор работает так, как задумано. В любом случае, вы всегда можете открыть новый вопрос в SO и опубликовать свой код, чтобы мы могли помочь вам узнать, что происходит в вашем случае.
Роберто
@PrimeByDesign, вы можете использовать concurrent.futures.waitфункцию, вы можете увидеть здесь реальный пример Официальные документы: docs.python.org/3/library/…
Александр Фортин
28

Я предпочитаю использовать понимание списка на основе списка ввода:

inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
threads = [Thread(target=call_script, args=(i)) for i in inputs]
[t.start() for t in threads]
[t.join() for t in threads]
Адам Матан
источник
Проверенный ответ хорошо объясняет, но этот короче и не требует уродливых повторений. Просто хороший ответ. :)
tleb
Понимание списков только для побочных эффектов обычно обесценивается *. Но в данном случае это кажется хорошей идеей. * stackoverflow.com/questions/5753597/…
Винаяк Канияраккал
3
@VinayakKaniyarakkal, разве for t in threads:t.start()не лучше?
SmartManoj
5

У вас может быть класс, похожий на приведенный ниже, из которого вы можете добавить n функций или console_scripts, которые вы хотите выполнять параллельно, и начать выполнение и дождаться завершения всех заданий.

from multiprocessing import Process

class ProcessParallel(object):
    """
    To Process the  functions parallely

    """    
    def __init__(self, *jobs):
        """
        """
        self.jobs = jobs
        self.processes = []

    def fork_processes(self):
        """
        Creates the process objects for given function deligates
        """
        for job in self.jobs:
            proc  = Process(target=job)
            self.processes.append(proc)

    def start_all(self):
        """
        Starts the functions process all together.
        """
        for proc in self.processes:
            proc.start()

    def join_all(self):
        """
        Waits untill all the functions executed.
        """
        for proc in self.processes:
            proc.join()


def two_sum(a=2, b=2):
    return a + b

def multiply(a=2, b=2):
    return a * b


#How to run:
if __name__ == '__main__':
    #note: two_sum, multiply can be replace with any python console scripts which
    #you wanted to run parallel..
    procs =  ProcessParallel(two_sum, multiply)
    #Add all the process in list
    procs.fork_processes()
    #starts  process execution 
    procs.start_all()
    #wait until all the process got executed
    procs.join_all()
PBD
источник
Это многопроцессорность. Вопрос касался docs.python.org/3/library/threading.html
Рустам А.
3

Из threading документации модуля

Есть объект «основной поток»; это соответствует начальному потоку управления в программе Python. Это не поток демона.

Есть вероятность того, что будут созданы «объекты фиктивного потока». Это объекты потоков, соответствующие «чужеродным потокам», которые представляют собой потоки управления, запускаемые вне модуля потоковой передачи, например непосредственно из кода C. Объекты фиктивного потока имеют ограниченную функциональность; они всегда считаются живыми и демоническими, и их нельзя join()редактировать. Они никогда не удаляются, так как невозможно обнаружить завершение чужих потоков.

Итак, чтобы поймать эти два случая, когда вы не заинтересованы в ведении списка создаваемых вами потоков:

import threading as thrd


def alter_data(data, index):
    data[index] *= 2


data = [0, 2, 6, 20]

for i, value in enumerate(data):
    thrd.Thread(target=alter_data, args=[data, i]).start()

for thread in thrd.enumerate():
    if thread.daemon:
        continue
    try:
        thread.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err.args[0]:
            # catchs main thread
            continue
        else:
            raise

Засим:

>>> print(data)
[0, 4, 12, 40]
berna1111
источник
2

Может быть, что-то вроде

for t in threading.enumerate():
    if t.daemon:
        t.join()
Jno
источник
Я пробовал этот код, но не уверен в его работе, потому что была напечатана последняя инструкция моего кода, которая была после этого цикла for, но процесс не был завершен.
Omkar
1

Я только что столкнулся с той же проблемой, когда мне нужно было дождаться всех потоков, которые были созданы с помощью цикла for. Я просто попробовал следующий фрагмент кода. Возможно, это не идеальное решение, но я подумал, что это будет простое решение. тестировать:

for t in threading.enumerate():
    try:
        t.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err:
            continue
        else:
            raise
Omkar
источник