Многопроцессорность: используйте tqdm для отображения индикатора выполнения

103

Чтобы сделать мой код более «питоническим» и более быстрым, я использую «многопроцессорность» и функцию карты, чтобы отправить ему а) функцию и б) диапазон итераций.

Имплантированное решение (то есть вызов tqdm непосредственно в диапазоне tqdm.tqdm (диапазон (0, 30)) не работает с многопроцессорной обработкой (как сформулировано в приведенном ниже коде).

Индикатор выполнения отображается от 0 до 100% (когда python читает код?), Но он не указывает на фактический прогресс функции карты.

Как отобразить индикатор выполнения, показывающий, на каком этапе выполняется функция «карта»?

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   p = Pool(2)
   r = p.map(_foo, tqdm.tqdm(range(0, 30)))
   p.close()
   p.join()

Любая помощь или предложения приветствуются ...

SciPy
источник
Можете ли вы опубликовать фрагмент кода индикатора выполнения?
Alex
2
Для людей, которые ищут решение с .starmap(): Вот патч для Poolдобавления .istarmap(), который также будет работать с tqdm.
Darkonaut

Ответы:

136

Используйте imap вместо map, который возвращает итератор обработанных значений.

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   with Pool(2) as p:
      r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))
hkyi
источник
14
Вложенный оператор list () ожидает завершения итератора. total = также требуется, поскольку tqdm не знает, как долго будет
длиться
16
Есть ли подобное решение для starmap()?
tarashypka
2
for i in tqdm.tqdm(...): pass может быть более прямолинейным, этоlist(tqdm.tqdm)
savfod
1
Это работает, но у кого-нибудь еще он постоянно печатал индикатор выполнения на новой строке для каждой итерации?
Деннис Субачёв
3
Поведение привязано, когда оно специфично chunk_sizeдля p.imap. Можно ли tqdmобновлять каждую итерацию вместо каждого фрагмента?
huangbiubiu
56

Найдено решение: будьте осторожны! Из-за многопроцессорной обработки время оценки (итерация за цикл, общее время и т. Д.) Может быть нестабильным, но индикатор выполнения работает отлично.

Примечание. Диспетчер контекста для пула доступен только в версии Python 3.3.

from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for i, _ in enumerate(p.imap_unordered(_foo, range(0, max_))):
                pbar.update()
SciPy
источник
2
pbar.close()не требуется, он будет автоматически закрыт по окончанииwith
Сагар Кар
5
tqdmЗдесь нужен второй / внутренний вызов?
shadowtalker 07
7
как насчет вывода _foo (my_number), который возвращается как "r" в вопросе?
Ликак
4
Есть ли подобное решение для starmap()?
tarashypka
2
@shadowtalker - вроде без работы работает;). В любом случае - imap_unorderedэто ключевой момент, он дает лучшую производительность и лучшие оценки индикатора выполнения.
Tomasz
24

Извините за опоздание, но если все, что вам нужно, это параллельная карта, я добавил эту функцию в tqdm>=4.42.0:

from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = process_map(_foo, range(0, 30), max_workers=2)

Ссылки: https://tqdm.github.io/docs/contrib.concurrent/ и https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py

casper.dcl
источник
1
Спасибо за это. Работает легко, намного лучше, чем любое другое решение, которое я пробовал.
user3340499
Круто (+1), но HBox(children=(FloatProgress(value=0.0, max=30.0), HTML(value='')))вбрасывает Жупитера
Эбе Исаак
@ ВБР-Isaac см github.com/tqdm/tqdm/issues/937
casper.dcl
Я вижу проблему с обсуждением взлома tqdm_notebook, однако не могу найти решение для tqdm.contrib.concurrent.
Эбе Исаак,
Это потрясающе. Просто работает прямо из коробки.
Ларс Ларссон
21

p_tqdmВместо этого можно использовать .

https://github.com/swansonk14/p_tqdm

from p_tqdm import p_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = p_map(_foo, list(range(0, 30)))
Виктор Квач
источник
1
Это работает очень хорошо, и это было очень легко pip install. Это замена tqdm для большинства моих нужд
Crypdick
Merci Victor;)
Габриэль Ромон
p_tqdmограничено multiprocessing.Pool, недоступно для потоков
Pateheo
8

на основе ответа Хави Мартинеса я написал функцию imap_unordered_bar. Его можно использовать так же, как imap_unorderedс той лишь разницей, что отображается полоса обработки.

from multiprocessing import Pool
import time
from tqdm import *

def imap_unordered_bar(func, args, n_processes = 2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total = len(args)) as pbar:
        for i, res in tqdm(enumerate(p.imap_unordered(func, args))):
            pbar.update()
            res_list.append(res)
    pbar.close()
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square 

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))
Оливер Уилкен
источник
3
Это будет перерисовывать полосу на каждом шаге на новой строке. Как обновить эту же строчку?
misantroop
Решение в моем случае (Windows / Powershell): Colorama.
misantroop
«pbar.close () не требуется, она будет закрыта автоматически по окончании с» как комментарий Сагар сделал на @ SciPy Ответим
Tejas Шетти
1

Вот мой вариант, когда вам нужно получить результаты от ваших функций параллельного выполнения. Эта функция делает несколько вещей (есть еще один мой пост, который объясняет это дополнительно), но ключевым моментом является то, что есть очередь ожидающих задач и очередь завершенных задач. По мере того, как рабочие завершают выполнение каждой задачи в очереди ожидания, они добавляют результаты в очередь завершенных задач. Вы можете перенести проверку в очередь выполненных задач с помощью индикатора выполнения tqdm. Я не помещаю здесь реализацию функции do_work (), это не актуально, так как сообщение здесь предназначено для отслеживания очереди выполненных задач и обновления индикатора выполнения каждый раз, когда появляется результат.

def par_proc(job_list, num_cpus=None, verbose=False):

# Get the number of cores
if not num_cpus:
    num_cpus = psutil.cpu_count(logical=False)

print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))

# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()

# Gather processes and results here
processes = []
results = []

# Count tasks
num_tasks = 0

# Add the tasks to the queue
for job in job_list:
    for task in job['tasks']:
        expanded_job = {}
        num_tasks = num_tasks + 1
        expanded_job.update({'func': pickle.dumps(job['func'])})
        expanded_job.update({'task': task})
        tasks_pending.put(expanded_job)

# Set the number of workers here
num_workers = min(num_cpus, num_tasks)

# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
    tasks_pending.put(SENTINEL)

print('* Number of tasks: {}'.format(num_tasks))

# Set-up and start the workers
for c in range(num_workers):
    p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
    p.name = 'worker' + str(c)
    processes.append(p)
    p.start()

# Gather the results
completed_tasks_counter = 0

with tqdm(total=num_tasks) as bar:
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1
        bar.update(completed_tasks_counter)

for p in processes:
    p.join()

return results
Ник Б.
источник
0
import multiprocessing as mp
import tqdm


some_iterable = ...

def some_func():
    # your logic
    ...


if __name__ == '__main__':
    with mp.Pool(mp.cpu_count()-2) as p:
        list(tqdm.tqdm(p.imap(some_func, iterable), total=len(iterable)))
темный мужчина
источник
-2

Это простой подход, и он работает.

from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm

def job():
    time.sleep(1)
    pbar.update()

pool = ThreadPool(5)
with tqdm(total=100) as pbar:
    for i in range(100):
        pool.apply_async(job)
    pool.close()
    pool.join()
Виджаябхаскар Дж.
источник