Чтобы сделать мой код более «питоническим» и более быстрым, я использую «многопроцессорность» и функцию карты, чтобы отправить ему а) функцию и б) диапазон итераций.
Имплантированное решение (то есть вызов tqdm непосредственно в диапазоне tqdm.tqdm (диапазон (0, 30)) не работает с многопроцессорной обработкой (как сформулировано в приведенном ниже коде).
Индикатор выполнения отображается от 0 до 100% (когда python читает код?), Но он не указывает на фактический прогресс функции карты.
Как отобразить индикатор выполнения, показывающий, на каком этапе выполняется функция «карта»?
from multiprocessing import Pool
import tqdm
import time
def _foo(my_number):
square = my_number * my_number
time.sleep(1)
return square
if __name__ == '__main__':
p = Pool(2)
r = p.map(_foo, tqdm.tqdm(range(0, 30)))
p.close()
p.join()
Любая помощь или предложения приветствуются ...
.starmap()
: Вот патч дляPool
добавления.istarmap()
, который также будет работать сtqdm
.Ответы:
Используйте imap вместо map, который возвращает итератор обработанных значений.
from multiprocessing import Pool import tqdm import time def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': with Pool(2) as p: r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))
источник
starmap()
?for i in tqdm.tqdm(...): pass
может быть более прямолинейным, этоlist(tqdm.tqdm)
chunk_size
дляp.imap
. Можно лиtqdm
обновлять каждую итерацию вместо каждого фрагмента?Найдено решение: будьте осторожны! Из-за многопроцессорной обработки время оценки (итерация за цикл, общее время и т. Д.) Может быть нестабильным, но индикатор выполнения работает отлично.
Примечание. Диспетчер контекста для пула доступен только в версии Python 3.3.
from multiprocessing import Pool import time from tqdm import * def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': with Pool(processes=2) as p: max_ = 30 with tqdm(total=max_) as pbar: for i, _ in enumerate(p.imap_unordered(_foo, range(0, max_))): pbar.update()
источник
pbar.close()
не требуется, он будет автоматически закрыт по окончанииwith
tqdm
Здесь нужен второй / внутренний вызов?starmap()
?imap_unordered
это ключевой момент, он дает лучшую производительность и лучшие оценки индикатора выполнения.Извините за опоздание, но если все, что вам нужно, это параллельная карта, я добавил эту функцию в
tqdm>=4.42.0
:from tqdm.contrib.concurrent import process_map # or thread_map import time def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': r = process_map(_foo, range(0, 30), max_workers=2)
Ссылки: https://tqdm.github.io/docs/contrib.concurrent/ и https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py
источник
HBox(children=(FloatProgress(value=0.0, max=30.0), HTML(value='')))
вбрасывает Жупитераp_tqdm
Вместо этого можно использовать .https://github.com/swansonk14/p_tqdm
from p_tqdm import p_map import time def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': r = p_map(_foo, list(range(0, 30)))
источник
pip install
. Это замена tqdm для большинства моих нуждp_tqdm
ограниченоmultiprocessing.Pool
, недоступно для потоковна основе ответа Хави Мартинеса я написал функцию
imap_unordered_bar
. Его можно использовать так же, какimap_unordered
с той лишь разницей, что отображается полоса обработки.from multiprocessing import Pool import time from tqdm import * def imap_unordered_bar(func, args, n_processes = 2): p = Pool(n_processes) res_list = [] with tqdm(total = len(args)) as pbar: for i, res in tqdm(enumerate(p.imap_unordered(func, args))): pbar.update() res_list.append(res) pbar.close() p.close() p.join() return res_list def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': result = imap_unordered_bar(_foo, range(5))
источник
Вот мой вариант, когда вам нужно получить результаты от ваших функций параллельного выполнения. Эта функция делает несколько вещей (есть еще один мой пост, который объясняет это дополнительно), но ключевым моментом является то, что есть очередь ожидающих задач и очередь завершенных задач. По мере того, как рабочие завершают выполнение каждой задачи в очереди ожидания, они добавляют результаты в очередь завершенных задач. Вы можете перенести проверку в очередь выполненных задач с помощью индикатора выполнения tqdm. Я не помещаю здесь реализацию функции do_work (), это не актуально, так как сообщение здесь предназначено для отслеживания очереди выполненных задач и обновления индикатора выполнения каждый раз, когда появляется результат.
def par_proc(job_list, num_cpus=None, verbose=False): # Get the number of cores if not num_cpus: num_cpus = psutil.cpu_count(logical=False) print('* Parallel processing') print('* Running on {} cores'.format(num_cpus)) # Set-up the queues for sending and receiving data to/from the workers tasks_pending = mp.Queue() tasks_completed = mp.Queue() # Gather processes and results here processes = [] results = [] # Count tasks num_tasks = 0 # Add the tasks to the queue for job in job_list: for task in job['tasks']: expanded_job = {} num_tasks = num_tasks + 1 expanded_job.update({'func': pickle.dumps(job['func'])}) expanded_job.update({'task': task}) tasks_pending.put(expanded_job) # Set the number of workers here num_workers = min(num_cpus, num_tasks) # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more # work left to be done. for c in range(num_workers): tasks_pending.put(SENTINEL) print('* Number of tasks: {}'.format(num_tasks)) # Set-up and start the workers for c in range(num_workers): p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose)) p.name = 'worker' + str(c) processes.append(p) p.start() # Gather the results completed_tasks_counter = 0 with tqdm(total=num_tasks) as bar: while completed_tasks_counter < num_tasks: results.append(tasks_completed.get()) completed_tasks_counter = completed_tasks_counter + 1 bar.update(completed_tasks_counter) for p in processes: p.join() return results
источник
import multiprocessing as mp import tqdm some_iterable = ... def some_func(): # your logic ... if __name__ == '__main__': with mp.Pool(mp.cpu_count()-2) as p: list(tqdm.tqdm(p.imap(some_func, iterable), total=len(iterable)))
источник
Это простой подход, и он работает.
from multiprocessing.pool import ThreadPool import time from tqdm import tqdm def job(): time.sleep(1) pbar.update() pool = ThreadPool(5) with tqdm(total=100) as pbar: for i in range(100): pool.apply_async(job) pool.close() pool.join()
источник