Показать ход выполнения вызова imap_unordered для многопроцессорного пула Python?

96

У меня есть сценарий, который успешно выполняет набор задач многопроцессорного пула с imap_unordered()вызовом:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

Однако у меня num_tasksоколо 250 000, поэтому join()основной поток блокируется на 10 секунд или около того, и я хотел бы иметь возможность выводить эхо в командную строку постепенно, чтобы показать, что основной процесс не заблокирован. Что-то вроде:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(2)

Есть ли метод для объекта результата или самого пула, который указывает количество оставшихся задач? Я попытался использовать multiprocessing.Valueобъект в качестве счетчика ( do_workвызывает counter.value += 1действие после выполнения своей задачи), но счетчик достигает только ~ 85% от общего значения перед остановкой увеличения.

ПолночьМолния
источник

Ответы:

80

Нет необходимости обращаться к закрытым атрибутам результирующего набора:

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))
jfs
источник
7
Я вижу распечатку только после выхода кода (не на каждой итерации). У вас есть предложение?
Ханан Штейнгарт
@HananShteingart: Он отлично работает в моей системе (Ubuntu) как с Python 2, так и с Python 3. Я использовал def do_word(*a): time.sleep(.1)в качестве примера. Если это не сработает для вас, создайте полный пример минимального кода, который демонстрирует вашу проблему: опишите словами, что вы ожидаете, и что произойдет вместо этого, укажите, как вы запускаете свой скрипт Python, какая у вас ОС, версия Python и опубликуйте как новый вопрос .
jfs
14
У меня была та же проблема, что и у @HananShteingart: это потому, что я пытался использовать Pool.map(). Я не осознавал этого только imap() и imap_unordered()работал таким образом - в документации просто говорится: «Более ленивая версия map ()», но на самом деле это означает, что «базовый итератор возвращает результаты по мере их поступления».
simonmacmullen
@simonmacmullen: и вопрос, и мой ответ используют imap_unordered(). Проблема Ханана, вероятно, связана с sys.stderr.write('\r..')(перезаписью той же строки, чтобы показать прогресс).
jfs
2
Также возможно! В основном я хотел задокументировать сделанное мной глупое предположение - на случай, если кто-нибудь, читающий это, тоже сделает это.
simonmacmullen
94

Мой личный фаворит - дает вам симпатичную маленькую шкалу прогресса и время завершения, пока все выполняются и фиксируются параллельно.

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass
Тим
источник
64
что, если пул возвращает значение?
Nickpick 06
11
Я создал пустой список с именем result перед циклом, а затем внутри цикла просто выполните result.append (x). Я пробовал это с двумя процессами и использовал imap вместо карты, и все работало так, как я хотел, на @nickpick
bs7280
2
так что мой индикатор выполнения повторяется на новые строки, а не на месте, есть идеи, почему это может быть?
Остин
2
не забудьтеpip install tqdm
мистер Т.
3
@ bs7280 Под result.append (x) вы имели в виду result.append (_)? Что такое х?
Джейсон
27

Я обнаружил, что работа уже была сделана, когда я попытался проверить ее продвижение. Это то, что у меня сработало при использовании tqdm .

pip install tqdm

from multiprocessing import Pool
from tqdm import tqdm

tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))

def do_work(x):
    # do something with x
    pbar.update(1)

pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()

Это должно работать со всеми разновидностями многопроцессорной обработки, независимо от того, блокируют они или нет.

Reubano
источник
4
Думаю, создается кучка потоков, и каждый поток считается независимо
nburn42
1
У меня есть функции внутри функций, которые приводят к ошибке травления.
ojunk
21

Нашел ответ сам с некоторыми более рытье: Принимая взгляд на __dict__часть imap_unorderedобъекта результата, я нашел , что это имеет _indexатрибут , который увеличивается с каждым завершения задачи. Итак, это работает для ведения журнала, заключенного в whileцикл:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  completed = rs._index
  if (completed == num_tasks): break
  print "Waiting for", num_tasks-completed, "tasks to complete..."
  time.sleep(2)

Однако я обнаружил, что замена imap_unordereda на a приводит map_asyncк гораздо более быстрому выполнению, хотя объект результата немного отличается. Вместо этого объект результата из map_asyncимеет _number_leftатрибут и ready()метод:

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  if (rs.ready()): break
  remaining = rs._number_left
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(0.5)
ПолночьМолния
источник
3
Я тестировал это для Python 2.7.6, и rs._number_left, похоже, является количеством оставшихся фрагментов. Так что, если rs._chunksize не 1, тогда rs._number_left не будет количеством оставшихся элементов списка.
Аллен
Куда мне поместить этот код? Я имею в виду, что это не выполняется до тех пор, пока содержание не rsбудет известно, и это немного поздно или нет?
Вакан Танка
@WakanTanka: он входит в основной скрипт после того, как отделяет дополнительные потоки. В моем исходном примере он входит в цикл «while», где rsуже были запущены другие потоки.
MidnightLightning
1
Не могли бы вы отредактировать свой вопрос и / или ответ, чтобы показать минимальный рабочий пример. Я не вижу rsни одного цикла, я новичок в многопроцессорной обработке, и это поможет. Большое спасибо.
Вакан Танка
1
По крайней мере python 3.5, с помощью решения _number_leftне работает. _number_leftпредставляет фрагменты, которые еще предстоит обработать. Например, если я хочу, чтобы 50 элементов передавались моей функции параллельно, то для пула потоков с 3 процессами _map_async()создается 10 фрагментов по 5 элементов в каждом. _number_leftзатем представляет, сколько из этих фрагментов было выполнено.
mSSM
9

Я знаю, что это довольно старый вопрос, но вот что я делаю, когда хочу отслеживать прогресс пула задач в python.

from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep

def my_function(letter):
    sleep(2)
    return letter+letter

dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)

results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

print results

По сути, вы используете apply_async с callbak (в этом случае он должен добавить возвращаемое значение в список), поэтому вам не нужно ждать, чтобы сделать что-то еще. Затем, в рамках цикла while, вы проверяете ход работы. В данном случае я добавил виджет, чтобы он выглядел лучше.

Выход:

4 of 4                                                                         
['AA', 'BB', 'CC', 'DD']

Надеюсь, поможет.

Жюльен Туриль
источник
нужно изменить: [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]для(pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args)
Дэвид Пшибилла
Это не правда. Объект-генератор здесь работать не будет. Проверено.
swagatam
9

Как предложил Тим, вы можете использовать tqdmи imapдля решения этой проблемы. Я только что наткнулся на эту проблему и настроил imap_unorderedрешение, чтобы получить доступ к результатам сопоставления. Вот как это работает:

from multiprocessing import Pool
import tqdm

pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))

Если вам не нужны значения, возвращаемые вашими заданиями, вам не нужно назначать список какой-либо переменной.

Mrapacz
источник
4

для всех, кто ищет простое решение, работающее с Pool.apply_async():

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep


def work(x):
    sleep(0.5)
    return x**2

n = 10

p = Pool(4)
pbar = tqdm(total=n)
res = [p.apply_async(work, args=(
    i,), callback=lambda _: pbar.update(1)) for i in range(n)]
results = [p.get() for p in res]
Zeawoas
источник
3

Я создал собственный класс для создания распечатки прогресса. Маби это помогает:

from multiprocessing import Pool, cpu_count


class ParallelSim(object):
    def __init__(self, processes=cpu_count()):
        self.pool = Pool(processes=processes)
        self.total_processes = 0
        self.completed_processes = 0
        self.results = []

    def add(self, func, args):
        self.pool.apply_async(func=func, args=args, callback=self.complete)
        self.total_processes += 1

    def complete(self, result):
        self.results.extend(result)
        self.completed_processes += 1
        print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))

    def run(self):
        self.pool.close()
        self.pool.join()

    def get_results(self):
        return self.results
Аронстеф
источник
1

Попробуйте этот простой подход на основе очередей, который также можно использовать с пулом. Помните, что печать чего-либо после запуска индикатора выполнения приведет к его перемещению, по крайней мере, для этого конкретного индикатора выполнения. (Прогресс PyPI 1.5)

import time
from progress.bar import Bar

def status_bar( queue_stat, n_groups, n ):

    bar = Bar('progress', max = n)  

    finished = 0
    while finished < n_groups:

        while queue_stat.empty():
            time.sleep(0.01)

        gotten = queue_stat.get()
        if gotten == 'finished':
            finished += 1
        else:
            bar.next()
    bar.finish()


def process_data( queue_data, queue_stat, group):

    for i in group:

        ... do stuff resulting in new_data

        queue_stat.put(1)

    queue_stat.put('finished')  
    queue_data.put(new_data)

def multiprocess():

    new_data = []

    groups = [[1,2,3],[4,5,6],[7,8,9]]
    combined = sum(groups,[])

    queue_data = multiprocessing.Queue()
    queue_stat = multiprocessing.Queue()

    for i, group in enumerate(groups): 

        if i == 0:

            p = multiprocessing.Process(target = status_bar,
                args=(queue_stat,len(groups),len(combined)))
                processes.append(p)
                p.start()

        p = multiprocessing.Process(target = process_data,
        args=(queue_data, queue_stat, group))
        processes.append(p)
        p.start()

    for i in range(len(groups)):
        data = queue_data.get() 
        new_data += data

    for p in processes:
        p.join()
Мотт Кортеж
источник