Мне очень сложно понять, как многопроцессорная очередь работает на Python и как ее реализовать. Допустим, у меня есть два модуля Python, которые обращаются к данным из общего файла, назовем эти два модуля писателем и читателем. Мой план состоит в том, чтобы и читатель, и писатель помещали запросы в две отдельные очереди многопроцессорной обработки, а затем третий процесс выдавал эти запросы в цикле и выполнял их как таковые.
Моя основная проблема заключается в том, что я действительно не знаю, как правильно реализовать multiprocessing.queue, вы не можете действительно создать экземпляр объекта для каждого процесса, поскольку они будут отдельными очередями, как вы убедитесь, что все процессы относятся к общей очереди (или в данном случае очереди)
Ответы:
Это простой пример того, как читатель и писатель разделяют одну очередь ... Писатель отправляет читателю связку целых чисел; когда у модуля записи заканчиваются числа, он отправляет «DONE», что дает читателю понять, что нужно выйти из цикла чтения.
from multiprocessing import Process, Queue import time import sys def reader_proc(queue): ## Read from the queue; this will be spawned as a separate Process while True: msg = queue.get() # Read from the queue and do nothing if (msg == 'DONE'): break def writer(count, queue): ## Write to the queue for ii in range(0, count): queue.put(ii) # Write 'count' numbers into the queue queue.put('DONE') if __name__=='__main__': pqueue = Queue() # writer() writes to pqueue from _this_ process for count in [10**4, 10**5, 10**6]: ### reader_proc() reads from pqueue as a separate process reader_p = Process(target=reader_proc, args=((pqueue),)) reader_p.daemon = True reader_p.start() # Launch reader_proc() as a separate python process _start = time.time() writer(count, pqueue) # Send a lot of stuff to reader() reader_p.join() # Wait for the reader to finish print("Sending {0} numbers to Queue() took {1} seconds".format(count, (time.time() - _start)))
источник
в "
from queue import Queue
" модуль не вызываетсяqueue
, вместо негоmultiprocessing
следует использовать. Следовательно, он должен выглядеть так: "from multiprocessing import Queue
"источник
multiprocessing.Queue
правильно. НормальныйQueue.Queue
используется для потоков Python . Когда вы пытаетесь использоватьQueue.Queue
с многопроцессорностью, копии объекта Queue будут созданы в каждом дочернем процессе, и дочерние процессы никогда не будут обновлены. По сути,Queue.Queue
работает с использованием глобального общего объекта иmultiprocessing.Queue
работает с использованием IPC. См .: stackoverflow.com/questions/925100/…Вот мертвое использование простой из
multiprocessing.Queue
иmultiprocessing.Process
что позволяет абонентам отправить «событие» плюс аргументы в отдельный процесс , который отправляет событие к методу «do_» на процесс. (Python 3.4+)import multiprocessing as mp import collections Msg = collections.namedtuple('Msg', ['event', 'args']) class BaseProcess(mp.Process): """A process backed by an internal queue for simple one-way message passing. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.queue = mp.Queue() def send(self, event, *args): """Puts the event and args as a `Msg` on the queue """ msg = Msg(event, args) self.queue.put(msg) def dispatch(self, msg): event, args = msg handler = getattr(self, "do_%s" % event, None) if not handler: raise NotImplementedError("Process has no handler for [%s]" % event) handler(*args) def run(self): while True: msg = self.queue.get() self.dispatch(msg)
Применение:
class MyProcess(BaseProcess): def do_helloworld(self, arg1, arg2): print(arg1, arg2) if __name__ == "__main__": process = MyProcess() process.start() process.send('helloworld', 'hello', 'world')
То
send
происходит в родительском процессе, тоdo_*
происходит в дочернем процессе.Я не учел обработку исключений, которая, очевидно, прерывала бы цикл выполнения и выходила из дочернего процесса. Вы также можете настроить его, переопределив
run
блокировку управления или что-то еще.Это действительно полезно только в ситуациях, когда у вас есть единственный рабочий процесс, но я думаю, что это уместный ответ на этот вопрос, чтобы продемонстрировать общий сценарий с немного большей объектной ориентацией.
источник
Я просмотрел несколько ответов о переполнении стека и в Интернете, пытаясь настроить способ выполнения многопроцессорной обработки с использованием очередей для передачи больших фреймов данных pandas. Мне казалось, что каждый ответ повторял одни и те же решения без учета множества крайних случаев, которые обязательно встретятся при настройке подобных вычислений. Проблема в том, что одновременно задействовано много вещей. Количество задач, количество рабочих, продолжительность каждой задачи и возможные исключения во время выполнения задачи. Все это затрудняет синхронизацию, и в большинстве ответов не рассматривается, как это сделать. Итак, это мое мнение после нескольких часов возни, надеюсь, оно будет достаточно общим, чтобы большинство людей сочло его полезным.
Некоторые мысли перед примерами кодирования. Поскольку
queue.Empty
илиqueue.qsize()
любой другой подобный метод ненадежен для управления потоком, любой подобный кодwhile True: try: task = pending_queue.get_nowait() except queue.Empty: break
подделка. Это убьет воркера, даже если через миллисекунды в очереди появится другая задача. Рабочий не восстановится, и через некоторое время ВСЕ рабочие исчезнут, так как они случайно обнаружат, что очередь на мгновение пуста. Конечным результатом будет то, что основная функция многопроцессорности (с функцией join () для процессов) вернется без выполнения всех задач. Ницца. Удачи в отладке, если у вас есть тысячи задач, а некоторые из них отсутствуют.
Другая проблема - это использование контрольных значений. Многие люди предлагали добавить в очередь значение дозорного, чтобы отметить конец очереди. Но кому именно помечать? Если есть N рабочих, предполагая, что N - это количество доступных ядер, то единственное контрольное значение будет указывать только на конец очереди для одного рабочего. Все остальные рабочие будут сидеть и ждать, пока не останется работы. Типичные примеры, которые я видел:
while True: task = pending_queue.get() if task == SOME_SENTINEL_VALUE: break
Один рабочий получит значение дозорного, а остальные будут ждать бесконечно. Ни в одном сообщении, которое я встретил, не упоминалось, что вам нужно отправить значение дозорного в очередь, ПО МЕНЬШЕ, столько раз, сколько у вас есть воркеров, чтобы ВСЕ они его получили.
Другая проблема - обработка исключений во время выполнения задачи. Опять же, их нужно ловить и управлять ими. Более того, если у вас есть
completed_tasks
очередь, вы должны независимо детерминированным образом подсчитать, сколько элементов находится в очереди, прежде чем вы решите, что задание выполнено. Опять же, полагаться на размеры очереди обречено на неудачу и возвращать неожиданные результаты.В приведенном ниже примере
par_proc()
функция получит список задач, включая функции, с которыми эти задачи должны выполняться вместе с любыми именованными аргументами и значениями.import multiprocessing as mp import dill as pickle import queue import time import psutil SENTINEL = None def do_work(tasks_pending, tasks_completed): # Get the current worker's name worker_name = mp.current_process().name while True: try: task = tasks_pending.get_nowait() except queue.Empty: print(worker_name + ' found an empty queue. Sleeping for a while before checking again...') time.sleep(0.01) else: try: if task == SENTINEL: print(worker_name + ' no more work left to be done. Exiting...') break print(worker_name + ' received some work... ') time_start = time.perf_counter() work_func = pickle.loads(task['func']) result = work_func(**task['task']) tasks_completed.put({work_func.__name__: result}) time_end = time.perf_counter() - time_start print(worker_name + ' done in {} seconds'.format(round(time_end, 5))) except Exception as e: print(worker_name + ' task failed. ' + str(e)) tasks_completed.put({work_func.__name__: None}) def par_proc(job_list, num_cpus=None): # Get the number of cores if not num_cpus: num_cpus = psutil.cpu_count(logical=False) print('* Parallel processing') print('* Running on {} cores'.format(num_cpus)) # Set-up the queues for sending and receiving data to/from the workers tasks_pending = mp.Queue() tasks_completed = mp.Queue() # Gather processes and results here processes = [] results = [] # Count tasks num_tasks = 0 # Add the tasks to the queue for job in job_list: for task in job['tasks']: expanded_job = {} num_tasks = num_tasks + 1 expanded_job.update({'func': pickle.dumps(job['func'])}) expanded_job.update({'task': task}) tasks_pending.put(expanded_job) # Use as many workers as there are cores (usually chokes the system so better use less) num_workers = num_cpus # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more # work left to be done. for c in range(num_workers): tasks_pending.put(SENTINEL) print('* Number of tasks: {}'.format(num_tasks)) # Set-up and start the workers for c in range(num_workers): p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed)) p.name = 'worker' + str(c) processes.append(p) p.start() # Gather the results completed_tasks_counter = 0 while completed_tasks_counter < num_tasks: results.append(tasks_completed.get()) completed_tasks_counter = completed_tasks_counter + 1 for p in processes: p.join() return results
И вот тест для запуска приведенного выше кода против
def test_parallel_processing(): def heavy_duty1(arg1, arg2, arg3): return arg1 + arg2 + arg3 def heavy_duty2(arg1, arg2, arg3): return arg1 * arg2 * arg3 task_list = [ {'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, ] results = par_proc(task_list) job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())]) job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())]) assert job1 == 15 assert job2 == 21
плюс еще один за некоторыми исключениями
def test_parallel_processing_exceptions(): def heavy_duty1_raises(arg1, arg2, arg3): raise ValueError('Exception raised') return arg1 + arg2 + arg3 def heavy_duty2(arg1, arg2, arg3): return arg1 * arg2 * arg3 task_list = [ {'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, ] results = par_proc(task_list) job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())]) job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())]) assert not job1 assert job2 == 21
Надеюсь, что это поможет.
источник
Мы реализовали две его версии: одну - простой многопоточный пул, который может выполнять многие типы вызываемых объектов, что значительно упрощает нашу жизнь, а во второй версии - процессы , которые менее гибки с точки зрения вызываемых объектов и требуют дополнительного вызова dill.
Если для параметра frozen_pool установлено значение true, выполнение будет приостановлено до тех пор, пока в любом из классов не будет вызвана finish_pool_queue.
Версия резьбы:
''' Created on Nov 4, 2019 @author: Kevin ''' from threading import Lock, Thread from Queue import Queue import traceback from helium.loaders.loader_retailers import print_info from time import sleep import signal import os class ThreadPool(object): def __init__(self, queue_threads, *args, **kwargs): self.frozen_pool = kwargs.get('frozen_pool', False) self.print_queue = kwargs.get('print_queue', True) self.pool_results = [] self.lock = Lock() self.queue_threads = queue_threads self.queue = Queue() self.threads = [] for i in range(self.queue_threads): t = Thread(target=self.make_pool_call) t.daemon = True t.start() self.threads.append(t) def make_pool_call(self): while True: if self.frozen_pool: #print '--> Queue is frozen' sleep(1) continue item = self.queue.get() if item is None: break call = item.get('call', None) args = item.get('args', []) kwargs = item.get('kwargs', {}) keep_results = item.get('keep_results', False) try: result = call(*args, **kwargs) if keep_results: self.lock.acquire() self.pool_results.append((item, result)) self.lock.release() except Exception as e: self.lock.acquire() print e traceback.print_exc() self.lock.release() os.kill(os.getpid(), signal.SIGUSR1) self.queue.task_done() def finish_pool_queue(self): self.frozen_pool = False while self.queue.unfinished_tasks > 0: if self.print_queue: print_info('--> Thread pool... %s' % self.queue.unfinished_tasks) sleep(5) self.queue.join() for i in range(self.queue_threads): self.queue.put(None) for t in self.threads: t.join() del self.threads[:] def get_pool_results(self): return self.pool_results def clear_pool_results(self): del self.pool_results[:]
Версия процесса:
''' Created on Nov 4, 2019 @author: Kevin ''' import traceback from helium.loaders.loader_retailers import print_info from time import sleep import signal import os from multiprocessing import Queue, Process, Value, Array, JoinableQueue, Lock,\ RawArray, Manager from dill import dill import ctypes from helium.misc.utils import ignore_exception from mem_top import mem_top import gc class ProcessPool(object): def __init__(self, queue_processes, *args, **kwargs): self.frozen_pool = Value(ctypes.c_bool, kwargs.get('frozen_pool', False)) self.print_queue = kwargs.get('print_queue', True) self.manager = Manager() self.pool_results = self.manager.list() self.queue_processes = queue_processes self.queue = JoinableQueue() self.processes = [] for i in range(self.queue_processes): p = Process(target=self.make_pool_call) p.start() self.processes.append(p) print 'Processes', self.queue_processes def make_pool_call(self): while True: if self.frozen_pool.value: sleep(1) continue item_pickled = self.queue.get() if item_pickled is None: #print '--> Ending' self.queue.task_done() break item = dill.loads(item_pickled) call = item.get('call', None) args = item.get('args', []) kwargs = item.get('kwargs', {}) keep_results = item.get('keep_results', False) try: result = call(*args, **kwargs) if keep_results: self.pool_results.append(dill.dumps((item, result))) else: del call, args, kwargs, keep_results, item, result except Exception as e: print e traceback.print_exc() os.kill(os.getpid(), signal.SIGUSR1) self.queue.task_done() def finish_pool_queue(self, callable=None): self.frozen_pool.value = False while self.queue._unfinished_tasks.get_value() > 0: if self.print_queue: print_info('--> Process pool... %s' % (self.queue._unfinished_tasks.get_value())) if callable: callable() sleep(5) for i in range(self.queue_processes): self.queue.put(None) self.queue.join() self.queue.close() for p in self.processes: with ignore_exception: p.join(10) with ignore_exception: p.terminate() with ignore_exception: del self.processes[:] def get_pool_results(self): return self.pool_results def clear_pool_results(self): del self.pool_results[:]
Звоните с помощью:
tp = ThreadPool(queue_threads=2) tp.queue.put({'call': test, 'args': [random.randint(0, 100)]}) tp.finish_pool_queue()
или
pp = ProcessPool(queue_processes=2) pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]})) pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]})) pp.finish_pool_queue()
источник
Просто сделал простой и общий пример для демонстрации передачи сообщения в очереди между двумя автономными программами. Он не отвечает напрямую на вопрос OP, но должен быть достаточно ясным, указывая на концепцию.
Сервер:
multiprocessing-queue-manager-server.py
import asyncio import concurrent.futures import multiprocessing import multiprocessing.managers import queue import sys import threading from typing import Any, AnyStr, Dict, Union class QueueManager(multiprocessing.managers.BaseManager): def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue: pass def get_queue(ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue: global q if not ident in q: q[ident] = multiprocessing.Queue() return q[ident] q: Dict[Union[AnyStr, int, type(None)], multiprocessing.Queue] = dict() delattr(QueueManager, 'get_queue') def init_queue_manager_server(): if not hasattr(QueueManager, 'get_queue'): QueueManager.register('get_queue', get_queue) def serve(no: int, term_ev: threading.Event): manager: QueueManager with QueueManager(authkey=QueueManager.__name__.encode()) as manager: print(f"Server address {no}: {manager.address}") while not term_ev.is_set(): try: item: Any = manager.get_queue().get(timeout=0.1) print(f"Client {no}: {item} from {manager.address}") except queue.Empty: continue async def main(n: int): init_queue_manager_server() term_ev: threading.Event = threading.Event() executor: concurrent.futures.ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor() i: int for i in range(n): asyncio.ensure_future(asyncio.get_running_loop().run_in_executor(executor, serve, i, term_ev)) # Gracefully shut down try: await asyncio.get_running_loop().create_future() except asyncio.CancelledError: term_ev.set() executor.shutdown() raise if __name__ == '__main__': asyncio.run(main(int(sys.argv[1])))
Клиент:
multiprocessing-queue-manager-client.py
import multiprocessing import multiprocessing.managers import os import sys from typing import AnyStr, Union class QueueManager(multiprocessing.managers.BaseManager): def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue: pass delattr(QueueManager, 'get_queue') def init_queue_manager_client(): if not hasattr(QueueManager, 'get_queue'): QueueManager.register('get_queue') def main(): init_queue_manager_client() manager: QueueManager = QueueManager(sys.argv[1], authkey=QueueManager.__name__.encode()) manager.connect() message = f"A message from {os.getpid()}" print(f"Message to send: {message}") manager.get_queue().put(message) if __name__ == '__main__': main()
Применение
Сервер:
N
- целое число, указывающее, сколько серверов следует создать. Скопируйте один из<server-address-N>
выходных данных сервера и сделайте его первым аргументом каждогоmultiprocessing-queue-manager-client.py
.Клиент:
python3 multiprocessing-queue-manager-client.py <server-address-1>
Результат
Сервер:
Client 1: <item> from <server-address-1>
Суть: https://gist.github.com/89062d639e40110c61c2f88018a8b0e5
UPD : Создал пакет здесь .
Сервер:
import ipcq with ipcq.QueueManagerServer(address=ipcq.Address.DEFAULT, authkey=ipcq.AuthKey.DEFAULT) as server: server.get_queue().get()
Клиент:
import ipcq client = ipcq.QueueManagerClient(address=ipcq.Address.DEFAULT, authkey=ipcq.AuthKey.DEFAULT) client.get_queue().put('a message')
источник